As we enter the age of streaming data, it is becoming more common to see stream processing used as the default way of handling data. After all, if tech like Redpanda allows us to set up an easy-to-use streaming platform in minutes, why would we want to wait for batch-processing pipeline results?
In this article, we'll look at how Redpanda enables stream processing with Faust, a Python library originally developed by engineers at Robinhood and later forked by the community to keep it maintained.
You can access the code used in this demo at any time in this GitHub repository.
What is Faust?
Faust is a stream-processing library written in Python. It is designed to be used with streaming data platforms such as Apache Kafka® and Redpanda. With Faust, we can define pipelines that ingest data from Kafka topics and do some calculations on them. Because Faust is written in Python, it is easy to combine with other Python data libraries such as Pandas, NumPy, and SciPy.
Faust can use RocksDB as its storage engine, a key-value store written in C++ and optimized for fast performance. State is persisted through an abstraction called a "table," which is a named, distributed collection of key-value pairs that we can work with like a regular Python dictionary.
Anatomy of a Faust application
Faust takes a lot of inspiration from the Kafka Streams library, mainly by using the same abstractions (i.e., streams, tables, and joins). These abstractions make it easy to set up stateful stream processing, which lets us calculate aggregates that combine the latest events with values from the past. As an aside, storing state also enables fault tolerance and makes it possible to resume earlier processing pipelines from the saved state.
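To make the idea of stateful processing concrete, here is a minimal sketch in plain Python. It does not use Faust's API; the `RunningMean` class and its `update` method are made-up names for illustration. The state kept per sensor is just a count and a sum, which is enough to fold each new event into an aggregate that also reflects past values.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class RunningMean:
    """State kept per sensor: how many values we saw and their sum."""

    count: int = 0
    total: float = 0.0

    def update(self, value: float) -> float:
        # Fold the newest event into the stored state and return the new mean.
        self.count += 1
        self.total += value
        return self.total / self.count


# One state object per key, created on first access (like a table with a default).
state = defaultdict(RunningMean)

events = [("Sensor1", 20), ("Sensor2", 30), ("Sensor1", 24)]
means = [(sensor, state[sensor].update(value)) for sensor, value in events]
print(means)
```

Because the state lives outside the event loop, the second reading for Sensor1 is averaged together with the first one rather than being treated in isolation.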
By the end of this article, we will have a complete stream processing application that will calculate the rolling average of temperature coming from various sensors. To understand how the parts come together to form the final application, let's take a look at them separately first.
At the core of Faust is the app object. This is the object that allows us to define our stream processing pipelines.
import faust

app = faust.App(
    "temperature-stream",
    broker="kafka://localhost:9092",
    store="rocksdb://",
)
The main parameters we need to pass to the app object are the name of the application, the broker addresses, and the storage driver if we want to persist state. We can set the store parameter to memory:// to use an in-memory store for development purposes, but for large tables we will want to use rocksdb://, as it makes recovering datasets nearly instantaneous, while the in-memory storage driver can take minutes.
If you opt for RocksDB, for best performance save the data on an SSD if possible as RocksDB is highly optimized for flash storage. Also keep in mind that RocksDB is designed to use all the memory you give it access to, but this is something you can tune when creating your Faust Tables!
Next up, let's take a look at agents.
Agents are the main building blocks of Faust applications. They are the objects that define the stream processors that we want to run. To define an agent, you simply declare an async function with the @app.agent decorator.
topic = app.topic('messages-topic')

@app.agent(topic)
async def say(messages):
    async for message in messages:
        print(message)
In the above example, we create a very basic agent that prints out all the messages it receives. The topic parameter is a topic-description object that represents the Redpanda topic that the agent will listen to. The messages parameter is a stream of messages that we can iterate over.
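The agent pattern relies on Python's `async for` loop over an asynchronous stream. The sketch below mimics that shape with only the standard library, so it can be run without a broker. `fake_stream` is a hypothetical stand-in for a topic, and nothing here is Faust's own implementation.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream(items: List[str]) -> AsyncIterator[str]:
    """Stand-in for a topic: yields messages one by one."""
    for item in items:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield item


async def say(messages: AsyncIterator[str]) -> List[str]:
    """Agent-style processor: consume the stream with `async for`."""
    seen = []
    async for message in messages:
        seen.append(message.upper())
    return seen


result = asyncio.run(say(fake_stream(["hello", "world"])))
print(result)
```

In a real Faust agent the loop never ends, because the stream keeps delivering messages for as long as the worker runs.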
With just these small building blocks, we are ready to start building our application!
The platform
To run our application, we need to start a Redpanda cluster. The easiest way to do this is by using the official rpk tool.
rpk container start -n 1
This will start a single-node Redpanda cluster in a Docker container. After running the command, we should see output like this:
Starting cluster
Waiting for the cluster to be ready...
NODE ID ADDRESS
0 127.0.0.1:59894
Cluster started! You may use rpk to interact with it. E.g:
rpk cluster info --brokers 127.0.0.1:59894
Take note of the address of the broker, as we will need it later, and off we go!
The stream processing application
As mentioned above, we are building an application that will calculate the rolling average of temperature coming from various sensors. We will generate mock data for this application, but in a real-world scenario this data could come from thousands of temperature sensors scattered around the world.
Now that we have a Redpanda cluster running, we can start building our stream processing application. We will start by creating a new file called temperature.py and define our main Faust application. (This example has been developed on Python 3.10, but different versions of Python 3 should work as well, as long as you are able to pip install faust-streaming.)
import faust

app = faust.App(
    "temperature-stream",
    broker="kafka://localhost:59894",  # the broker address printed by rpk
)
Next, we will define the schema of our input and output records using Faust-supported models. This allows us to define the records using Python syntax, and have Faust take care of serializing and deserializing the data for us.
from datetime import datetime

# Input record: one raw reading from a sensor
class Temperature(faust.Record, isodates=True, serializer="json"):
    ts: datetime = None
    value: int = None

# Output record: the aggregate computed for one window
class AggTemperature(faust.Record, isodates=True, serializer="json"):
    ts: datetime = None
    count: int = None
    mean: float = None
    min: int = None
    max: int = None
We define two classes, Temperature and AggTemperature. The Temperature class represents the input records, and the AggTemperature class represents the output records. The isodates parameter tells Faust to use ISO 8601 format for dates and times, and the serializer parameter tells Faust to use JSON for serialization. We could also add default values to certain fields of the records here in case they are missing from the input data, depending on how we want to handle them in the downstream computations. For now, we will leave them all as None.
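To illustrate what serializing a record with ISO 8601 dates to JSON involves, here is a small standard-library sketch. It is an analogy, not how Faust implements its models: the `Reading` dataclass and its `dumps` and `loads` methods are hypothetical names chosen for this example.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Reading:
    ts: datetime
    value: int

    def dumps(self) -> str:
        # Dates are written as ISO 8601 strings so they survive JSON encoding.
        return json.dumps({"ts": self.ts.isoformat(), "value": self.value})

    @classmethod
    def loads(cls, payload: str) -> "Reading":
        # Parse the JSON text and turn the ISO 8601 string back into a datetime.
        data = json.loads(payload)
        return cls(ts=datetime.fromisoformat(data["ts"]), value=data["value"])


original = Reading(ts=datetime(2022, 10, 16, 19, 40, 36), value=42)
payload = original.dumps()
restored = Reading.loads(payload)
print(payload)
```

With Faust models, this round trip happens behind the scenes whenever a record is sent to or read from a topic.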
Next, we will define the topics that we will be using. We will have a topic for the raw temperature readings, and a topic for the rolling average.
TOPIC = "raw-temperature"
SINK = "agg-temperature"
source = app.topic(TOPIC, value_type=Temperature)
sink = app.topic(SINK, value_type=AggTemperature)
In the above code, we define two topic names, TOPIC and SINK, and create a topic object for each. The source topic is where we will be ingesting the raw data, and the sink topic is where we will be writing the results. We also set the value types of the topics to the Temperature and AggTemperature classes that we defined earlier.
Persisting state
In order to calculate the rolling average, we need to keep track of the readings we have seen in the current window, from which we can derive their sum and count. We can do this by using a Faust table. A table is an in-memory dictionary, backed by a Redpanda changelog topic used for persistence and fault tolerance. This is useful in case we have to rebuild the state of the table because of network failures or node restarts.
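The idea of a dictionary backed by a changelog can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not Faust's implementation: `ChangelogTable` is a made-up class, and an ordinary list stands in for the changelog topic.

```python
from typing import Any, Dict, List, Tuple


class ChangelogTable:
    """Dictionary whose every write is also appended to a changelog."""

    def __init__(self, changelog: List[Tuple[str, Any]]) -> None:
        self.changelog = changelog  # stand-in for the changelog topic
        self.data: Dict[str, Any] = {}
        # Recovery: replay the log in order; the last write per key wins.
        for key, value in changelog:
            self.data[key] = value

    def __setitem__(self, key: str, value: Any) -> None:
        self.changelog.append((key, value))
        self.data[key] = value

    def __getitem__(self, key: str) -> Any:
        return self.data[key]


log: List[Tuple[str, Any]] = []
table = ChangelogTable(log)
table["Sensor1"] = 21
table["Sensor1"] = 23
table["Sensor2"] = 18

# Simulate a restart: the in-memory dict is gone, but the log survives.
recovered = ChangelogTable(log)
print(recovered.data)
```

Replaying the log after a restart restores the latest value for every key, which is the property that makes table state recoverable.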
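from datetime import timedelta

TABLE = "tumbling-temperature"  # table name; assumed here, not given in the original
WINDOW = 10  # 10 seconds window
WINDOW_EXPIRES = 1  # seconds to keep a window's data before it expires
PARTITIONS = 1

# Note: window_processor (shown below) must be defined before this table in temperature.py
tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        key_type=str,
        value_type=Temperature,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(Temperature.ts)
)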
Let's break down the above code. First, we define the WINDOW and WINDOW_EXPIRES parameters. The WINDOW parameter is the size of the window in seconds, and the WINDOW_EXPIRES parameter is the amount of time in seconds that we want to wait before we consider a window to be expired, meaning the duration for which we want to store the data allocated to each window.
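A tumbling window splits time into fixed-size, non-overlapping ranges, so every timestamp belongs to exactly one window. The sketch below shows one common way to compute that range in plain Python. The `tumbling_window` helper is hypothetical, and Faust's own window ranges may treat the boundaries slightly differently.

```python
import math
from typing import Tuple


def tumbling_window(ts: float, size: float) -> Tuple[float, float]:
    """Return the (start, end) of the fixed-size window that contains ts."""
    start = math.floor(ts / size) * size
    return (start, start + size)


# Two readings a few seconds apart land in the same 10-second window,
# while a reading exactly on the next boundary starts a new window.
first = tumbling_window(1665945643.2, 10)
second = tumbling_window(1665945649.9, 10)
third = tumbling_window(1665945650.0, 10)
print(first, second, third)
```

Because the windows do not overlap, each reading contributes to exactly one aggregate.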
The window_processor function is the function that will be called when a window is closed. This is where we will do the actual computation of the rolling average. Let's see how this looks.
def window_processor(key, events):
    timestamp = key[1][0]  # key[1] is the tuple (ts, ts + window)
    values = [event.value for event in events]
    count = len(values)
    mean = sum(values) / count
    min_value = min(values)
    max_value = max(values)

    aggregated_event = AggTemperature(
        ts=timestamp, count=count, mean=mean, min=min_value, max=max_value
    )

    print(
        f"Processing window: {len(values)} events, Aggreged results: {aggregated_event}"
    )

    # Publish the aggregate to the sink topic
    sink.send_soon(value=aggregated_event)
The window_processor function takes two parameters: key and events. The key parameter is the key of the windowed table entry, and the events parameter is the list of events that were allocated to the window. In our case, the second element of the key, key[1], is a tuple of the start and end timestamps of the window, and the events are the temperature readings that were allocated to the window.
We then calculate the count, mean, min, and max values of the events, and create a new AggTemperature record with the results. Finally, we print the results to the console and send the aggregated record to the sink topic, which we defined earlier.
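The aggregation itself is ordinary Python and can be tried without Faust. The sketch below uses a hypothetical `summarize` helper that computes the same four statistics on a plain list and, as an extra precaution that the tutorial code does not include, returns None for an empty list instead of dividing by zero.

```python
from typing import Dict, List, Optional


def summarize(values: List[int]) -> Optional[Dict[str, float]]:
    """Compute count, mean, min, and max; return None for an empty window."""
    if not values:
        return None  # avoids dividing by zero when no events arrived
    return {
        "count": len(values),
        "mean": sum(values) / len(values),
        "min": min(values),
        "max": max(values),
    }


summary = summarize([4, 95, 41, 24])
print(summary)
```

Keeping the computation in a small pure function like this also makes it easy to unit test separately from the streaming code.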
Now that the middle (and the most important) part of our application is done, let's take a quick step back and implement the data generator function that will be responsible for pumping fake temperature sensor data into our Redpanda source topic.
# Imports needed by this snippet (place them at the top of temperature.py)
import random
from datetime import datetime
from random import randint

@app.timer(interval=5.0)
async def generate_temperature_data():
    temperatures_topic = app.topic(TOPIC, key_type=str, value_type=Temperature)

    # Send 20 messages every time the timer is triggered (every 5 seconds)
    for i in range(20):
        # Send the data to the Redpanda topic
        await temperatures_topic.send(
            key=random.choice(["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"]),
            value=Temperature(
                ts=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                value=randint(0, 100),
            ),
        )

    print("Producer is sleeping for 5 seconds 😴")
The above function is a Faust timer task that will be triggered every five seconds. In each iteration, we generate 20 random temperature readings, and send them to the TOPIC topic that we defined earlier.
The last step is to create our agent, which is the Faust term for the processing function that will be triggered when a message is received on the source topic.
@app.agent(app.topic(TOPIC, key_type=str, value_type=Temperature))
async def calculate_tumbling_temperatures(temperatures):
    async for temperature in temperatures:
        # Read the events stored for the current window, add the new one, and write the list back
        value_list = tumbling_table["events"].value()
        value_list.append(temperature)
        tumbling_table["events"] = value_list
In each iteration of the calculate_tumbling_temperatures function, we receive a temperature reading from the source topic and append it to the list of events that are allocated to the current window. We then update the table with the new list.
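Note that the agent stores every reading under the single key "events", so readings from all sensors end up in the same window. If per-sensor aggregates were wanted instead, the grouping could be keyed by sensor as well as by window. The following plain-Python sketch shows that grouping idea only; the sample readings are invented, and this is not a drop-in replacement for the Faust table.

```python
import math
from collections import defaultdict
from typing import DefaultDict, List, Tuple

WINDOW = 10  # seconds, matching the tutorial's window size

# (sensor, window_start) -> values that fell into that window
windows: DefaultDict[Tuple[str, int], List[int]] = defaultdict(list)

# Invented sample readings: (sensor, timestamp, value)
readings = [
    ("Sensor1", 1665945641.0, 20),
    ("Sensor2", 1665945642.5, 35),
    ("Sensor1", 1665945648.0, 22),
    ("Sensor1", 1665945651.0, 30),  # belongs to the next window
]

for sensor, ts, value in readings:
    window_start = math.floor(ts / WINDOW) * WINDOW
    windows[(sensor, window_start)].append(value)

print(dict(windows))
```

Each (sensor, window) pair now collects its own values, so the aggregates can be computed per sensor rather than across all of them.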
Finally, we have enough to run our application! To start a Faust worker, run the following command:
faust -A temperature worker -l info
The -A parameter tells Faust to use the temperature module, and the worker parameter tells Faust to run a worker process. The -l info parameter tells Faust to log the output at the info level.
If all is well, our output should look something like this:
[2022-10-16 19:40:36,188] [88447] [INFO] [^Worker]: Ready
[2022-10-16 19:40:41,203] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:40:46,214] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:40:51,928] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945640.0, count=40, mean=41.025, min=4, max=95>
[2022-10-16 19:40:56,232] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:41:01,237] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:41:01,939] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945650.0, count=40, mean=56.875, min=8, max=97>
[2022-10-16 19:41:06,241] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:41:11,250] [88447] [WARNING] Producer is sleeping for 5 seconds 😴
[2022-10-16 19:41:11,948] [88447] [WARNING] Processing window: 40 events, Aggreged results: <AggTemperature: ts=1665945660.0, count=40, mean=49.525, min=5, max=96>
As you can see, the application is running. Every five seconds, it generates 20 temperature readings and sends them to the source topic. Every 10 seconds, it processes the events in the current window and sends the aggregated results to the sink topic.
We can take a peek at the data in the sink topic by running the following command:
rpk topic consume agg-temperature --brokers 127.0.0.1:59894
The output should look something like this:
{
  "topic": "agg-temperature",
  "value": "{\"ts\":1665945640.0,\"count\":40,\"mean\":41.025,\"min\":4,\"max\":95,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}",
  "timestamp": 1665945652146,
  "partition": 0,
  "offset": 0
}
{
  "topic": "agg-temperature",
  "value": "{\"ts\":1665945650.0,\"count\":40,\"mean\":56.875,\"min\":8,\"max\":97,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}",
  "timestamp": 1665945661940,
  "partition": 0,
  "offset": 1
}
{
  "topic": "agg-temperature",
  "value": "{\"ts\":1665945660.0,\"count\":40,\"mean\":49.525,\"min\":5,\"max\":96,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}",
  "timestamp": 1665945671951,
  "partition": 0,
  "offset": 2
}
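Each consumed record carries the aggregate as a JSON string inside the "value" field, so a downstream consumer has to decode it a second time. The sketch below does this with the standard library for the first record shown above. Interpreting "ts" as seconds since the Unix epoch in UTC is an assumption based on how the number looks.

```python
import json
from datetime import datetime, timezone

# One record as printed by `rpk topic consume` (copied from the output above).
record = {
    "topic": "agg-temperature",
    "value": "{\"ts\":1665945640.0,\"count\":40,\"mean\":41.025,\"min\":4,\"max\":95,\"__faust\":{\"ns\":\"temperature.AggTemperature\"}}",
    "timestamp": 1665945652146,
    "partition": 0,
    "offset": 0,
}

# The value is itself a JSON document, so it needs a second decode step.
payload = json.loads(record["value"])

# Assumption: ts is a Unix timestamp in seconds, interpreted here as UTC.
window_start = datetime.fromtimestamp(payload["ts"], tz=timezone.utc)

print(window_start.isoformat(), payload["count"], payload["mean"])
```

The "__faust" entry in the payload names the record class that produced the message, and a plain JSON consumer can simply ignore it.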
Included in the code repository is a docker-compose.yml file that will start a Redpanda cluster and the Faust worker in a container. To start everything with one command, run:
docker-compose up -d
The output should be similar to the previous example, but this way you don't even have to create your Python environment!
Wrapping up
In this tutorial, we took a look at how to use Faust to build a streaming application that aggregates temperature readings from multiple sensors. Stream processing has never been more accessible thanks to Redpanda and libraries like Faust. The combination of these two technologies allows you to build powerful streaming applications with very little code.
As a reminder, all the code for this tutorial is available on GitHub here.
If you are interested in how Faust compares to other stream processing frameworks, like Spark Streaming, check out this article by James Kinley or, for a more detailed comparison between frameworks, take a look at this article by Jakkie Koekemoer.
Take Redpanda for a test drive here. Check out the documentation to understand the nuts and bolts of how the platform works, or read more blogs to see the plethora of ways to integrate with Redpanda. To ask Redpanda's Solution Architects and Core Engineers questions and interact with other Redpanda users, join the Redpanda Community on Slack.
Stream processing is a way of handling data in real time by processing events as they arrive, rather than waiting for batches (or all) of the data to arrive before processing it. This is useful in many situations, especially when you need to react to changes immediately (e.g., fraud detection and anomaly detection).