Switching from Kafka to Redpanda for streaming analysis in a VOD project

Switching from Kafka to Repanda is easier than you think. In this post, Pathway proves it with an example video-on-demand platform.

By
on
May 25, 2023

This is usually done with data science functions trained on large amounts of data to be accurate enough to make decisions on. But the brainiacs over at Pathway have cracked the nut on doing the same thing with small amounts of data, in real time, against streaming data coming directly from Apache Kafka®. The same code base can be used for event-driven and microbatch architectures in Pathway.

Pathway loved the idea of having a smaller footprint at mega scale and faster performance, but first wanted to confirm whether Redpanda truly is a "drop-in replacement" for Kafka. So, we put it to the test.

In this post, we walk you through a practical example of using Pathway on both Apache Kafka and Redpanda to show just how easy it is to make the switch. If you want to replicate this yourself, you can find the source code in this GitHub repository.

The best-rated movies problem

Picture this: you’ve just been hired by a trendy video-on-demand (VOD) platform. Your first task is to identify the most popular movies in the catalog. Specifically, you want to find the K movies with the highest scores and see how many ratings those movies received.

For example, this is what the expected table for K=3 could be:

MovieID

Average

RatingNumber

218

4.9

7510

45

4.8

9123

7456

4.8

1240

The ratings are received as a data stream through a Kafka instance, and you output the table to a CSV file.

Implementing the solution with Kafka

We’ll need the following components:

  • Apache ZooKeeper™
  • Apache Kafka
  • Pathway
  • A stream producer

Each component will be hosted in a different docker container. The stream producer sends the ratings to Kafka on the topic ratings. Pathway listens to the topic, processes the stream, and then outputs the ranking in a CSV file best_rating.csv.

Diagram showing how the streaming solution works with Kafka.
Diagram showing how the streaming solution used to work with Kafka.

Pathway and the stream generator have their own Dockerfile to install all the required dependencies. The stream is then created by streaming the lines of a static data dataset.csv.

This will be the structure of our project:

├── pathway-src/
│   ├── Dockerfile
│   └── process-stream.py
├── producer-src/
│   ├── create-stream.py
│   ├── dataset.csv
│   └── Dockerfile
├── docker-compose.yml
└── Makefile

1. Configure Kafka and ZooKeeper

Kafka and ZooKeeper are configurable in the docker-compose.yml file. To keep things simple, we can skip the security mechanisms.

// docker-compose.yml

version: "3.7"
name: tuto-switch-to-redpanda
networks:
 tutorial_network:
   driver: bridge
services:
 zookeeper:
   image: confluentinc/cp-zookeeper:5.5.3
   environment:
     ZOOKEEPER_CLIENT_PORT: 2181
   networks:
     - tutorial_network
 kafka:
   image: confluentinc/cp-enterprise-kafka:5.5.3
   depends_on: [zookeeper]
   environment:
     KAFKA_AUTO_CREATE_TOPICS: true
     KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
     KAFKA_ADVERTISED_HOST_NAME: kafka
     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
     KAFKA_BROKER_ID: 1
     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
     KAFKA_JMX_PORT: 9991
     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
     KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
     CONFLUENT_SUPPORT_METRICS_ENABLE: false
   ports:
   - 9092:9092
   command: sh -c "((sleep 15 && kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic ratings)&) && /etc/confluent/docker/run "
   networks:
     - tutorial_network

Here we’re sending the messages to a topic called ratings, created in the command setting.

2. Generate the stream

We start with a CSV dataset with the following columns: userId (int), movieId (int), rating (float), and timestamp (int). This is the schema chosen by GroupLens for their MovieLens25M dataset: we provide a toy dataset as an example, but the project will work with the whole MovieLens25M dataset.

To generate the stream, you can use a simple Python script to read the CSV file line by line, and each rating will be sent to Kafka using the kafka-python package.

// ./producer-src/create-stream.py

import csv
import json
import time
from kafka import KafkaProducer
topic = "ratings"
#We wait for Kafka and ZooKeeper to be ready
time.sleep(30)
producer = KafkaProducer(
   bootstrap_servers=["kafka:9092"],
   security_protocol="PLAINTEXT",
   api_version=(0, 10, 2),
)
with open("./dataset.csv", newline="") as csvfile:
   dataset_reader = csv.reader(csvfile, delimiter=",")
   first_line = True
   for row in dataset_reader:
       # We skip the header
       if first_line:
           first_line = False
           continue
       message_json = {
           "userId": int(row[0]),
           "movieId": int(row[1]),
           "rating": float(row[2]),
           "timestamp": int(row[3]),
       }
       producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
       time.sleep(0.1)
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()

