Kafka architecture—a deep dive

Kafka client

Apache Kafka® is a distributed streaming platform that enables you to publish, subscribe to, process, and store streams of records in a fault-tolerant, durable manner. It is designed for high-volume event streaming applications and is a backbone for many event-driven systems, enabling decoupled services to communicate through real-time data streams.

Kafka clients are libraries that allow applications to interact with a Kafka broker. They enable the production of messages to topics, the consumption of messages from topics, and performing administrative operations, among other tasks. They bridge your applications with Kafka so you can integrate Kafka's streaming capabilities into other software systems.

This article examines Kafka client APIs, common settings, and SDK and configuration best practices.

Kafka client overview

Summary of key Kafka client concepts

Concept | Description
Kafka clients | Kafka clients are the software components responsible for interacting with the Kafka broker.
Kafka client APIs | Kafka clients send messages to and receive messages from a Kafka broker via APIs.
Kafka client SDKs | Kafka provides official support for the Java client only, but SDKs are available for many other languages with community support.
Higher-level client implementations | High-level client implementations abstract the complexities of working with Kafka.
Client example | The article provides example code for consuming messages using the Java client and Spring Kafka.
Client best practices | Implementing best practice configurations ensures efficient, reliable, and secure message handling.

Kafka client APIs

Kafka clients integrate Kafka into application code, abstracting the complexities of direct protocol communication with Kafka brokers. The client libraries expose several APIs for interacting with the Kafka ecosystem; we describe the most common ones below.

Producer API

The producer API enables applications to publish records to one or more Kafka topics. Producers package data into records and send them to topics within the Kafka cluster.

The API supports various configurations to optimize throughput and ensure data reliability through batching, compression, and configurable delivery guarantees. It ensures that data producers can efficiently send large data volumes with configurable acknowledgment levels to guarantee that messages are stored successfully.

You can use the following settings to control the operation of the producer API:

Property | Description
Batching | Configures how many records to accumulate before sending to the broker
Partitioning | Determines how to distribute records across a topic's partitions
Compression | Enables compression for record batches (e.g., gzip, snappy)
Encryption | Enables encryption of data in transit
Retries | Configures the number of retries for failed publishing attempts
Acknowledgment level | Specifies the number of broker acknowledgments required (e.g., acks=all)

The following code demonstrates the usage of the producer API:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for acknowledgment from all in-sync replicas before treating the send as successful
        props.put("acks", "all");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test_topic", "key-1", "hello kafka"));
        producer.close();
    }
}

Consumer API

The consumer API allows applications to subscribe to topics and process the records produced. Consumers read records from subscribed topics, processing data in real-time or batch mode as needed. Kafka facilitates efficient record distribution among consumers in a group, ensuring scalable and fault-tolerant processing.

Consumers use offsets to record their position in each partition they consume, which lets them re-read messages or consume selectively. You can use the following settings to control the operation of the consumer API.

Property | Description
Consumer group strategies | Defines how to distribute records among consumers
Offset management | Manages consumer offsets to control record processing
Auto-commit | Automatically commits offsets periodically
Manual commit | Allows for manual control over offset commits

The following code demonstrates the usage of the consumer API:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
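
If you prefer manual offset control (see the manual commit setting above), a common variation is to disable auto-commit and commit only after the polled records have been processed. A minimal sketch, where process() stands in for your application-specific handling:

props.put("enable.auto.commit", "false");
// ... create the consumer and subscribe as above ...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical application-specific handler
    }
    // Commit offsets only after the whole batch has been handled successfully
    consumer.commitSync();
}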

Streams API

The streams API is a high-level library for building complex stream-processing applications on top of Kafka. This API abstracts away the complexities of direct stream manipulation, allowing developers to define high-level transformations, aggregations, and joins on data streams.

It also handles parallel processing, state management, and fault tolerance, allowing for sophisticated real-time data processing applications that can scale across the Kafka cluster. The Streams API transforms how clients interact with data streams, providing a robust framework for developing stateful applications that process data in real time.

You can use the following settings to control the operation of the Streams API:

Property | Description
Application ID | A unique identifier for the stream processing application
Bootstrap servers | A list of Kafka brokers to connect to
Default SerDes | Specifies the default serializers and deserializers for keys and values
State directory | Directory location for stateful operations
Processing guarantee | Defines the level of processing guarantees (e.g., "at_least_once" or "exactly_once")
Num stream threads | Configures the number of threads for processing
Cache max bytes buffering | Configures the maximum amount of memory to buffer records before flushing
Commit interval | Configures the interval at which the stream processor commits its current state
Replication factor | Number of replicas for the internal topics

The following code demonstrates the usage of the Streams API:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("my-input-topic");
// Upper-case each value and write the result to the output topic
source.mapValues(value -> value.toUpperCase()).to("my-output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Connect API

The connect API simplifies the integration of Kafka with other data systems. It provides a scalable and reliable way to move large data sets into and out of Kafka. Connectors either source data from external systems such as databases, key-value stores, and search indexes into Kafka, or sink data from Kafka into those systems.

Connect API integrations have Source and Sink configurations, which depend on the integrated system. For the Kafka side of the connection, you can use producer or consumer settings.

The following request registers a JDBC source connector through the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" --data '{
   "name": "jdbc-source",
   "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
       "tasks.max": "1",
       "connection.url": "jdbc:mysql://localhost:3306/mydb",
       "connection.user": "user",
       "connection.password": "password",
       "table.whitelist": "mytable",
       "mode": "incrementing",
       "incrementing.column.name": "id",
       "topic.prefix": "jdbc-"
   }
}' http://localhost:8083/connectors

Admin API

The admin API provides administrative capabilities for managing and inspecting the Kafka cluster. It allows you to create, list, describe, and delete topics, manage quotas, alter configurations, and perform other administrative tasks.

The following sketch demonstrates the admin API by creating a topic and then listing the topics in the cluster; the topic name, partition count, and replication factor are illustrative:
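
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class SimpleAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 3 partitions and a replication factor of 1
            NewTopic newTopic = new NewTopic("new_topic", 3, (short) 1);
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
            // Print the names of all topics in the cluster
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}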

Kafka client SDKs

Kafka officially supports clients in Java. However, the community has extended support to numerous other languages, including Go, Python, .NET, and Node.js, ensuring Kafka's accessibility across diverse development environments.

Higher-level client implementations

Beyond the standard client libraries, higher-level implementations exist, such as Spring Kafka in Java, Faust in Python, and KafkaJS for Node.js. These further abstract the complexities of working with Kafka and integrate seamlessly into their respective ecosystems.

Client compatibility

Apache Kafka is designed with support for backward compatibility, ensuring that client libraries can interact with broker versions different from their own. Clients are generally backward-compatible and often forward-compatible, but using an older client version may prevent newer features in the broker from being used.

Kafka clients use an API version request when connecting to the broker, enabling the client to adapt its behavior to match the broker’s capabilities.

Choosing a client SDK

When selecting a Kafka client, consider factors like language support, community, official backing, performance, and features. The choice often depends on the application's specific requirements and the development team's expertise.

Java client example

The following Java code subscribes to a Kafka topic and prints out each message:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"));
        // Poll for up to 10 seconds, then print each record
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        records.forEach(record -> System.out.printf("%s: %s%n", record.key(), record.value()));
        consumer.close();
    }
}

Spring Kafka example

Spring Kafka provides a more abstract way to interact with Kafka, integrating Kafka into the Spring ecosystem and simplifying the producer and consumer code.

In this code, the application properties file supplies lower-level settings such as the bootstrap servers and group ID (an example properties file is shown after the listener), while the payload type is inferred from the listener method signature.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaReceiver {
    @KafkaListener(topics = "test_topic")
    public void listen(String message) {
        System.out.println("Received: " + message);
    }
}
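
For reference, a minimal application.properties backing this listener might look like the following; the broker address and group ID are placeholders:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer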

Best practices in Kafka client configurations

Implementing best practices when working with Kafka clients ensures efficient, reliable, and secure message handling within your applications. Here are some things to consider.

Throughput optimization

You need to adjust settings to balance latency against throughput. The configurations below have the biggest impact; an illustrative tuning sketch follows the table.

Configuration | Best practice | Reason
batch.size, linger.ms | Adjust to balance latency and throughput | Larger batch sizes enhance throughput but increase latency as the producer waits to fill a batch.
fetch.min.bytes, fetch.max.wait.ms | Control data fetched per request | Tuning these reduces the number of fetch requests, improving consumer throughput.
max.poll.records | Control the number of records per poll | Manages the processing load effectively
compression.type | Use to reduce data size | Improves throughput but increases CPU usage
num.stream.threads, cache.max.bytes.buffering | Adjust parallelism and buffer size | Controls execution efficiency and data handling capacity
tasks.max, connector-specific batch size | Control parallelism and batch size | Optimizes the handling and processing of data streams
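
As an illustration, a producer and consumer tuned for throughput rather than minimum latency might set properties like the following; the values are placeholders that should be validated against your own workload:

// Producer configuration: batch more aggressively and compress the batches
Properties producerProps = new Properties();
producerProps.put("batch.size", "65536");      // allow up to 64 KB per batch
producerProps.put("linger.ms", "10");          // wait up to 10 ms to fill a batch
producerProps.put("compression.type", "lz4");  // trade CPU for smaller network payloads

// Consumer configuration: fetch larger chunks per request
Properties consumerProps = new Properties();
consumerProps.put("fetch.min.bytes", "50000");   // wait for at least ~50 KB of data
consumerProps.put("fetch.max.wait.ms", "500");   // or at most 500 ms
consumerProps.put("max.poll.records", "1000");   // cap the records returned per poll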

Robust error handling

The following settings help you handle errors and downtime in your Kafka system efficiently.

Configuration | Best practice | Reason
retries, retry.backoff.ms | Implement retry mechanisms with backoff | Ensures message delivery and avoids overloading the server during peak errors
enable.idempotence | Enable to prevent duplicate records | Essential for scenarios requiring exactly-once processing semantics
Transaction APIs | Use for exactly-once semantics | Wraps production and consumption of messages in a transaction to prevent data loss or duplication
Consumer rebalance handling | Design consumers to handle rebalances and commit offsets properly | Ensures continuous processing and data integrity during network errors or broker failures
enable.auto.commit | Set to false so you can manually manage offsets | Allows explicit control over when to commit offsets, reducing data loss
state.dir | Use fast, reliable storage | Minimizes risk of state store corruption and data loss
processing.guarantee | Set to "exactly_once" so records are processed exactly once | Prevents data from being lost or processed multiple times
errors.tolerance, errors.deadletterqueue.topic.name, errors.retry.timeout | Set appropriate error handling and retry policies | Manages faulty records and reduces data loss during transient failures
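
As a sketch of several of these settings in practice, the following configures an idempotent, transactional producer; the transactional.id and topic name are placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");             // prevent duplicate records on retry
props.put("transactional.id", "orders-producer-1");  // required to use the transactions API

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "key", "value"));
    producer.commitTransaction();
} catch (Exception e) {
    // Abort so consumers reading with isolation.level=read_committed never see these records
    producer.abortTransaction();
}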

Security enforcement

Use SSL/TLS to encrypt data in transit between clients and servers. Configure SASL/SCRAM or Kerberos for client authentication to ensure only authorized clients can access the Kafka cluster. You can also implement ACLs (Access Control Lists) to control reading and writing access to topics, ensuring that only permitted clients can access specific topics.
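
For example, you could grant a specific principal read access to a single topic with the kafka-acls tool that ships with Kafka; the principal and topic names below are placeholders:

kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:analytics-app \
  --operation Read --topic orders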

You can inadvertently expose sensitive information in your code or configuration files. Use environment variables or secure vaults to manage credentials and configuration details.
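
A minimal sketch of client-side security properties, assuming SASL/SCRAM over TLS, with credentials pulled from environment variables rather than hard-coded:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1.example.com:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
// Read credentials from the environment instead of committing them to source control
String user = System.getenv("KAFKA_USERNAME");
String password = System.getenv("KAFKA_PASSWORD");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        + "username=\"" + user + "\" password=\"" + password + "\";");
// Trust store used to verify the brokers' TLS certificates
props.put("ssl.truststore.location", "/etc/kafka/secrets/client.truststore.jks");
props.put("ssl.truststore.password", System.getenv("TRUSTSTORE_PASSWORD"));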

Performance benchmarks

Perform load testing and benchmarking to understand how your Kafka clients perform under stress. Use these insights to fine-tune configurations and system resources. Regularly test the scalability of your Kafka clients, especially before significant changes in the volume of data or number of consumers. Appropriate testing helps ensure your system can handle increased loads without degradation in performance.
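
Kafka ships with basic load-generation tools that make a convenient starting point; for example, the producer performance test below sends one million 1 KB records as fast as the cluster allows (topic name and sizes are illustrative):

kafka-producer-perf-test.sh --topic perf-test \
  --num-records 1000000 --record-size 1024 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=all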

Monitoring and maintenance

It is also a best practice to monitor key metrics from Kafka clients, such as message rate, latency, and error rates. Utilize tools like JMX, Prometheus, or Kafka’s built-in monitoring capabilities to monitor performance and identify potential issues early.

Comprehensive logging aids in troubleshooting and understanding client behavior over time. Implement a robust logging strategy to capture detailed information about client operations, especially errors and system messages.

Finally, remember to keep your Kafka client libraries updated with the latest releases. Regular updates can protect against known vulnerabilities, improve performance, and provide access to new features.

Conclusion

Kafka clients are available across many languages, offering both low-level access and high-level abstractions to Kafka's capabilities. High-level clients, like Spring Kafka, can significantly simplify development, allowing teams to focus more on business logic rather than boilerplate code.

Adhering to best practices in configuration, error handling, and security ensures the effective and safe delivery of functionality, contributing to the success of your data-driven applications.
