Understanding Apache Kafka
Kafka consumer config
Apache Kafka® is a distributed streaming platform for building real-time data pipelines and streaming applications. It’s a high-throughput, low-latency platform that can handle millions of messages per second. Kafka is often used as a message broker, allowing different software systems to communicate by sending (producing) and receiving (consuming) messages.
This article will focus on configuring Kafka consumers. It’s important to properly set up and manage consumers to ensure the efficient and reliable processing of messages. We will cover essential configuration parameters, tips for optimizing consumers and avoiding pitfalls, and security and engineering best practices.
Summary of key Kafka consumer configurations
There are many different configurations that you can provide to a Kafka consumer, but the default values work for most use cases. We’ll highlight a few that you’ll want to make sure to set properly.
Configuring a Kafka consumer
There are client libraries for Kafka in most major programming languages; the Apache Kafka project maintains a list of available clients. Though there might be some language-specific syntactical differences, the process is the same among them. A common practice is to set the configuration properties using a config file rather than environment variables, because the property names contain dots, which aren’t allowed in environment variable names.
Here’s an example config file:
# Filename: kafka-consumer.properties
# Connection settings
bootstrap.servers=192.168.1.50:9092,192.168.1.51:9092
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=truststore-password
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
# Consumer settings
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
auto.commit.interval.ms=5000
auto.offset.reset=latest
heartbeat.interval.ms=3000
max.poll.records=100
max.poll.interval.ms=300000
session.timeout.ms=30000
# Consumer group settings
group.id=my-consumer-group
# Note: group.min.session.timeout.ms and group.max.session.timeout.ms bound the allowed
# session timeouts but are broker-side settings, not consumer settings
Loading that config file and creating a Kafka consumer with the Java client library might look something like this:
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Load the consumer configuration from the properties file
Properties props = new Properties();
try {
    props.load(new FileInputStream("kafka-consumer.properties"));
} catch (IOException e) {
    // Handle the exception, e.g., log it and abort startup
}

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Here’s an example using Python with the kafka-python client library:
import configparser

from kafka import KafkaConsumer

# The properties file has no [section] headers, so prepend one before handing it to configparser
config = configparser.ConfigParser()
with open('kafka-consumer.properties') as f:
    config.read_string('[consumer]\n' + f.read())

# kafka-python expects keyword arguments with underscores in place of dots
consumer = KafkaConsumer(
    bootstrap_servers=config.get('consumer', 'bootstrap.servers').split(','),
    group_id=config.get('consumer', 'group.id'),
    auto_offset_reset=config.get('consumer', 'auto.offset.reset'),
    max_poll_records=config.getint('consumer', 'max.poll.records'),
)
Key properties
bootstrap.servers
This is a comma-separated list of host:port pairs that the Kafka client uses to establish its initial connection to the Kafka cluster; after connecting, it discovers the rest of the brokers automatically. To allow for failover and high availability if one of the listed brokers is unavailable at startup, it’s important to provide at least two.
group.id
The group.id is a unique identifier that specifies the consumer group to which a particular consumer belongs. For each topic it reads, a consumer group tracks a committed offset per partition, representing the point up to which the group has consumed. When a consumer starts consuming messages from a topic, it uses its group.id to look up those committed offsets and determine where to start.
The group.id is used to divide a topic’s partitions among the consumers in the group and to let consumers automatically recover from failures by picking up where the group left off. Accordingly, it’s essential to choose a unique group.id for each consumer group to avoid conflicts with other consumer groups.
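As a minimal sketch, assuming the props loaded in the earlier example and a hypothetical topic named "orders", tying a consumer to its group might look like this:
import java.util.Collections;

// Consumers sharing this group.id split the topic's partitions between them and resume
// from the group's committed offsets; a consumer started with a different group.id keeps
// its own offsets and independently receives every message.
props.put("group.id", "my-consumer-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));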
enable.auto.commit
The enable.auto.commit configuration option determines whether the consumer should automatically commit offsets to the Kafka broker at a set interval.
If enable.auto.commit is true, the consumer will commit offsets automatically at the configured interval (auto.commit.interval.ms). If enable.auto.commit is false, the consumer won’t commit offsets automatically and must commit them explicitly using the consumer’s commitSync() or commitAsync() methods.
Auto-commit can be useful when the consumer doesn’t need to perform any additional processing on the messages after they’ve been consumed. It lets the consumer commit offsets in the background and move straight on to the next batch of messages, keeping the consuming code simple and throughput high.
Disabling auto-commit can be useful when the consumer needs to perform additional processing on the messages after they have been consumed, such as storing them in a database or forwarding them to another system. Disabling auto-commit allows the consumer to commit offsets only after the additional processing has been completed, ensuring that messages aren’t lost in the event of failure.
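For example, a minimal sketch of a manual-commit loop with the Java client might look like the following. It continues from the props loaded earlier; the topic name "orders" and the processRecord() helper are hypothetical placeholders:
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Disable auto-commit so offsets are only committed after processing succeeds
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);  // e.g., write to a database or forward to another system
    }
    // Commit only after the whole batch has been processed; if the consumer fails before
    // this point, the uncommitted records are re-read rather than lost.
    consumer.commitSync();
}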
auto.offset.reset
The auto.offset.reset configuration option determines what action the consumer should take if there are no committed offsets for the consumer group or if the committed offset no longer exists on the broker (for example, because the messages have since been deleted by retention). The main options are “latest,” which means the consumer will start consuming messages from the most recent message in the topic, and “earliest,” which means the consumer will start consuming messages from the beginning of the topic; a third option, “none,” makes the consumer raise an error instead.
The auto.offset.reset setting is useful for ensuring that the consumer can start consuming messages from a known point in the topic. This is true even if the group hasn’t previously consumed from the topic or its last committed offset is no longer available.
heartbeat.interval.ms
The heartbeat.interval.ms configuration option specifies the interval at which the consumer will send heartbeats to the Kafka broker to indicate that it’s still active. Heartbeats are used to prevent the consumer group coordinator from marking the consumer as failed due to inactivity.
Suppose the consumer stops sending heartbeats and none arrive within session.timeout.ms. In that case, the consumer group coordinator will assume that the consumer has failed and will trigger a rebalance of the consumer group.
The heartbeat.interval.ms setting therefore works hand in hand with session.timeout.ms: it must be set lower than the session timeout, and a common rule of thumb is to keep it at no more than one-third of that value. A shorter session timeout results in faster detection of failures, but it also increases the risk of an unwanted rebalance when a healthy consumer is merely slow to heartbeat, and consumption is blocked during rebalancing. Conversely, a longer session timeout results in slower detection of failures but reduces the risk of spurious rebalances, while a very short heartbeat interval adds load on the group coordinator.
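As a rough guideline, using the values from the example config above and set before the consumer is created:
// Heartbeats are sent in the background every heartbeat.interval.ms; the coordinator only
// marks the consumer as failed if none arrive within session.timeout.ms. A common rule of
// thumb is to keep the heartbeat interval at no more than one-third of the session timeout.
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "3000");  // well under 30000 / 3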
max.poll.records
The max.poll.records configuration option specifies the maximum number of records the consumer will retrieve in a single call to the poll() method. This method is used to retrieve messages from the Kafka broker, and the max.poll.records setting determines the maximum number of messages returned in a single call.
The max.poll.records setting is important because it determines the maximum number of messages the consumer will process in a single batch. A larger value for max.poll.records may result in higher throughput but may also increase the risk of message duplication in the event of a failure, because more records can be redelivered if the consumer crashes before committing. Conversely, a smaller value may result in lower throughput but also narrows that redelivery window.
max.poll.interval.ms
This configuration sets the maximum time, in milliseconds, that a Kafka consumer can go between calls to poll() for new messages. If a consumer goes longer than the specified time without polling, it is considered to have failed, and a rebalance of the consumer group is triggered.
When setting the value of max.poll.interval.ms, it’s important to consider the expected rate of message consumption for your use case. If messages are consumed quickly, a lower value for max.poll.interval.ms can be set. However, if messages are consumed more slowly, or if the consumer may need to perform additional processing on messages before polling again, a higher value for max.poll.interval.ms may be necessary to prevent unwanted rebalances.
It’s also important to consider how often the consumer polls in relation to the rate of message production. If the value is set very high and the consumer actually does poll that infrequently, the latency of message consumption increases.
We recommend testing different values for max.poll.interval.ms and monitoring the performance of your Kafka consumer to determine the best value for your use case.
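As a back-of-the-envelope sketch with hypothetical numbers, set before the consumer is created:
// Suppose each record takes at most ~200 ms to process and max.poll.records is 100:
//   worst-case gap between polls ~ 100 * 200 ms = 20,000 ms
// so max.poll.interval.ms should sit comfortably above that; 300,000 ms (5 minutes, the
// default) leaves plenty of headroom for occasional slow batches.
props.put("max.poll.records", "100");
props.put("max.poll.interval.ms", "300000");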
Common use cases
Now that we’ve covered the theory side of various configurations, let’s briefly look at some common situations where configurations might be particularly important.
Event-driven architectures
In an event-driven architecture, consumers are microservices that need to be highly responsive to each event. If topic lag has built up when they start, it may take a while before recent events are processed, and that delay percolates out through your system. You may want to set auto.offset.reset to “latest” to ensure that consumers jump straight to new messages on startup. Conversely, if you want the full event log to be processed even if the events were emitted while consumers were down, you need to be sure that this value is set to “earliest,” as sketched below.
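A hedged sketch of that choice (keeping in mind that auto.offset.reset only applies when the group has no committed offsets, such as a brand-new consumer group):
// Responsive microservice that only cares about events produced from now on:
props.put("auto.offset.reset", "latest");
// ...or, for a new group that must replay the full event log, including events emitted
// while consumers were down:
// props.put("auto.offset.reset", "earliest");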
Real-time stream processing
When your system needs nearly instantaneous insights or actions from streaming data, a lower heartbeat.interval.ms (together with a lower session.timeout.ms) ensures that consumers stay responsive and unhealthy consumers are caught quickly. Additionally, increasing the number of partitions in the topic allows you to scale out the consumer group further so that consumers don’t build up lag and latency.
Batch data ingestion
Batch data processing is often used to collect streams of data into batches of an optimized size and put them into files that can be uploaded and queried later. In these scenarios, many files with few records each are undesirable because of the overhead of opening each file. To account for this, prefer larger max.poll.records and max.poll.interval.ms values so that large batches can be processed without triggering rebalances.
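A sketch of that tuning with illustrative values, set before the consumer is created:
// Favor large batches for file-oriented sinks: more records per poll, and a generous
// poll interval so that writing out a large file doesn't trigger a rebalance.
props.put("max.poll.records", "5000");
props.put("max.poll.interval.ms", "600000");  // 10 minutes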
Recommendations
Once you’ve started your consumers, you will likely begin looking for the optimal number of consumers so that they can keep up with producers. If consumers are falling behind, it may seem sensible to scale up the number of consumers. If you try this, you will initially see increased consumption, but it will soon taper off. Why?
Kafka divides topics into partitions. Each partition is an ordered, immutable sequence of messages stored on a broker. The number of partitions in a topic is set when the topic is created and can be increased over time as needed (Kafka doesn’t support decreasing a topic’s partition count).
When a consumer group consumes from a topic, the topic’s partitions are assigned to the consumers in the group. When the group membership changes due to a consumer failure or a new consumer joining the group, the group coordinator sends an assignment to each consumer listing the partitions it will read from, and the consumer then begins consuming from those partitions. How partitions are distributed is governed by the partition.assignment.strategy setting: the Java client defaults to the range assignor, with round-robin and cooperative sticky assignors also available, and all of them aim to spread the workload evenly so that no two consumers in the group consume the same partition.
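For example, if you want explicit round-robin distribution, the assignor can be chosen through the partition.assignment.strategy property, set before the consumer is created:
// Ask the group to distribute partitions round-robin across its consumers
props.put("partition.assignment.strategy",
          "org.apache.kafka.clients.consumer.RoundRobinAssignor");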
By dividing topics into partitions and assigning those partitions to consumers, Kafka can distribute the workload of consuming and processing messages among multiple consumers and scale the processing of messages horizontally. This allows Kafka to support high levels of message throughput and enables real-time processing of large volumes of data.
However, if you scale your consumers beyond the number of partitions in the topics from which they’re consuming, your consumers will just sit idle, wasting resources.
For example, if you have four partitions in your topic, and you scale the number of consumers to five, one consumer will be left without an assigned partition and will be sitting idle, as shown in the following illustration.
If you need to continue scaling consumers beyond the number of partitions, increase the partition count for the topic on the Kafka broker and then scale your consumers. It’s good practice to give yourself some headroom to scale up when needed without increasing the number of partitions, but keep in mind that the maximum number of partitions you can create is limited by the resources the broker has (CPU cores, memory, and disk space). Another optimization is to have the number of consumers be a factor of the number of partitions so that the work is evenly distributed. See below for an example of an optimized partition and consumer count configuration.
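Increasing a topic’s partition count can be done with the Kafka AdminClient. Here is a minimal sketch, where the topic name "orders", the target of 8 partitions, and the broker addresses are illustrative values:
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "192.168.1.50:9092,192.168.1.51:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    // Raise the "orders" topic to 8 partitions in total (the count can only go up)
    admin.createPartitions(Map.of("orders", NewPartitions.increaseTo(8))).all().get();
} catch (Exception e) {
    // Handle the exception, e.g., the topic already has at least 8 partitions
}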
Conclusion
Configuring a Kafka consumer involves setting some key properties that control the behavior and performance of the consumer. These properties determine the consumer group to which the consumer belongs, control whether the consumer automatically commits offsets, determine how the consumer handles starting from an unknown offset, control the interval at which the consumer sends heartbeats to the group coordinator, and dictate the maximum number of records that the consumer will retrieve in a single poll.
We hope this guide has provided a clear and concise overview of the key concepts and considerations for configuring and running Kafka consumers, and we wish you the best of luck configuring and running your own.