Note that we connect to kafka:9092 and not localhost.

This script will have its own container:

// docker-compose.yml

 stream-producer:
   build:
     context: .
     dockerfile: ./producer-src/Dockerfile
   depends_on: [kafka]
   networks:
     - tutorial_network

You only need to use a Python image and install the associated package:

// ./producer-src/Dockerfile

FROM python:3.10
RUN pip install kafka-python
COPY ./producer-src/create-stream.py create-stream.py
COPY ./producer-src/dataset.csv dataset.csv
CMD ["python", "-u", "create-stream.py"]

3. Connect Pathway to Kafka

Now you have a stream generated from the dataset and sent to Kafka. At this point, you simply need to connect Pathway to Kafka and process the data.

To connect to Kafka, configure the connection:

// ./pathway-src/process-stream.py

rdkafka_settings = {
   "bootstrap.servers": "kafka:9092",
   "security.protocol": "plaintext",
   "group.id": "0",
   "session.timeout.ms": "6000",
}

To establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism, you can do the following:\

// ./pathway-src/process-stream.py

rdkafka_settings = {
   "bootstrap.servers": "server:9092",
   "security.protocol": "sasl_ssl",
   "sasl.mechanism": "SCRAM-SHA-256",
   "group.id": "$GROUP_NAME",
   "session.timeout.ms": "6000",
   "sasl.username": "username",
   "sasl.password": "********"
}

Connect to the ratings topic using a Kafka connector:

// ./pathway-src/process-stream.py

t_ratings = pw.kafka.read(
   rdkafka_settings,
   topic_names=["ratings"],
   format="json",
   value_columns=[
       "movieId",
       "rating",
   ],
   types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT},
   autocommit_duration_ms=100,
)

You’re only interested in the movieId and rating columns, so there’s no need to include the others. Now, define a function to find the best-rated movies:

def compute_best(t_ratings, K):
 t_best_ratings = t_ratings.groupby(pw.this.movieId).reduce(
     pw.this.movieId,
     sum_ratings=pw.reducers.sum(pw.this.rating),
     number_ratings=pw.reducers.count(pw.this.rating),
 )
 t_best_ratings = t_best_ratings.select(
     pw.this.movieId,
     pw.this.number_ratings,
     average_rating=pw.apply(
         lambda x, y: (x / y) if y != 0 else 0,
         pw.this.sum_ratings,
         pw.this.number_ratings,
     ),
 )
 t_best_ratings = t_best_ratings.select(
     movie_tuple=pw.apply(
         lambda x, y, z: (x, y, z),
         pw.this.average_rating,
         pw.this.number_ratings,
         pw.this.movieId,
     )
 )
 t_best_ratings = t_best_ratings.reduce(
     total_tuple=pw.reducers.sorted_tuple(pw.this.movie_tuple)
 )
 t_best_ratings = t_best_ratings.select(
     K_best=pw.apply(lambda my_tuple: (list(my_tuple))[-K:], pw.this.total_tuple)
 )
 t_best_ratings = flatten_column(t_best_ratings.K_best).select(
     pw.this.K_best
 )
 t_best_ratings = t_best_ratings.select(
     movieId=pw.apply(lambda rating_tuple: rating_tuple[2], pw.this.K_best),
     average_rating=pw.apply(lambda rating_tuple: rating_tuple[0], pw.this.K_best),
     views=pw.apply(lambda rating_tuple: rating_tuple[1], pw.this.K_best),
 )
 return t_best_ratings

Using the function, your final file will look like this:

// ./pathway-src/process-stream.py

import pathway as pw
import time
rdkafka_settings = {
   "bootstrap.servers": "kafka:9092",
   "security.protocol": "plaintext",
   "group.id": "0",
   "session.timeout.ms": "6000",
}
t_ratings = pw.kafka.read(
   rdkafka_settings,
   topic_names=["ratings"],
   format="json",
   value_columns=[
       "movieId",
       "rating",
   ],
   types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT},
   autocommit_duration_ms=100,
)
t_best_ratings = compute_best(t_ratings, 3)
# We output the results in a dedicated CSV file
pw.csv.write(t_best_ratings, "./best_ratings.csv")
# We wait for Kafka and ZooKeeper to be ready
time.sleep(20)
# We launch the computation
pw.run()

Set up a dedicated container:

// docker-compose.yml

 pathway:
   build:
     context: .
     dockerfile: ./pathway-src/Dockerfile
   depends_on: [kafka]
   networks:
     - tutorial_network
