Kafka architecture—a deep dive
Kafka client
Apache Kafka® is a distributed streaming platform that enables you to publish, subscribe to, process, and store streams of records in a fault-tolerant, durable manner. It is designed for high-volume event streaming applications and is a backbone for many event-driven systems, enabling decoupled services to communicate through real-time data streams.
Kafka clients are libraries that allow applications to interact with a Kafka broker. They enable applications to produce messages to topics, consume messages from topics, and perform administrative operations, among other tasks. They bridge your applications and Kafka so you can integrate Kafka's streaming capabilities into other software systems.
This article examines Kafka client APIs, common settings, and SDK and configuration best practices.
Summary of key Kafka client concepts
Kafka client APIs
Kafka clients handle Kafka integration within application code, abstracting away the complexities of communicating with brokers over the wire protocol. Each client exposes APIs for interacting with the Kafka ecosystem; we describe the most common ones below.
Producer API
The producer API enables applications to publish records to one or more Kafka topics. Producers package data into records and send it to topics within the Kafka cluster.
The API supports configurations that optimize throughput and protect data reliability through batching, compression, and configurable delivery guarantees. Producers can send large data volumes efficiently while choosing the acknowledgment level that confirms messages have been stored successfully.
Settings such as acks, batch.size, linger.ms, compression.type, and retries control how the producer trades off latency, throughput, and durability.
The following code demonstrates the usage of the producer API:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send a single record; close() flushes any buffered records before the program exits
        producer.send(new ProducerRecord<>("test_topic", "key", "value"));
        producer.close();
    }
}
Consumer API
The consumer API allows applications to subscribe to topics and process the records produced. Consumers read records from subscribed topics, processing data in real-time or batch mode as needed. Kafka facilitates efficient record distribution among consumers in a group, ensuring scalable and fault-tolerant processing.
Consumers use offsets to track their position in each partition they consume, which lets them re-read messages or consume them selectively. Settings such as group.id, enable.auto.commit, auto.offset.reset, and max.poll.records control the operation of the consumer API.
The following code demonstrates the usage of the consumer API:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
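Because a consumer's position is just an offset, the consumer can also rewind and re-read data. As a rough sketch (assuming the consumer above already has partitions assigned after a poll; the partition number and offset are illustrative):
// Rewind to the earliest available offset for every partition currently assigned to this consumer
consumer.seekToBeginning(consumer.assignment());
// Or jump to a specific offset in a specific partition (TopicPartition lives in org.apache.kafka.common)
consumer.seek(new TopicPartition("my-topic", 0), 42L);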
Streams API
The streams API is a high-level library for building complex stream-processing applications on top of Kafka. It abstracts away the complexities of direct stream manipulation, letting developers define high-level transformations, aggregations, and joins on data streams.
It also handles parallel processing, state management, and fault tolerance, allowing for sophisticated real-time data processing applications that can scale across the Kafka cluster. The Streams API transforms how clients interact with data streams, providing a robust framework for developing stateful applications that process data in real time.
Settings such as application.id, num.stream.threads, processing.guarantee, and the default key and value serdes control the operation of the Streams API.
The following code demonstrates the usage of the Streams API:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("my-input-topic");
source.mapValues(value -> value.toUpperCase()).to("my-output-topic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Connect API
The connect API simplifies the integration of Kafka with other data systems. It provides a scalable and reliable way to move large data sets into and out of Kafka. Connectors source data from databases, key-value stores, search indexes, and other data systems or sink data into these systems.
Connect API integrations are configured as source or sink connectors, and most of their settings depend on the external system being integrated. For the Kafka side of the connection, the familiar producer and consumer settings apply.
The following code demonstrates the creation of a database connector:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "user",
    "connection.password": "password",
    "table.whitelist": "mytable",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-"
  }
}' http://localhost:8083/connectors
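Once the connector is created, the same REST interface reports its health. For example, assuming the connector name above:
curl http://localhost:8083/connectors/jdbc-source/status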
Admin API
The admin API provides administrative capabilities for managing and inspecting the Kafka cluster. It allows you to create, list, describe, and delete topics, manage quotas, alter configurations, and perform other administrative tasks.
The following code demonstrates the usage of the admin API:
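Here is a minimal sketch that creates a topic and then lists the topics in the cluster; the topic name, partition count, and replication factor are illustrative.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
public class SimpleAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 3 partitions and a replication factor of 1
            admin.createTopics(Collections.singletonList(new NewTopic("test_topic", 3, (short) 1))).all().get();
            // Print the names of all topics in the cluster
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}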
Kafka client SDKs
Kafka officially supports clients in Java. However, the community has extended support to numerous other languages, including Go, Python, .NET, and Node.js, ensuring Kafka's accessibility across diverse development environments.
Higher-level client implementations
Beyond the standard client libraries, higher-level implementations exist, such as Spring Kafka for Java, Faust for Python, and KafkaJS for Node.js. These libraries further abstract the complexities of working with Kafka and integrate seamlessly into their respective ecosystems.
Client compatibility
Apache Kafka is designed with support for backward compatibility, ensuring that client libraries can interact with broker versions different from their own. Clients are generally backward-compatible and often forward-compatible, but using an older client version may prevent newer features in the broker from being used.
Kafka clients use an API version request when connecting to the broker, enabling the client to adapt its behavior to match the broker’s capabilities.
Choosing a client SDK
When selecting a Kafka client, consider factors like language support, community, official backing, performance, and features. The choice often depends on the application's specific requirements and the development team's expertise.
Java client example
The following Java code subscribes to a Kafka topic and prints out each message:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"));
        // Poll once for up to 10 seconds and print whatever records arrive
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        records.forEach(record -> System.out.printf("%s: %s%n", record.key(), record.value()));
        consumer.close();
    }
}
Spring Kafka example
Spring Kafka provides a more abstract way to interact with Kafka, integrating Kafka into the Spring ecosystem and simplifying the producer and consumer code.
In this code, lower-level settings such as the bootstrap servers and consumer group ID live in the application properties file rather than in Java code, and Spring converts each record's payload to the String parameter declared by the listener method.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaReceiver {
    @KafkaListener(topics = "test_topic")
    public void listen(String message) {
        System.out.println("Received: " + message);
    }
}
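For reference, a minimal Spring Boot application.properties backing this listener might look like the following; the broker address and group ID are illustrative:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer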
Best practices in Kafka client configurations
Implementing best practices when working with Kafka clients ensures efficient, reliable, and secure message handling within your applications. Here are some things to consider.
Throughput optimization
Tuning for throughput means trading it off against latency. Configurations such as batch.size, linger.ms, and compression.type on the producer, and fetch.min.bytes and max.poll.records on the consumer, have the biggest impact, as sketched below.
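Here is a rough sketch of throughput-oriented settings; the values are illustrative and should be validated against your own workload:
Properties producerProps = new Properties();
producerProps.put("batch.size", "65536");         // larger batches amortize per-request overhead
producerProps.put("linger.ms", "20");             // wait briefly so batches can fill before sending
producerProps.put("compression.type", "lz4");     // compress batches to cut network and disk usage

Properties consumerProps = new Properties();
consumerProps.put("fetch.min.bytes", "1048576");  // let the broker accumulate data before responding
consumerProps.put("max.poll.records", "1000");    // hand more records to the application per poll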
Robust error handling
Producer settings such as retries, retry.backoff.ms, delivery.timeout.ms, and enable.idempotence help your applications ride out transient errors and broker downtime without losing or duplicating messages, as sketched below.
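A rough sketch of resilience-oriented producer settings follows; the values are illustrative:
Properties props = new Properties();
props.put("acks", "all");                    // wait for all in-sync replicas before acknowledging
props.put("enable.idempotence", "true");     // prevent duplicates when retries occur
props.put("retries", String.valueOf(Integer.MAX_VALUE)); // keep retrying transient failures
props.put("delivery.timeout.ms", "120000");  // upper bound on reporting success or failure for a send
props.put("retry.backoff.ms", "500");        // pause between retry attempts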
Security enforcement
Use SSL/TLS to encrypt data in transit between clients and servers. Configure SASL/SCRAM or Kerberos for client authentication to ensure only authorized clients can access the Kafka cluster. You can also implement ACLs (access control lists) to control read and write access to topics, ensuring that only permitted clients can access specific topics.
You can inadvertently expose sensitive information in your code or configuration files. Use environment variables or secure vaults to manage credentials and configuration details.
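As a minimal sketch, a client configured for SASL/SCRAM over TLS might look like the following; the endpoints, file paths, and environment variable names are placeholders, and the credentials are read from the environment rather than hard-coded:
// Assumes the TRUSTSTORE_PASSWORD and KAFKA_PASSWORD environment variables are set
Properties props = new Properties();
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("ssl.truststore.location", "/etc/kafka/secrets/client.truststore.jks");
props.put("ssl.truststore.password", System.getenv("TRUSTSTORE_PASSWORD"));
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    + "username=\"app-client\" password=\"" + System.getenv("KAFKA_PASSWORD") + "\";");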
Performance benchmarks
Perform load testing and benchmarking to understand how your Kafka clients perform under stress. Use these insights to fine-tune configurations and system resources. Regularly test the scalability of your Kafka clients, especially before significant changes in the volume of data or number of consumers. Appropriate testing helps ensure your system can handle increased loads without degradation in performance.
Monitoring and maintenance
It is also a best practice to monitor key metrics from Kafka clients, such as message rate, latency, and error rates. Utilize tools like JMX, Prometheus, or Kafka’s built-in monitoring capabilities to monitor performance and identify potential issues early.
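As one example of the built-in capabilities, every Java client exposes its metrics programmatically as well as over JMX; assuming an existing producer instance, a quick dump looks like this:
// Print every client metric the producer currently tracks, such as record send rate and request latency
producer.metrics().forEach((name, metric) ->
    System.out.printf("%s = %s%n", name.name(), metric.metricValue()));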
Comprehensive logging aids in troubleshooting and understanding client behavior over time. Implement a robust logging strategy to capture detailed information about client operations, especially errors and system messages.
Finally, remember to keep your Kafka client libraries updated with the latest releases. Regular updates can protect against known vulnerabilities, improve performance, and provide access to new features.
Conclusion
Kafka clients are available across many languages, offering both low-level access and high-level abstractions to Kafka's capabilities. High-level clients, like Spring Kafka, can significantly simplify development, allowing teams to focus more on business logic rather than boilerplate code.
Adhering to best practices in configuration, error handling, and security ensures the effective and safe delivery of functionality, contributing to the success of your data-driven applications.