Learn how to build a stream processing application by integrating Faust and Redpanda.

ByDaniel PalmaonNovember 8, 2022
Stream processing with Redpanda and Faust

Stream processing is a way of handling data in real time by processing events as they arrive, rather than waiting for batches (or all) of the data to arrive before processing it. This is useful in many situations, especially in cases where you need to react to changes immediately (e.g., such as in case fraud detection and anomaly detection).

As we enter the age of streaming data, it is becoming more common to see stream processing used as the default way of handling data. After all, if tech like Redpanda allows us to set up an easy-to-use streaming platform in minutes, why would we want to wait for batch-processing pipeline results?

In this article, we'll look at how Redpanda enables stream processing with Faust, a Python library originally developed by engineers at Robinhood and later forked by the community to keep it maintained.

You can access the code used in this demo at any time in this GitHub repository.

integrating-redpanda-faust

What is Faust?

Faust is a stream-processing library written in Python. It is designed to be used with streaming data platforms such as Apache KafkaⓇ and Redpanda. With Faust, we can define pipelines that ingest data from Kafka topics and do some calculations on them. Because Faust is written in Python, it is easy to combine with other Python data libraries such as Pandas, NumPy, and SciPy.

Faust uses RocksDB as its storage engine, which is a key-value store written in C++ that is optimized for lightning-fast performance. Persisting state uses an abstraction called a "table," which is a named and distributed collection of key-value pairs which we can interface with regular dictionaries from Python.

Anatomy a Faust application

Faust takes a lot of inspiration from the Kafka Streams library, mainly in the form of using the same abstractions (i.e., streams, tables, and joins). These abstractions enable an easy setup of stateful stream processing, which allows us to calculate aggregates that not only use the latest events, but we can combine them with values from the past. As an aside, storing state also enables fault tolerance, as well as it makes it possible to resume previous processing pipelines based on the saved state.

By the end of this article, we will have a complete stream processing application that will calculate the rolling average of temperature coming from various sensors. To understand how the parts come together to form the final application, let's take a look at them separately first.

At the core of Faust is the app object. This is the object that allows us to define our stream processing pipelines.

import faust app = faust.App( "temperature-stream", broker="kafka://localhost:9092", store="rocksdb://", )

The main parameters we need to pass to the app object are the name of the application, the broker addresses, and the storage driver if we want to persist state. We can set the store parameter to memory:// to use an in-memory store for development purposes, but for large tables we will want to use rocksdb://, as it makes recovering datasets nearly instantaneous, while the in-memory storage driver can take up to minutes.

If you opt for RocksDB, for best performance save the data on an SSD if possible as RocksDB is highly optimized for flash storage. Also keep in mind that RocksDB is designed to use all the memory you give it access to, but this is something you can tune when creating your Faust Tables!

Next up, let's take a look at agents.

Agents are the main building blocks of Faust applications. They are the objects that define the stream processors that we want to run. To define an agent you simply have to declare an async function with the @app.agent decorator.

topic = app.topic('messages-topic') @app.agent(topic) async def say(messages): async for message in messages: print(message)

In the above example, we create a very basic agent that prints out all the messages it receives. The topic parameter is a topic-description object that represents the Redpanda topic that the agent will listen to. The messages parameter is a stream of messages that we can iterate over.

With just these small building blocks, we are ready to start building our application!

The platform

To run our application, we need to start a Redpanda cluster. The easiest way to do this is by using the official rpk tool.

rpk container start -n 1

This will start a single-node Redpanda cluster in a Docker container. After running the command we should see

Starting cluster Waiting for the cluster to be ready... NODE ID ADDRESS 0 127.0.0.1:59894 Cluster started! You may use rpk to interact with it. E.g: rpk cluster info --brokers 127.0.0.1:59894

Take note of the address of the broker, as we will need it later, and off we go!

The stream processing application

As mentioned above, we are building an application that will calculate the rolling average of temperature coming from various sensors. We will generate mock data for this application, but in a real-world scenario this data could come from thousands of temperature sensors scattered around the world.

