Typical examples of Kafka-based real-time data processing applications include real-time analytics platforms, recommendation engines, and financial transaction systems. The performance of real-time streaming applications is measured in terms of the lag experienced by downstream applications in receiving data.
In Kafka's case, minimizing the lag between the Kafka producer and consumer requires careful tuning of deployment configurations.
This article explores Kafka lag, how to diagnose it, and best practices to reduce it.
Summary of key Kafka lag concepts
Kafka’s storage architecture handles data as an ordered log-based structure. Logs are partitioned, and partitions are replicated across multiple brokers to ensure fault tolerance. Developers tweak many parameters related to Kafka logs to balance performance and reliability.
What is Kafka lag?
Consumer lag is the difference between the last offset stored by the broker and the last committed offset for a specific partition.
Consumer configurations to optimize Kafka lag
Rate of consumption, partition assignment and message batch size.
Broker configurations to optimize Kafka lag
Rebalancing and IO performance and message batch size.
Producer configurations to optimize lag
Producer configurations related to throughput can lead to a lag in the Kafka cluster if consumer configurations are not in sync for the same strategy.
Understanding Kafka lag
At a high level, Kafka can be thought of as a distributed continuous commit log. Its working model consists of producers, consumers, and brokers.
Kafka producers are client applications that send messages or events.
Brokers store the messages durably and make them available in logical partitions while maintaining order.
Consumers are client applications that react to messages.
Kafka allows separating the producer messages into topics. Topics are logical groups of messages that represent common functionality. Engineers modularize application development by dividing messages into different categories to uniquely process them. Topic names must be distinct across the cluster.
Kafka uses the concept of partitions and consumer groups to enable horizontal scaling. Kafka partitions are subsets of data present in a topic. Partitions are distributed across the cluster. Consumer groups represent a collection of consumers that are assigned to a particular topic. Kafka ensures only one consumer from the group can read from a partition. This helps Kafka to maintain consumption order within a partition. That said, a single consumer can read from multiple partitions.
While processing messages, Kafka consumers keep track of the last position in the partition from where they last read the message. This position is known as the current offset or consumer offset in Kafka. The position corresponding to the last available message in a partition is called log-end offset. The difference between log-end-offset and current-offset is called consumer lag. Kafka lag is the difference between the last message produced by the producer and the last message consumed by the consumer group.
Reasons for Kafka lag
Several factors can increase Kafka lag.
Sudden traffic spike
The applications that send messages to Kafka brokers don't always behave in a consistent manner because of various external factors. For example, consider a network of IOT devices that sends messages whenever the temperature crosses a threshold value. If the external environment changes substantially, many such devices might send data at a larger-than-expected rate, clogging up the broker. Consumers are unable to catch up, which increases the Kafka lag.
Producer partition assignment issues
When a new message comes in, Kafka first determines the partition to which the message is written. The logic that it uses to determine the correct partition to write is called the producer partition assignment strategy. Kafka uses one of the predefined partition assignment strategies or a custom partition strategy to achieve this. The default partitioner uses the hash of the key to assign a partition. There are other partition assignment strategies like round-robin, sticky partition, and so on. Kafka also supports a custom partition strategy.
If the selected partition strategy is not in sync with the data distribution, Kafka users experience unbalanced partitions. Some partitions get too much data, causing bottlenecks and lag. For example, let’s say you select the default key hash-based partition but most messages use the same key—you may experience unbalanced partitions and hence abnormal lag.
Consumer partition assignment
Partition assignment strategies are responsible for ensuring that Kafka assigns consumers to partitions in an equitable manner. When new consumers join the group, Kafka rebalances the cluster and attempts to maintain optimal mapping as much as possible. However rebalance operations are expensive and may create some unavailability of consumers—leading to lag.
By default, Kafka uses the RangeAssignor as the partition assignment strategy for consumers. It prioritizes the colocation of partitions within the same topic by sorting consumers in alphabetical order of member_id and matching them against the sorted topic-partition combination. A drawback of this approach is that it is dependent on the order in which consumers are created. It can lead to skewed consumer partition assignment especially if there are more consumers than partitions.
Consumers often execute complex business logic as part of their processing routing. Some of the back-end code operations can be time-consuming and genuinely slow. Or they may be errors leading to infinite loops or inefficient algorithm implementation. Such instances will lead to consumers always being overwhelmed with messages. For example, imagine a consumer that accesses a third-party microservice to complete its task. If the microservice is slow to respond, that results in a slow consumer and an increased lag.
Monitoring Kafka lag
Kafka provides a default mechanism to monitor the lag of a cluster. The
kafka-consumer-groups.sh provides details about the lag for all partitions. One can use the below command to view the lag.
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server <> --describe --group <group_name>
Executing the above command in a running Kafka cluster provides an output similar to the one below.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
ub-kf test-topic 0 15 17 2 ub-kf-1/127.0.0.1
ub-kf test-topic 1 14 15 1 ub-kf-2/127.0.0.1
In the above output, one can see the current offset, log-end-offset, and the difference between them as lag. Other than the built-in scripts, there are open-source tools to monitor Kafka lag. Burrow is an example of an open-source monitoring tool for Kafka. One can also use log exporters like Kafka Lag Exporter or Kafka Exporter along with generic monitoring tools like Prometheus or Grafana to monitor Kafka lag.
A small constant lag value often exists in a healthy cluster and is nothing to worry about. However, a continuously increasing lag value or a sudden spike in lag value often indicates a problem.
Reducing Kafka lag
The easiest method to solve Kafka lag is to scale horizontally by adding more consumers. But merely adding consumers without adding partitions may result in idle consumers. Hence this decision requires updating the Kafka topic configurations as well. Another approach is to implement multi-threading within the consumer to improve its performance.
Message consumption rate
Kafka provides many configuration parameters to control the rate at which consumers pull messages from the broker. By tweaking these configurations, one can achieve higher throughput from consumers and thereby reduce lag.
fetch.max.bytes: This configuration controls the maximum amount of data returned by the server in a single request. Setting higher values ensures fewer requests, but higher lag.
fetch.min.bytes: This configuration controls the minimum amount of data returned by the server in a single request. Setting a higher value here leads to fewer requests but a higher lag.
max.partition.fetch.bytes: This configuration controls the maximum amount of data returned by the server per partition. Similar to the above configurations, setting higher values leads to higher lag.
fetch.max.wait.ms: This configuration represents the amount of time waited by the consumer to fill up a batch. Setting higher values results in lower request counts but higher lag.
The partition assignment strategy decides how Kafka consumers are mapped to various partitions. The default partition assignment strategy is the RangeAssignor. It relies on sorting the consumers according to the member_id assigned to them at the start. The member_id value assigned is influenced by the order in which consumers are started. Hence these algorithms often result in sub-optimal consumer-partition mapping that does not result in the lowest lag.
Instead, Kafka provides three other partition assignment strategies that you can use to optimize partition mapping. They are RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor.
The RoundRobinAssignor ensures equal distribution of partitions among consumers but relies on the sort order of member_id just like the RangeAssignor. Hence it also results in sub-optimal consumer mapping.
The StickyAssignor preserves the existing mappings whenever new consumers are created. This helps in minimizing rebalance operations.
The CooperativeStickyAssignor is similar to StickyAssignor but allows consumers unaffected by rebalancing to process messages, hence playing a part in minimizing lag.
partition.assignment.strategy controls the partition assignment strategy. To change the partition assignment strategy to CooperativeStickyAssignor, use the below snippet.
partition.assignment.strategy = org.apache.kafka.clients.consumer.CooperativeStickyAssignor
An alternative is to implement a custom partition assignment strategy that considers the consumer group lag while assigning partitions. Kafka’s own developer community is working on an implementation here.
A Kafka broker’s performance becomes a bottleneck in achieving the least possible lag in a Kafka system. Kafka provides several configuration options to finetune the broker. For example, Kafka broker exposes a configuration called
group.initial.rebalance.delay.ms to control the group rebalancing behavior. The parameter defines the time the broker waits for new consumers to join before initiating a rebalance. Setting an optimal value that does not leave out any slow-starting consumers before rebalancing starts helps to reduce rebalancing operations.
Kafka also exposes two parameters to control the number of threads used in message processing. The parameter
num.network.thread defines the number of network threads that are used for requests from client applications. If there are many simultaneous requests, one should consider increasing this value so that consumers are not starved of messages due to network congestion. The parameter
num.io.threads controls the number of threads for input and output operations. The value must be at least equal to the number of processors in the instance.
Higher batch size for messages often results in increasing consumer lag because consumers get a large number of messages at once when batch requests come in. The broker parameter
message.max.bytes defines the maximum message size that the broker can support. Setting this to a lower value that the consumers can easily handle helps in reducing lag.
Kafka’s default partition assignment uses the hash of the message key to decide the partition to which data is appended. This approach creates problems if the keys in the data are not uniformly distributed. This can result in some partitions being overloaded with messages and lead to a lag in the cluster.
partitioner.class controls the partition strategy for producers. One can use the RoundRobin partition or implement a custom partition to ensure that messages are evenly distributed across partitions. Kafka also provides a mechanism called adaptive partitioning to direct messages to faster partitions. If the configuration
partitioner.adaptive.partitioning.enable is set to true, Kafka pushes messages to partitions that are faster.
Producers that send large message batches often create a lag at the consumer end. To reduce the lag, engineers must finetune the values for
linger.ms parameters that impact the size of the batches. The
batch.size parameter controls the number of messages in the batch and
linger.ms defines the amount of time Kafka waits to fill up a batch.
Compression of messages from producers can have an adverse effect on the lag. If the messages are compressed, there is an overhead on the consumer to decompress them before processing. To get compression to work without adverse effects on lag, one must tune the batch size carefully.
Join a global community of developers 🚀
Sub: Chat with our engineers and connect with fellow data streamers!
Best practices to minimize Kafka lag
The following section provides a set of best practices to achieve a cluster with minimum lag.
Use the correct producer partitioner class
The default partitioner class has a tendency to create lag in the Kafka cluster by overloading partitions if the message keys are not controlled closely. Using the RoundRobinPartitioner or custom partitioner helps here.
Enable adaptive partitioning
Adaptive partitioning involves sending more messages to partitions that are faster. Kafka decides the speed of partitions by finding the number of records per batch. Partitions with a higher number of records per batch are considered slower.
Stick to lower producer batch size
Lower producer batch sizes help consumers to process the data and commit the offsets quicker. This has a direct impact on the lag in a Kafka cluster. Since reducing batch size also affects the throughput of the producer, this needs to be done considering factors on the producer side as well.
Use CooperativeStickyAssignor for consumer partition assignment
The cooperative sticky assignor serves two purposes - It helps to reduce rebalance operations. While in a rebalancing operation, it lets consumers unaffected by rebalancing continue. Both these aspects help in controlling the lag.
Finetune consumer batching configurations
Setting higher values for consumer fetch configurations will help in creating a larger batch for consumers and increase the throughput. However, a higher consumer throughput will not always result in lower lag. If the consumers wait too long to fill the batches, the lag will go up. Hence batch configurations must be followed up with an optimal value for ‘fetch.max.wait.ms’.
Have a unified strategy for tuning producers, brokers, consumers
Kafka’s performance is closely linked with producer, broker, and consumer performance. A producer can be tuned for very high throughput by using large batch sizes. But this will have an impact on consumer performance and lag in the cluster. Hence a unified strategy must be adopted to balance latency and throughput. This strategy must then be followed up with producer, broker, and consumer configurations that are synchronized with each other.
Optimize hardware-related factors
Getting the best performance from a Kafka cluster is not only about tuning the parameters but also ensuring that the cluster has the most optimal hardware needed to achieve the targeted performance metrics. The processing power of the server, speed of storage devices, operating system configurations, and network card performance play a critical role in achieving minimal lag.
A small amount of lag is present in any real-time data processing system. But a cluster with a high amount of lag defeats the purpose of having a real-time processing system. Hence lag is considered a key performance indicator in Kafka clusters. A continuously increasing lag or a spike in lag almost always flags up an unhealthy cluster.
In the case of Kafka, controlling lag requires playing around with the configuration parameters of the producer, broker, and consumers. It is best to implement a cohesive strategy that looks at several parameters for your specific use case.