Kafka

Metrics, Dashboards, Alerts and more for Kafka Integration in Sysdig Monitor.

This integration is enabled by default.

Versions supported: > v2.7.x

This integration uses a standalone exporter, available as a UBI or scratch-based image.

This integration has 37 metrics.

Timeseries generated: The JMX-Exporter generates ~270 timeseries and the Kafka-Exporter ~138 timeseries (these numbers increase with the number of topics, partitions and consumers).

List of Alerts

| Alert | Description | Format |
| --- | --- | --- |
| [Kafka] Broker Down | There are fewer Kafka brokers up than expected. The ‘workload’ label of the Kafka Deployment/StatefulSet must be specified. | Prometheus |
| [Kafka] No Leader | There is no ActiveController or ‘leader’ in the Kafka cluster. | Prometheus |
| [Kafka] Too Many Leaders | There is more than one ActiveController or ‘leader’ in the Kafka cluster. | Prometheus |
| [Kafka] Offline Partitions | There are one or more Offline Partitions. These partitions don’t have an active leader and are hence not writable or readable. | Prometheus |
| [Kafka] Under Replicated Partitions | There are one or more Under Replicated Partitions. | Prometheus |
| [Kafka] Under In-Sync Replicated Partitions | There are one or more Under In-Sync Replicated Partitions. These partitions will be unavailable to producers who use ‘acks=all’. | Prometheus |
| [Kafka] ConsumerGroup Lag Not Decreasing | The ConsumerGroup lag is not decreasing. The Consumers might be down, failing to process the messages and continuously retrying, or their consumption rate is lower than the production rate of messages. | Prometheus |
| [Kafka] ConsumerGroup Without Members | The ConsumerGroup doesn’t have any members. | Prometheus |
| [Kafka] Producer High ThrottleTime By Client-Id | The Producer has reached its quota and has high throttle time. Applicable when Client-Id-only quotas are being used. | Prometheus |
| [Kafka] Producer High ThrottleTime By User | The Producer has reached its quota and has high throttle time. Applicable when User-only quotas are being used. | Prometheus |
| [Kafka] Producer High ThrottleTime By User And Client-Id | The Producer has reached its quota and has high throttle time. Applicable when Client-Id + User quotas are being used. | Prometheus |
| [Kafka] Consumer High ThrottleTime By Client-Id | The Consumer has reached its quota and has high throttle time. Applicable when Client-Id-only quotas are being used. | Prometheus |
| [Kafka] Consumer High ThrottleTime By User | The Consumer has reached its quota and has high throttle time. Applicable when User-only quotas are being used. | Prometheus |
| [Kafka] Consumer High ThrottleTime By User And Client-Id | The Consumer has reached its quota and has high throttle time. Applicable when Client-Id + User quotas are being used. | Prometheus |

List of Dashboards

Kafka

The dashboard provides information on the status of Kafka.

List of Metrics

Metric name
kafka_brokers
kafka_consumergroup_current_offset
kafka_consumergroup_lag
kafka_consumergroup_members
kafka_controller_active_controller
kafka_controller_offline_partitions
kafka_log_size
kafka_network_consumer_request_time_milliseconds
kafka_network_fetch_follower_time_milliseconds
kafka_network_producer_request_time_milliseconds
kafka_server_bytes_in
kafka_server_bytes_out
kafka_server_consumer_client_byterate
kafka_server_consumer_client_throttle_time
kafka_server_consumer_user_byterate
kafka_server_consumer_user_client_byterate
kafka_server_consumer_user_client_throttle_time
kafka_server_consumer_user_throttle_time
kafka_server_messages_in
kafka_server_partition_leader_count
kafka_server_producer_client_byterate
kafka_server_producer_client_throttle_time
kafka_server_producer_user_byterate
kafka_server_producer_user_client_byterate
kafka_server_producer_user_client_throttle_time
kafka_server_producer_user_throttle_time
kafka_server_under_isr_partitions
kafka_server_under_replicated_partitions
kafka_server_zookeeper_auth_failures
kafka_server_zookeeper_disconnections
kafka_server_zookeeper_expired_sessions
kafka_server_zookeeper_read_only_connections
kafka_server_zookeeper_sasl_authentications
kafka_server_zookeeper_sync_connections
kafka_topic_partition_current_offset
kafka_topic_partition_oldest_offset
kube_workload_status_desired

Prerequisites

Installation of the JMX-Exporter as a Sidecar

The JMX-Exporter can be easily installed in two steps.

First, deploy the ConfigMap that contains the Kafka JMX configuration. The following example is for a Kafka cluster that exposes the JMX port 9010:

helm repo add promcat-charts https://sysdiglabs.github.io/integrations-charts 
helm repo update
helm -n kafka install kafka-jmx-exporter promcat-charts/jmx-exporter --set jmx_port=9010 --set integrationType=kafka --set onlyCreateJMXConfigMap=true

Then, generate a patch file and apply it to your workload (your Kafka Deployment/StatefulSet/DaemonSet). The following example is for a Kafka cluster that exposes the JMX port 9010 and is deployed as a StatefulSet called ‘kafka-cp-kafka’:

helm template kafka-jmx-exporter promcat-charts/jmx-exporter --set jmx_port=9010 --set integrationType=kafka --set onlyCreateSidecarPatch=true --set sysdigAnnotations=true > jmx-exporter-sidecar-patch.yaml
kubectl -n kafka patch sts kafka-cp-kafka --patch-file jmx-exporter-sidecar-patch.yaml

Create Secrets for Authentication for the Kafka-Exporter

Your Kafka cluster external endpoints might be secured by using authentication for the clients that want to connect to them (TLS, SASL+SCRAM, SASL+Kerberos). If you are going to make the Kafka-Exporter (which will be deployed in the next tab) use these secured external endpoints, you’ll need to create the Kubernetes Secrets described below. If you prefer the Kafka-Exporter to connect to the Kafka cluster through an internal, unsecured (plaintext) endpoint, skip this step.

If using TLS, you’ll need to create a Secret which contains the CA, the client certificate and the client key. The names of these files must be “ca.crt”, “tls.crt” and “tls.key”. The name of the secret can be any name that you want. Example:

kubectl create secret generic kafka-exporter-certs --from-file=./tls.key --from-file=./tls.crt --from-file=./ca.crt --dry-run=client -o yaml | kubectl apply -f -

If using SASL+SCRAM, you’ll need to create a Secret which contains the “username” and “password”. Example:

echo -n 'admin' > username
echo -n '1f2d1e2e67df' > password
kubectl create secret generic kafka-exporter-sasl-scram --from-file=username --from-file=password --dry-run=client -o yaml | kubectl apply -f -

If using SASL+Kerberos, you’ll need to create a Secret which contains the “kerberos.conf”. If the ‘Kerberos Auth Type’ is ‘keytabAuth’, it should also contain the “kerberos.keytab”. Example:

kubectl create secret generic kafka-exporter-sasl-kerberos --from-file=./kerberos.conf --from-file=./kerberos.keytab --dry-run=client -o yaml | kubectl apply -f -

Installation

An automated wizard is available in Monitoring Integrations in Sysdig Monitor. Expert users can also use the Helm charts for installation.
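The following is a minimal sketch that assumes a ‘kafka-exporter’ chart is published in the same promcat-charts repository added in the Prerequisites section; the values needed to point the exporter at your Kafka endpoints and at the authentication Secrets created above are chart-specific, so check the chart documentation before installing:

# Assumes the promcat-charts repository was already added (see Prerequisites)
helm repo update
helm -n kafka install kafka-exporter promcat-charts/kafka-exporter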

Monitoring and Troubleshooting Kafka

Here are some interesting metrics and queries to monitor and troubleshoot Kafka.

Brokers

Broker Down

Let’s compare the number of expected Brokers with the actual number of Brokers up and running. If the numbers don’t match, there might be a problem.

sum by (kube_cluster_name,kube_namespace_name, kube_workload_name)(kube_workload_status_desired)
-
sum by (kube_cluster_name,kube_namespace_name, kube_workload_name)(kafka_brokers)
> 0

Leadership

Let’s get the number of Kafka leaders. There should always be one leader. If not, a Kafka misconfiguration or a networking issue might be the problem.

sum(kafka_controller_active_controller) < 1

If there is more than one leader, it might be a temporary situation while the leadership is changing. If this doesn’t fix itself over time, a split-brain situation might be happening.

sum(kafka_controller_active_controller) > 1

Offline, Under Replicated and In-Sync Under Replicated Partitions

When a Broker goes down, the other Brokers in the cluster will take leadership of the partitions it was leading. If several Brokers go down, or just a few go down but a topic has a low replication factor, there will be Offline partitions. These partitions don’t have an active leader and are hence not writable or readable, which is most likely dangerous for the business.

Let’s check if there are offline partitions:

sum(kafka_controller_offline_partitions) > 0

If other Brokers have replicas of those partitions, one of them will take leadership and the service won’t be down. In this situation there will be Under Replicated partitions. If there are enough Brokers where these partitions can be replicated, the situation will fix itself over time. If there aren’t enough Brokers, the situation will only be fixed once the Brokers that went down come back up.

The following expression returns the Under Replicated partitions:

sum(kafka_server_under_replicated_partitions) > 0

But there is a situation where having no Offline partitions but having Under Replicated partitions might pose a real problem: topics with ‘Minimum In-Sync Replicas’ configured, combined with Kafka Producers using ‘acks=all’.

If one of these topics has any partition with fewer in-sync replicas than its ‘Minimum In-Sync Replicas’ configuration, and there is a Producer with ‘acks=all’, that Producer won’t be able to produce messages into that partition, since ‘acks=all’ means that it waits for the produced messages to be replicated to at least the minimum number of in-sync replicas in the Kafka cluster.

If the Producers use any configuration other than ‘acks=all’, there won’t be any problem.
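As an illustrative sketch (the topic name, bootstrap server and values are placeholders, not taken from this integration), this is how the ‘Minimum In-Sync Replicas’ configuration can be set on a topic and ‘acks=all’ enabled on a Producer:

# Set the minimum number of in-sync replicas required for the topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config min.insync.replicas=2

# In the Producer configuration (for example, producer.properties),
# make it wait for all in-sync replicas to acknowledge each message:
# acks=all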

This is how Under In-Sync Replicated partitions can be checked:

sum(kafka_server_under_isr_partitions) > 0

Network

Broker Bytes In

Let’s get the amount of bytes produced into each Broker:

sum by (kube_cluster_name, kube_namespace_name, kube_workload_name, kube_pod_name)(kafka_server_bytes_in)

Broker Bytes Out

Now the same, but for bytes consumed from each Broker:

sum by (kube_cluster_name, kube_namespace_name, kube_workload_name, kube_pod_name)(kafka_server_bytes_out)

Broker Messages In

And similar, but for number of messages produced into each Broker:

sum by (kube_cluster_name, kube_namespace_name, kube_workload_name, kube_pod_name)(kafka_server_messages_in)

Topics

Topic Size

This query returns the size of a topic in the whole Kafka cluster. It also includes the size of all replicas, so increasing the replication factor of a topic will increase the overall size across the Kafka cluster.

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(kafka_log_size)

If you need the size of a topic in each Broker, use the following query:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, kube_pod_name)(kafka_log_size)

If the Broker disk space is running low, the retention of the topics can be decreased to free up some space. Let’s get the top 10 biggest topics:

topk(10,sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(kafka_log_size))

If this “low disk space” situation happened out of the blue, there might be a problem in a topic with a Producer filling it with unwanted messages. The following query will help find which topics increased their size the most in the past few hours, which makes it easier to find the one responsible for the sudden increase in messages. It wouldn’t be the first time an exhausted developer wanted to perform a stress test on a topic in a Staging environment, but accidentally did it in Production.

topk(10,sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(delta(kafka_log_size[$__interval])))

Topic Messages

Calculating the number of messages in a topic is as easy as subtracting the offset of the oldest message from the offset of the newest message:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(kafka_topic_partition_current_offset) - sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(kafka_topic_partition_oldest_offset)

But it’s very important to note that this is only true for topics with ‘compaction’ disabled, since compacted topics might have deleted messages in the middle. To get the number of messages in a compacted topic, a new Consumer must consume all the messages in that topic and count them.
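For example, a rough count can be obtained with a throwaway console consumer (an illustrative sketch; the topic name, bootstrap server and timeout are placeholders, and it assumes one line per message):

# Consume the whole topic from the beginning and count the lines on stdout;
# the consumer stops after 10 seconds without receiving new messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-compacted-topic --from-beginning --timeout-ms 10000 | wc -l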

It’s also quite easy to calculate the rate per second of messages being produced into a topic:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, topic)(rate(kafka_topic_partition_current_offset[$__interval]))

ConsumerGroup

ConsumerGroup Lag

Let’s check the ConsumerGroup lag of a Consumer in each partition of a topic:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, consumergroup, topic, partition)(kafka_consumergroup_lag)

If the lag of a ConsumerGroup is constantly increasing and never decreases, there might be several causes: the Consumers of the ConsumerGroup might be down, one of them might be failing to process the messages and continuously retrying, or their consumption rate might be lower than the production rate of messages.

A non-stop increasing lag can be detected using the following expression:

(sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, consumergroup, topic)(kafka_consumergroup_lag) > 0)
and
(sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, consumergroup, topic)(delta(kafka_consumergroup_lag[2m])) >= 0)

ConsumerGroup Consumption Rate

It might be useful to get the consumption speed of the Consumers of a ConsumerGroup, to detect any issues while processing messages, like internal issues related to the messages, or external issues related to the business. For example, the Consumers might want to send the processed messages to another microservice or another database, but there might be networking issues, or the database performance might be degraded, slowing down the Consumer.

Here you can check the consumption rate:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, consumergroup, topic, partition)(rate(kafka_consumergroup_current_offset[$__interval]))

ConsumerGroup Members

It might also help to know the number of Consumers in a ConsumerGroup, in case there are fewer than expected:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, consumergroup)(kafka_consumergroup_members)

Quotas

Kafka has the option to enforce quotas on requests to control the Broker resources used by clients (Producers and Consumers).

Quotas can be applied at the user level, the client-id level, or both at the same time.

Each client can utilize this quota per Broker before it gets throttled. Throttling means that the client will need to wait some time before being able to produce or consume messages again.
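As an illustrative sketch (the entity names and byte rates are placeholders), quotas can be defined with ‘kafka-configs.sh’ at the client-id level, the user level, or for a user and client-id combination:

# Limit a client-id to ~1 MB/s produced and ~2 MB/s consumed, per Broker
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
  --entity-type clients --entity-name my-client-id

# The same quota applied to a specific user and client-id combination
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
  --entity-type users --entity-name my-user \
  --entity-type clients --entity-name my-client-id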

Production/Consumption Rate

Depending on whether the client is a Consumer or a Producer, and whether the quota is applied at the client-id level, the user level, or both at the same time, a different metric will be used:

  • kafka_server_producer_client_byterate
  • kafka_server_producer_user_byterate
  • kafka_server_producer_user_client_byterate
  • kafka_server_consumer_client_byterate
  • kafka_server_consumer_user_byterate
  • kafka_server_consumer_user_client_byterate

Let’s check, for example, the production rate of a Producer using both user and client-id quotas:

sum by(kube_cluster_name, kube_namespace_name, kube_workload_name, kube_pod_name, user, client_id)(kafka_server_producer_user_client_byterate)

Production/Consumption Throttle Time

Similar to the rate metrics, there are throttle time metrics for the same combinations of clients and quota groups:

  • kafka_server_producer_client_throttle_time
  • kafka_server_producer_user_throttle_time
  • kafka_server_producer_user_client_throttle_time
  • kafka_server_consumer_client_throttle_time
  • kafka_server_consumer_user_throttle_time
  • kafka_server_consumer_user_client_throttle_time

Let’s see in this case if the throttle time of a Consumer using user and client-id quotas is higher than one second in at least one Broker:

max by(kube_cluster_name, kube_namespace_name, kube_workload_name, user, client_id)(kafka_server_consumer_user_client_throttle_time) > 1000

Agent Configuration

The default agent jobs for this integration are as follows:

- job_name: 'kafka-exporter-default'
  tls_config:
    insecure_skip_verify: true
  kubernetes_sd_configs:
  - role: pod
  relabel_configs:
  - action: keep
    source_labels: [__meta_kubernetes_pod_host_ip]
    regex: __HOSTIPS__
  - action: drop
    source_labels: [__meta_kubernetes_pod_annotation_promcat_sysdig_com_omit]
    regex: true
  - source_labels: [__meta_kubernetes_pod_phase]
    action: keep
    regex: Running
  - action: replace
    source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scheme]
    target_label: __scheme__
    regex: (https?)
  - action: replace
    source_labels:
    - __meta_kubernetes_pod_container_name
    - __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
    regex: (kafka-exporter);(.{0}$)
    replacement: kafka
    target_label: __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
  - action: keep
    source_labels:
    - __meta_kubernetes_pod_container_name
    - __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
    regex: (kafka-exporter);(kafka)
  - action: replace
    source_labels: [__meta_kubernetes_pod_annotation_promcat_sysdig_com_target_ns]
    target_label: kube_namespace_name
  - action: replace
    source_labels: [__meta_kubernetes_pod_annotation_promcat_sysdig_com_target_workload_type]
    target_label: kube_workload_type
  - action: replace
    source_labels: [__meta_kubernetes_pod_annotation_promcat_sysdig_com_target_workload_name]
    target_label: kube_workload_name
  - action: replace
    replacement: true
    target_label: sysdig_omit_source
  - action: replace
    source_labels: [__meta_kubernetes_pod_uid]
    target_label: sysdig_k8s_pod_uid
  - action: replace
    source_labels: [__meta_kubernetes_pod_container_name]
    target_label: sysdig_k8s_pod_container_name
  metric_relabel_configs:
    - source_labels: [__name__]
      regex: (kafka_brokers|kafka_consumergroup_current_offset|kafka_consumergroup_lag|kafka_consumergroup_members|kafka_topic_partition_current_offset|kafka_topic_partition_oldest_offset|kube_workload_status_desired)
      action: keep
- job_name: 'kafka-jmx-default'
  tls_config:
    insecure_skip_verify: true
  kubernetes_sd_configs:
  - role: pod
  relabel_configs:
  - action: keep
    source_labels: [__meta_kubernetes_pod_host_ip]
    regex: __HOSTIPS__
  - action: drop
    source_labels: [__meta_kubernetes_pod_annotation_promcat_sysdig_com_omit]
    regex: true
  - source_labels: [__meta_kubernetes_pod_phase]
    action: keep
    regex: Running
  - action: replace
    source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scheme]
    target_label: __scheme__
    regex: (https?)
  - action: replace
    source_labels:
    - __meta_kubernetes_pod_container_name
    - __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
    regex: (kafka-jmx-exporter);(kafka)
    replacement: kafka
    target_label: __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
  - action: keep
    source_labels:
    - __meta_kubernetes_pod_container_name
    - __meta_kubernetes_pod_annotation_promcat_sysdig_com_integration_type
    regex: (kafka-jmx-exporter);(kafka)
  - action: replace
    source_labels: [__meta_kubernetes_pod_uid]
    target_label: sysdig_k8s_pod_uid
  - action: replace
    source_labels: [__meta_kubernetes_pod_container_name]
    target_label: sysdig_k8s_pod_container_name
  metric_relabel_configs:
    - source_labels: [__name__]
      regex: (kafka_controller_active_controller|kafka_controller_offline_partitions|kafka_log_size|kafka_network_consumer_request_time_milliseconds|kafka_network_fetch_follower_time_milliseconds|kafka_network_producer_request_time_milliseconds|kafka_server_bytes_in|kafka_server_bytes_out|kafka_server_consumer_client_byterate|kafka_server_consumer_client_throttle_time|kafka_server_consumer_user_byterate|kafka_server_consumer_user_client_byterate|kafka_server_consumer_user_client_throttle_time|kafka_server_consumer_user_throttle_time|kafka_server_messages_in|kafka_server_partition_leader_count|kafka_server_producer_client_byterate|kafka_server_producer_client_throttle_time|kafka_server_producer_user_byterate|kafka_server_producer_user_client_byterate|kafka_server_producer_user_client_throttle_time|kafka_server_producer_user_throttle_time|kafka_server_under_isr_partitions|kafka_server_under_replicated_partitions|kafka_server_zookeeper_auth_failures|kafka_server_zookeeper_disconnections|kafka_server_zookeeper_expired_sessions|kafka_server_zookeeper_read_only_connections|kafka_server_zookeeper_sasl_authentications|kafka_server_zookeeper_sync_connections)
      action: keep