stream-processing-architecture

Now that we have a Redpanda cluster running, we can start building our stream processing application. We will start by creating a new file called temperature.py and define our main Faust application. (This example has been developed on Python 3.10, but different versions of Python 3 should work as well, as long as you are able to pip install faust-streaming.)

import faust app = faust.App( "temperature-stream", broker="kafka://localhost:59894", )

Next, we will define the schema of our input and output records using Faust-supported models. This allows us to define the records using Python syntax, and have Faust take care of serializing and deserializing the data for us.

class Temperature(faust.Record, isodates=True, serializer="json"): ts: datetime = None value: int = None class AggTemperature(faust.Record, isodates=True, serializer="json"): ts: datetime = None count: int = None mean: float = None min: int = None max: int = None

We define two classes, Temperature and AggTemperature. The Temperature class represents the input records, and the AggTemperature class represents the output records. The isodates parameter tells Faust to use ISO 8601 format for dates and times, and the serializer parameter tells Faust to use JSON for serialization. We could also add default values to certain fields of the records here in case they are missing from the input data, depending on how we want to handle them in the downstream computations. For now, we will leave them all as None.

Next, we will define the topics that we will be using. We will have a topic for the raw temperature readings, and a topic for the rolling average.

TOPIC = "raw-temperature" SINK = "agg-temperature" source = app.topic(TOPIC, value_type=Temperature) sink = app.topic(SINK, value_type=AggTemperature)

In the above code, we define two topics: TOPIC and SINK. The source topic is where we will be ingesting the raw data, and the sink topic is where we will be writing the results. We also define the value type of the topics to be the Temperature and AggTemperature classes that we defined earlier.

Persisting state

In order to calculate the rolling average, we need to keep track of the sum of the values we have seen so far, as well as the number of values we have seen. We can do this by using a Faust table. A table is an in-memory dictionary, backed by a Redpanda changelog topic used for persistence and fault tolerance. This is useful in case we have to rebuild the state of the table because of network failures or node restarts.

WINDOW = 10 # 10 seconds window WINDOW_EXPIRES = 1 PARTITIONS = 1 tumbling_table = ( app.Table( TABLE, default=list, key_type=str, value_type=Temperature, partitions=PARTITIONS, on_window_close=window_processor, ) .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES)) .relative_to_field(Temperature.ts) )

Let's break down the above code. First, we define the WINDOW and WINDOW_EXPIRES parameters. The WINDOW parameter is the size of the window in seconds, and the WINDOW_EXPIRES parameter is the amount of time in seconds that we want to wait before we consider a window to be expired, meaning the duration for which we want to store the data allocated to each window.

The window_processor function is the function that will be called when a window is closed. This is where we will do the actual computation of the rolling average. Let's see how this looks.

def window_processor(key, events): timestamp = key[1][0] # key[1] is the tuple (ts, ts + window) values = [event.value for event in events] count = len(values) mean = sum(values) / count min_value = min(values) max_value = max(values) aggregated_event = AggTemperature( ts=timestamp, count=count, mean=mean, min=min_value, max=max_value ) print( f"Processing window: {len(values)} events, Aggreged results: {aggregated_event}" ) sink.send_soon(value=aggregated_event)

The window_processor function takes two parameters: key and events. The key parameter is the key of the table, and the events parameter is the list of events that were allocated to the window. In our case, the key is a tuple of the start and end timestamps of the window, and the events are the temperature readings that were allocated to the window.

We then calculate the count, mean, min, and max values of the events, and create a new AggTemperature record with the results. Finally, we print the results to the console and send the aggregated record to the sink topic, which we defined earlier.

Now that the middle (and the most important) part of our application is done, let's take a quick step back and implement the data generator function that will be responsible for pumping fake temperature sensor data into our Redpanda source topic.