// ./pathway-src/Dockerfile

FROM python:3.10

RUN pip install --extra-index-url https://packages.pathway.com/966431ef6b789140248947273e541114f6679aa3436359bb5d67064b5693047a90ec699d6a pathway
COPY ./pathway-src/process-stream.py process-stream.py

CMD ["python", "-u", "process-stream.py"]

4. Run the results

Use the following toy dataset:

// ./producer-src/dataset.csv

userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
2,296,4.0,1147880044
2,306,2.5,1147868817
2,307,3.0,1147868828
2,665,2.0,1147878820
2,899,4.5,1147868510
2,1088,2.0,1147868495
3,296,1.0,1147880044
3,306,2.5,1147868817
3,307,4.0,1147868828
3,665,2.0,1147878820
3,899,1.5,1147868510
3,1088,5.0,1147868495

You should get the following results:

movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1

As expected, the top three results are updated whenever the ranking changes due to a new rating.

Switching from Kafka to Redpanda

Congratulations! You can now find the K best-rated movies on your VOD platform. However, your team has discovered a new alternative to Kafka: Redpanda—which is known for being simpler, faster, and more cost-effective.

With Redpanda, the project is straightforward. All you need is:

  • Redpanda
  • Pathway
  • The stream producer
Diagram showing how the streaming solution works with Redpanda.
Diagram showing how the streaming solution works with Redpanda.

Let's see how to deploy Redpanda in Docker and how it impacts your project.

1. Set up Redpanda on Docker

(You can skip this section if you already have an existing Redpanda instance)

First, remove the two services kafka and zookeeper, and replace them with a redpanda service:

// ./docker-compose.yml

services:
 redpanda:
   command:
     - redpanda
     - start
     - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
     - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
     - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
     - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
     - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
     - --rpc-addr redpanda:33145
     - --advertise-rpc-addr redpanda:33145
     - --smp 1
     - --memory 1G
     - --mode dev-container
     - --default-log-level=debug
     - --set redpanda.enable_transactions=true
     - --set redpanda.enable_idempotence=true
     - --set redpanda.auto_create_topics_enabled=true
   image: docker.redpanda.com/redpandadata/redpanda:v23.1.2
   container_name: redpanda
   volumes:
     - redpanda:/var/lib/redpanda/data
   networks:
     - tutorial_network

You must now connect Pathway and the stream producer to redpanda instead of kafka. This could have been avoided by naming the Kafka container differently or by naming the Redpanda container kafka.

2. Connect Redpanda with Pathway

As previously mentioned, we need to update the server's address in the settings:

// ./pathway-src/process-stream.py

producer = KafkaProducer(
   bootstrap_servers=["redpanda:9092"],
   security_protocol="PLAINTEXT",
   api_version=(0, 10, 2),
)

That’s it! The setting will work exactly the same as with Kafka. However, for consistency, there are also dedicated Redpanda connectors. There’s no difference between Kafka and Redpanda connectors since the same connector is used under the hood.

With the Redpanda connector, here’s what your ./pathway-src/process-stream.py file looks like:

// ./pathway-src/process-stream.py

import pathway as pw
import time
rdkafka_settings = {
   "bootstrap.servers": "redpanda:9092",
   "security.protocol": "plaintext",
   "group.id": "0",
   "session.timeout.ms": "6000",
}
t_ratings = pw.redpanda.read(
   rdkafka_settings,
   topic_names=["ratings"],
   format="json",
   value_columns=[
       "movieId",
       "rating",
   ],
   types={"movieId": pw.Type.INT, "rating": pw.Type.FLOAT},
   autocommit_duration_ms=100,
)
t_best_ratings = compute_best(t_ratings, 3)
# We output the results in a dedicated CSV file
pw.csv.write(t_best_ratings, "./best_ratings.csv")
# We wait for Kafka and ZooKeeper to be ready
time.sleep(20)
# We launch the computation
pw.run()

If you don't care about the names of the connector and server, you don't have to change the file at all.

3. Generate the stream

As with Pathway, you need to update the server name (if required) for the stream generator:

// ./producer-src/create-stream.py

producer = KafkaProducer(
   bootstrap_servers=["redpanda:9092"],
   security_protocol="PLAINTEXT",
   api_version=(0, 10, 2),
)

Small problem—if you look at the results now, we have an empty best_ratings.csv.

