Learn how to write a Python client that produces and consumes data from Redpanda in just five steps.

By Dunith Dhanushka on February 14, 2023
Getting started with Redpanda in Python

Python has always been a popular choice among budding developers who are just getting started with programming. According to the 2022 Stack Overflow Developer Survey, Python was the third most popular programming language for people learning to code, right after JavaScript.

Python’s simplicity, versatility, and support for multiple programming paradigms (procedural, object-oriented, and functional) helped drive its adoption in various industries, including data engineering and machine learning. Python can also talk to Apache Kafka® through client libraries such as the community-maintained kafka-python package, which lets you build a diverse set of use cases around Kafka, including:

  • Event-driven microservices
  • ETL pipelines
  • Stream processors
  • Scripting admin operations

Redpanda is compatible with Kafka APIs, allowing seamless reuse of existing Kafka clients. If you have an existing Python client for Kafka or you want to write a new Python client for Redpanda, you’re reading the right blog post.

In this tutorial, we’ll explore a common pattern that you can build with Redpanda and Python: producing to and consuming data from Redpanda with a Python application.

The process

Here are the steps we’ll be walking through to build our example:

  1. Set up the prerequisites
  2. Start a single-node Redpanda cluster
  3. Produce and consume plain-text messages
  4. Produce and consume JSON messages
  5. Clean up

First, we’ll start a Redpanda cluster and create a topic called orders. This cluster uses the standard Kafka protocol port 9092 for communicating with Python clients.

Then, we’ll write a Python producer that produces plain-text messages to the orders topic, followed by a Python consumer that consumes from the same topic.

[Figure: Python producer and consumer]

As we go through this tutorial, we’ll add the necessary code to produce and consume JSON-formatted messages as well. That said, let’s dive into the first step.

1. Prerequisites

Start by making sure you have the tools used in this guide installed: Python 3 (with pip and the venv module), Docker with Docker Compose, and Git.

In this GitHub repository, you can find the configuration files and completed source code for the examples in this guide. You can clone it to your local machine using:

git clone https://github.com/redpanda-data-blog/2023-python-gsg.git
cd 2023-python-gsg

Create a virtual environment and install Python dependencies

Before writing any code, let’s create a new virtual environment for our project.

Virtual environments are used in Python to create isolated environments for different projects. With them, you can have separate and distinct environments for each of your projects, each with its own dependencies and packages. This helps to avoid conflicts and maintain a consistent and predictable environment.

Now, run the following commands to create a new virtual environment, activate it, and upgrade the pip installation:

python3 -m venv env
source env/bin/activate
pip install --upgrade pip
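If you want to confirm that the activation worked, a quick check from inside Python can help. The sketch below uses only the standard library; the helper name in_virtualenv is ours for illustration and is not part of any tool used in this tutorial.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when this interpreter runs inside a venv-created environment."""
    # Inside a venv, sys.prefix points at the environment directory,
    # while sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


print(f"Interpreter: {sys.executable}")
print(f"Inside a virtual environment: {in_virtualenv()}")
```

If the second line reports False, the environment is probably not activated in the current shell.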

Choose a Python client library

At this point, you might wonder which Python library to choose, because there are two popular options: kafka-python and confluent-kafka-python. The differences between them may make one a better choice than the other, depending on your use case.

  • kafka-python: provides a simple, high-level API while offering a lot of flexibility with a relatively small codebase. It’s often a good choice for small to medium-sized projects.

  • confluent-kafka-python: provides a more feature-rich and optimized API, including advanced features such as compression, serialization, and reliability guarantees. It also offers better performance than kafka-python, particularly in high-throughput scenarios, making it a good choice for large-scale projects or projects that require advanced Kafka functionality.

Ultimately, the choice between kafka-python and confluent-kafka-python will depend on your specific requirements. If you’re starting a new project and aren’t sure which library to use, you may want to start with kafka-python and switch to confluent-kafka-python if your needs grow beyond its capabilities.
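To give a feel for what such a switch might involve, here is a hedged sketch of the producing side written in the style of the confluent-kafka-python API. It is not used anywhere else in this tutorial. The helper names delivery_report and produce_orders are hypothetical, and the producer argument is assumed to be a confluent_kafka.Producer created with a configuration such as {"bootstrap.servers": "localhost:9092"}.

```python
def delivery_report(err, msg):
    """Delivery callback; confluent-kafka invokes it with (err, msg) per message."""
    if err is not None:
        print(f"Error sending message: {err}")
    else:
        # In confluent-kafka, topic and offset are methods, not attributes.
        print(f"Message produced to topic '{msg.topic()}' at offset {msg.offset()}")


def produce_orders(producer, topic: str, count: int = 10) -> None:
    """Produce `count` plain-text orders with a confluent-kafka style producer."""
    for i in range(1, count + 1):
        payload = f"Order with id #{i}".encode("utf-8")
        producer.produce(topic, value=payload, on_delivery=delivery_report)
        # Serve delivery callbacks for messages that have already completed.
        producer.poll(0)
    # Block until every buffered message is delivered or has failed.
    producer.flush()
```

The main differences from kafka-python are the dictionary-based configuration and the single delivery callback that receives both the error and the message.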

We’ll use kafka-python in this tutorial. You can install it by running the following command:

pip install kafka-python
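To double-check that the package landed in the active environment, you can ask Python for the installed version. This is a small sketch using the standard library's importlib.metadata (Python 3.8 or later); installed_version is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(package: str):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("kafka-python")
print(f"kafka-python version: {version or 'not installed'}")
```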

2. Start a single-node Redpanda cluster

First, navigate to the root level of the cloned repository in a terminal and type the following to start a single-node Redpanda cluster:

docker-compose up -d

The 2023-python-gsg repo contains a docker-compose.yml to make it easy to start the cluster. Once the cluster is up, run the following command to create the orders topic. (We’re using the rpk CLI tool for this.)

docker exec -it redpanda rpk topic create orders

That’s it! Now we have a working Redpanda installation along with a topic.
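Before writing any client code, it can be handy to verify that the broker port is reachable from your machine. The sketch below only checks that a TCP connection can be opened; it does not speak the Kafka protocol. It assumes the cluster is exposed on localhost:9092, and broker_reachable is a hypothetical helper name.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 9092, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


print(f"Broker reachable on localhost:9092: {broker_reachable()}")
```

A False result usually means the container is not running or the port is not published to the host.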

3. Produce and consume plain-text messages

In this section, we’ll write Python code to produce plain-text messages (strings) to the orders topic and consume them from it.

Copy and paste the following code into the string_producer.py file:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers="localhost:9092"
)

topic = "orders"


def on_success(metadata):
    print(f"Message produced to topic '{metadata.topic}' at offset {metadata.offset}")


def on_error(e):
    print(f"Error sending message: {e}")


# Produce asynchronously with callbacks
for i in range(1, 11):
    msg = f"Order with id #{i}"
    future = producer.send(
        topic,
        value=str.encode(msg)
    )
    future.add_callback(on_success)
    future.add_errback(on_error)

producer.flush()
producer.close()

The producer.send() method above is asynchronous: it adds the message to an internal buffer, which a background thread sends to the broker, and immediately returns a future object. That makes the producer non-blocking and allows you to register callback functions to handle delivery receipts (on_success) and exceptions (on_error). The final producer.flush() call blocks until all buffered messages have been sent.
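When you need to know that a particular message was acknowledged before moving on, you can block on the returned future instead of registering callbacks. Here is a minimal sketch, assuming producer is a KafkaProducer like the one above; send_sync is a hypothetical helper name.

```python
def send_sync(producer, topic: str, value: bytes, timeout_s: float = 10.0):
    """Send one message and wait for the broker's acknowledgement.

    Returns the record metadata on success; raises the underlying error
    (or a timeout error) if the send does not complete in time.
    """
    future = producer.send(topic, value=value)
    # get() blocks until the send finishes, trading throughput
    # for a simple, sequential control flow.
    return future.get(timeout=timeout_s)
```

For example, send_sync(producer, "orders", b"Order with id #11") should return metadata carrying the same topic and offset fields that on_success prints. Waiting on every message is slower than the callback approach, so it tends to suit low-volume or must-confirm sends.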

You can also use this producer code to connect to a Redpanda cluster running in Redpanda Cloud. However, that requires you to pass authentication parameters, such as SASL/SCRAM (username and password) or mTLS, to the KafkaProducer constructor.

producer = KafkaProducer(
    bootstrap_servers="<TODO: change this to your cluster hosts>",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="<TODO: change this to your service account name>",
    sasl_plain_password="<TODO: change this to your service account password>",
)

Other than that, the rest of the code works the same. For more information on working with the Redpanda Cloud, check our guide on how to use Redpanda with Kafka client libraries.
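Rather than hard-coding credentials in the script, one common approach is to read them from environment variables. The sketch below builds the keyword arguments for KafkaProducer that way. The variable names REDPANDA_BROKERS, REDPANDA_USERNAME, and REDPANDA_PASSWORD are hypothetical, as is the helper name cloud_producer_config; adapt them to your own setup.

```python
import os


def cloud_producer_config(env=None) -> dict:
    """Build KafkaProducer keyword arguments for SASL/SCRAM from the environment.

    The environment variable names used here are illustrative assumptions.
    """
    env = os.environ if env is None else env
    required = ("REDPANDA_BROKERS", "REDPANDA_USERNAME", "REDPANDA_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        # Accept a comma-separated list of host:port pairs.
        "bootstrap_servers": env["REDPANDA_BROKERS"].split(","),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": env["REDPANDA_USERNAME"],
        "sasl_plain_password": env["REDPANDA_PASSWORD"],
    }
```

You would then create the producer with KafkaProducer(**cloud_producer_config()), which keeps secrets out of the source file.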

Run the following command in another terminal window to produce ten string messages to the orders topic.

python string_producer.py

Then, copy and paste the following code into the string_consumer.py file. This code will consume messages from the orders topic and print each message’s payload and metadata on the terminal.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="demo-group",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    # Stop iterating after one second without new messages
    consumer_timeout_ms=1000
)

consumer.subscribe(["orders"])

try:
    for message in consumer:
        topic_info = f"topic: {message.topic} ({message.partition}|{message.offset})"
        message_info = f"key: {message.key}, {message.value}"
        print(f"{topic_info}, {message_info}")
except Exception as e:
    print(f"Error occurred while consuming messages: {e}")
finally:
    consumer.close()

Run the file using:

python string_consumer.py
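Because enable_auto_commit is set to False and the script never commits, the consumer group does not record its progress, so each run starts again from the earliest offset. That is convenient for a demo. If you would rather resume where the previous run stopped, one option is to commit after processing each message. Here is a hedged sketch, assuming consumer is a KafkaConsumer configured as above; consume_and_commit and handle are hypothetical names.

```python
def consume_and_commit(consumer, handle) -> int:
    """Process every available message, committing offsets after each one.

    Returns the number of messages handled.
    """
    count = 0
    try:
        for message in consumer:
            handle(message)
            # Synchronously commit the offsets consumed so far for this group.
            consumer.commit()
            count += 1
    finally:
        consumer.close()
    return count
```

Calling consume_and_commit(consumer, print) would print each record and commit as it goes. Committing per message is simple but slow; committing every N messages is a common compromise.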

4. Produce and consume JSON messages

Exchanging structured messages between applications, such as JSON objects, is much more common than exchanging plain strings. So, we’ll write Python code to produce JSON objects to the orders topic and consume them from it.

Copy and paste the following code into the json_producer.py file:

import json

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda m: json.dumps(m).encode('ascii')
)

topic = "orders"


def on_success(metadata):
    print(f"Message produced to topic '{metadata.topic}' at offset {metadata.offset}")


def on_error(e):
    print(f"Error sending message: {e}")


# Produce asynchronously with callbacks
for i in range(1, 11):
    msg = {"id": i, "content": "Some value"}
    future = producer.send(topic, msg)
    future.add_callback(on_success)
    future.add_errback(on_error)

producer.flush()
producer.close()

This code will produce ten JSON messages to the orders topic.

In the code, we use a lambda function as the value_serializer. It converts each dictionary into a JSON string by invoking json.dumps() and then encodes that string as bytes, since Kafka messages are sent as byte arrays.

Run the JSON producer using:

python json_producer.py

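You should see output similar to this on the terminal. The offsets start at 0 only if the topic was empty; if you ran the string producer first, they continue after the earlier messages.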

Message produced to topic 'orders' at offset 0
Message produced to topic 'orders' at offset 1
Message produced to topic 'orders' at offset 2
Message produced to topic 'orders' at offset 3
Message produced to topic 'orders' at offset 4
Message produced to topic 'orders' at offset 5
Message produced to topic 'orders' at offset 6
Message produced to topic 'orders' at offset 7
Message produced to topic 'orders' at offset 8
Message produced to topic 'orders' at offset 9

Next, copy and paste the following code into the json_consumer.py file. This code consumes JSON messages from the orders topic, parses them, and prints them on the terminal.

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="demo-group",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    # Stop iterating after one second without new messages
    consumer_timeout_ms=1000,
    value_deserializer=lambda m: json.loads(m.decode('ascii'))
)

consumer.subscribe(["orders"])

try:
    for message in consumer:
        topic_info = f"topic: {message.topic} ({message.partition}|{message.offset})"
        message_info = f"key: {message.key}, {message.value}"
        print(f"{topic_info}, {message_info}")
except Exception as e:
    print(f"Error occurred while consuming messages: {e}")
finally:
    consumer.close()

Similar to the producer, the code uses a lambda function as the value_deserializer, which decodes the received byte array and parses it into a Python dictionary by invoking the json.loads() method.
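You can try the serializer and deserializer pair without a broker to see what actually travels over the wire. The sketch below mirrors the two lambdas as named functions; serialize and deserialize are names we introduce for illustration, and the sample order is made up.

```python
import json


def serialize(value) -> bytes:
    """Mirror of the producer's value_serializer."""
    # json.dumps escapes non-ASCII characters by default (ensure_ascii=True),
    # which is why encoding the result as ASCII does not fail.
    return json.dumps(value).encode("ascii")


def deserialize(raw: bytes):
    """Mirror of the consumer's value_deserializer."""
    return json.loads(raw.decode("ascii"))


order = {"id": 1, "content": "Café order"}
wire_bytes = serialize(order)
print(wire_bytes)
print(deserialize(wire_bytes) == order)
```

The round trip returns a dictionary equal to the original, including the non-ASCII character, because the escape sequence is decoded again by json.loads().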

Run the file using:

python json_consumer.py

You should see an output similar to this on the terminal:

topic: orders (0|0), key: None, {'id': 1, 'content': 'Some value'}
topic: orders (0|1), key: None, {'id': 2, 'content': 'Some value'}
topic: orders (0|2), key: None, {'id': 3, 'content': 'Some value'}
topic: orders (0|3), key: None, {'id': 4, 'content': 'Some value'}
topic: orders (0|4), key: None, {'id': 5, 'content': 'Some value'}
topic: orders (0|5), key: None, {'id': 6, 'content': 'Some value'}
topic: orders (0|6), key: None, {'id': 7, 'content': 'Some value'}
topic: orders (0|7), key: None, {'id': 8, 'content': 'Some value'}
topic: orders (0|8), key: None, {'id': 9, 'content': 'Some value'}
topic: orders (0|9), key: None, {'id': 10, 'content': 'Some value'}
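This output assumes every message in the topic is valid JSON. If the topic still holds the plain-text messages from step 3, json.loads() raises an error on them. One hedged way to tolerate mixed content is a deserializer that returns None for payloads it cannot parse. The name tolerant_json_deserializer is hypothetical, and this variant is not part of the original example.

```python
import json


def tolerant_json_deserializer(raw):
    """Parse JSON bytes, returning None instead of raising on bad input."""
    if raw is None:
        # Messages without a value carry no payload to parse.
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


print(tolerant_json_deserializer(b'{"id": 1, "content": "Some value"}'))
print(tolerant_json_deserializer(b"Order with id #1"))
```

You could pass it as value_deserializer=tolerant_json_deserializer and skip any message whose value is None inside the consumer loop.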

5. Clean up

Lastly, stop the Redpanda cluster and remove the containers by running:

docker-compose down

What’s next?

In this tutorial, we showed you how to write Python clients for producing and consuming data from Redpanda, including plain-text strings and JSON objects.

Now that you have a basic understanding, you can play around with other features of the kafka-python package, such as producing messages with keys, producing to and consuming from specific partitions, and authentication.
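As a starting point for producing messages with keys, the sketch below sends each order with its id as the key. With kafka-python’s default partitioner, messages that share a key are hashed to the same partition (as long as the partition count does not change), which preserves their relative order. send_keyed is a hypothetical helper, and the producer is assumed to be configured with the JSON value_serializer from step 4.

```python
def send_keyed(producer, topic: str, order: dict):
    """Send `order` using its id as the message key."""
    # Without a key_serializer, kafka-python expects the key as bytes.
    key = str(order["id"]).encode("utf-8")
    return producer.send(topic, key=key, value=order)
```

On the consuming side, the key shows up as message.key, which the consumers in this tutorial already print.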

Plus, you can use Python with Redpanda to go beyond simple clients.

Interested? Take Redpanda for a test drive! Make sure to check out our documentation to understand the nuts and bolts of the platform, and browse the Redpanda blog for more tutorials on how to easily integrate with Redpanda.