@app.timer(interval=5.0) async def generate_temperature_data(): temperatures_topic = app.topic(TOPIC, key_type=str, value_type=Temperature) # Create a loop to send data to the Redpanda topic # Send 20 messages every time the timer is triggered (every 5 seconds) for i in range(20): # Send the data to the Redpanda topic await temperatures_topic.send( key=random.choice(["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"]), value=Temperature( ts=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), value=randint(0, 100), ), ) print("Producer is sleeping for 5 seconds 😴")

The above function is a Faust timer task that will be triggered every five seconds. In each iteration, we generate 20 random temperature readings, and send them to the TOPIC topic that we defined earlier.

The last step is to create our agent, which is the Faust term for the processing function that will be triggered when a message is received on the source topic.

@app.agent(app.topic(TOPIC, key_type=str, value_type=Temperature)) async def calculate_tumbling_temperatures(temperatures): async for temperature in temperatures: value_list = tumbling_table["events"].value() value_list.append(temperature) tumbling_table["events"] = value_list

In each iteration of the calculate_tumbling_temperatures function, we receive a temperature reading from the source topic and append it to the list of events that are allocated to the current window. We then update the table with the new list.

Finally, we have enough to run our application! To start a Faust worker run the following command:

faust -A temperature worker -l info

The -A parameter tells Faust to use the temperature module, and the worker parameter tells Faust to run a worker process. The -l info parameter tells Faust to log the output at the info level.

If all is well, our output should look something like this:

[2022-10-16 19:40:36,188] [88447] [INFO] [^Worker]: Ready [2022-10-16 19:40:41,203] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:40:46,214] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:40:51,928] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945640.0, count=40, mean=41.025, min=4, max=95> [2022-10-16 19:40:56,232] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:41:01,237] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:41:01,939] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945650.0, count=40, mean=56.875, min=8, max=97> [2022-10-16 19:41:06,241] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:41:11,250] [88447] [WARNING] Producer is sleeping for 5 seconds 😴 [2022-10-16 19:41:11,948] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945660.0, count=40, mean=49.525, min=5, max=96>

As you can see, the application is running. Every five seconds, it is generating 20 temperature readings and sending them to the source topic. Every 10 seconds, it is processing the events in the current window and sending the aggregated results to the sink topic.

We can take a peek at the data in the sink topic by running the following command:

rpk topic consume agg-temperature --brokers 127.0.0.1:53231

The output should look something like this:

{ "topic": "agg-temperature", "value": "{\"ts\":1665945640.0,\"count\":40,\"mean\":41.025,\"min\":4,\"max\":95,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}", "timestamp": 1665945652146, "partition": 0, "offset": 0 } { "topic": "agg-temperature", "value": "{\"ts\":1665945650.0,\"count\":40,\"mean\":56.875,\"min\":8,\"max\":97,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}", "timestamp": 1665945661940, "partition": 0, "offset": 1 } { "topic": "agg-temperature", "value": "{\"ts\":1665945660.0,\"count\":40,\"mean\":49.525,\"min\":5,\"max\":96,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}", "timestamp": 1665945671951, "partition": 0, "offset": 2 }

Included in the code repository is a docker-compose.yml file that will start a Redpanda cluster and the Faust worker in a container. To start the cluster with one command, run the following command:

docker-compose up -d

The output should be similar to the previous example, but this way you don't even have to create your Python environment!

Wrapping up

In this tutorial, we took a look at how to use Faust to build a streaming application that aggregates temperature readings from multiple sensors. Stream processing has never been more accessible thanks to Redpanda and libraries like Faust. The combination of these two technologies allows you to build powerful streaming applications with very little code.

As a reminder, all the code for this tutorial is available on GitHub here.

If you are interested in how Faust compares to other stream processing frameworks, like Spark streaming check out this article by James Kinley or, for a more detailed comparison between frameworks, take a look at this article by Jakkie Koekemoer.

Take Redpanda for a test drive here. Check out the documentation to understand the nuts and bolts of how the platform works, or read more blogs to see the plethora of ways to integrate with Redpanda. To ask Redpanda's Solution Architects and Core Engineers questions and interact with other Redpanda users, join the Redpanda Community on Slack.

Let's keep in touch

Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.