This comes from creating the ratings topic. While the topic was ready with Kafka, it was created at the reception of the first message with Redpanda. Creating a topic makes Redpanda discard the messages until it’s ready. Sending a message at the beginning of the computation should solve this:

// ./producer-src/create-stream.py

producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)

Note that this difference comes from Redpanda—not Pathway. Pathway will receive and process the data the same way regardless of whether Kafka or Redpanda is used. Kafka and Redpanda are responsible for handling messages: Redpanda discards the incoming messages while the topic is created.

Despite the fix, the final file is very similar to the one for the Kafka version:

// ./producer-src/create-stream.py

from kafka import KafkaProducer
import csv
import time
import json
topic = "ratings"
#We wait for Kafka and ZooKeeper to be ready
time.sleep(30)
producer = KafkaProducer(
   bootstrap_servers=["kafka:9092"],
   security_protocol="PLAINTEXT",
   api_version=(0, 10, 2),
)
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
with open("./dataset.csv", newline="") as csvfile:
   dataset_reader = csv.reader(csvfile, delimiter=",")
   first_line = True
   for row in dataset_reader:
       # We skip the header
       if first_line:
           first_line = False
           continue
       message_json = {
           "userId": int(row[0]),
           "movieId": int(row[1]),
           "rating": float(row[2]),
           "timestamp": int(row[3]),
       }
       producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
       time.sleep(0.1)
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()

You can take a look at the project sources in our GitHub repository. With this, the results are the same as with Kafka:

movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1

Bonus: send your results back to Redpanda

You've successfully computed the K best-rated movies using Redpanda, and your ranking is automatically updated thanks to Pathway. But after taking a closer look, you realize you've been sending your results in a CSV file, which isn't the most suitable for handling a data stream.

Not only that, but the file stays on your local computer, preventing others in the organization from accessing the data in real time.

Your team suggests sending the results back to Redpanda into an “output topic” instead. Redpanda is optimized for handling data streams, making it a more efficient and effective solution than sending data in a CSV file. By doing this, you can ensure the data is accessible to everyone who needs it in real time.

Connecting to Redpanda with Pathway is as easy as connecting to Kafka. You just have to use a Redpanda connector, which is exactly the same as the Kafka connector:

rdkafka_settings = {
 "bootstrap.servers": "redpanda:9092",
 "security.protocol": "plaintext",
 "group.id": "$GROUP_NAME",
 "session.timeout.ms": "6000",
}
pw.redpanda.write(
 t_best_ratings,
 rdkafka_settings,
 topic_name="best_ratings",
 format="json"
)

As previously mentioned, you can also establish a more secure connection using a SASL-SSL authentication over a SCRAM-SHA-256 mechanism (e.g. connecting to Redpanda Cloud) as follows:

rdkafka_settings = {
 "bootstrap.servers": "redpanda:9092",
 "security.protocol": "sasl_ssl",
 "sasl.mechanism": "SCRAM-SHA-256",
 "group.id": "$GROUP_NAME",
 "session.timeout.ms": "6000",
 "sasl.username": "username",
 "sasl.password": "********",
}

Conclusion: it’s simple to switch from Kafka to Redpanda!

Congratulations! You just built a best-rated movie application in Pathway and made it work with Redpanda. Being fully Kafka API-compatible, Redpanda truly acts as a drop-in replacement—there are no new binaries to install, no new services to deploy and maintain, and the default configuration just works.

If the server name remains the same, you have nothing to do with your Pathway code! For consistency, you can use Pathway's Redpanda connectors, which work exactly the same as Kafka connectors. To learn more about Redpanda, browse the Redpanda blog for more tutorials on how to easily integrate with Redpanda, or dive into one of the many free courses at Redpanda University.

To keep exploring, try Redpanda for free! If you get stuck, have a question, or want to chat with the team and fellow Redpanda users, join the Redpanda Community on Slack.

Machine learning and artificial intelligence are powerful, but when you combine them with real-time data, they become extra meaningful if you want to act now. Let's say you have geospatial data and want to map out geofences on the fly in realtime, or get alerts on anomalies without having any parameters pre-defined, or perhaps you want to quickly know what sensors are having issues.

Graphic for downloading streaming data report
Save Your Spot

Related articles

VIEW ALL POSTS
Build an inventory monitoring system with Flink and MongoDB
Rexford A. Nyarko
&
&
&
October 29, 2024
Text Link
8 business benefits of real-time analytics
Redpanda
&
&
&
October 22, 2024
Text Link
Vector databases vs. knowledge graphs for streaming data applications
Fortune Adekogbe
&
&
&
October 15, 2024
Text Link