How we built our async topic transformation engine in Wasm.
When building streaming systems, we often encounter questions about having users provide a custom function to perform some transformation on a stream. When applied to a distributed log like Redpanda, custom functions can add value by transforming topics asynchronously, providing a “materialized view” on top of existing topics, and many other applications we’re still mulling over.
In this blog post, I will describe how we spent the last year creating a technical preview of our first implementation of an asynchronous log transformation engine, as well as why we decided to have our implementation include WebAssembly (Wasm), and how Redpanda Data Transforms can help you transform your own Redpanda topics.
A brief overview of Redpanda Data Transforms
Redpanda Data Transforms gives users the ability to push custom code to Redpanda. This code can be used to transform one or more topics without programmers having to worry about the details of building and managing a complete system to perform the transformations. There are three main goals of the Data Transforms capability:
Ease of use: You should be able to code in your favorite Wasm-supported languages (only JS is supported in this release). No need to worry about which Apache Kafka® libraries you’ll need to leverage or any low-level external dependencies. Your code gets record batches sent directly into it, straight from the local Redpanda node.
Performance: Your transform is pushed to all nodes in the cluster, and calls to your code boil down to an RPC over localhost. This avoids the expensive Raft chatter that you would otherwise incur with an external producer and consumer.
Operational simplicity: Running a separate transformer service comes with more configuration and setup. With Data Transforms, you just push your code to Redpanda, where it is durably stored in a dedicated topic and completely managed by Redpanda.
When developing the initial idea of a data transformation engine, we wanted to give the user full control over the unit of execution that maps a target log to one or more product logs. That meant no DSLs, and full support for one or more popular programming languages that users would feel comfortable coding in. It would also give them the advantage of importing any library they like. These foundational design decisions have implications for performance, security, adaptability, and the ability to integrate with other libraries and codebases.
In the tech preview, transforms are deployed to a separate sidecar process called the “async engine.” It’s a Node.js service deployed on each Redpanda node that responds to requests from the local Redpanda process over localhost.
When data arrives on a topic that a transform subscribes to, it is routed to the async engine, which forwards it to all applicable transforms. Each transform’s response is then sent back to Redpanda, which writes it to a resultant topic we call a “materialized topic.”
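The routing described above can be sketched in a few lines of Python. This is only an illustration of the data flow, not the engine's real interface: the `AsyncEngine` class, its `deploy` and `process` methods, the `uppercase` transform, and the choice to model a record batch as a list of byte strings are all assumptions made for this sketch. The materialized topic naming simply mirrors the `Foo._result2_` style that appears later in this post.

```python
from collections import defaultdict
from typing import Callable, Dict, List

# Assumption: a record batch is modelled as a plain list of byte strings.
Batch = List[bytes]
# Assumption: a transform maps one input batch to {output name: batch}.
Transform = Callable[[Batch], Dict[str, Batch]]


class AsyncEngine:
    """Toy stand-in for the sidecar: routes batches to subscribed transforms."""

    def __init__(self) -> None:
        # input topic -> transforms subscribed to it
        self._subscriptions: Dict[str, List[Transform]] = defaultdict(list)

    def deploy(self, input_topic: str, transform: Transform) -> None:
        """Register a transform against one input topic (hypothetical API)."""
        self._subscriptions[input_topic].append(transform)

    def process(self, input_topic: str, batch: Batch) -> Dict[str, Batch]:
        """Run every applicable transform; key results by materialized topic."""
        results: Dict[str, Batch] = defaultdict(list)
        for transform in self._subscriptions.get(input_topic, []):
            for output_name, records in transform(batch).items():
                # Illustrative naming, modelled on "Foo._result2_".
                results[f"{input_topic}._{output_name}_"].extend(records)
        return dict(results)


def uppercase(batch: Batch) -> Dict[str, Batch]:
    """Example transform: upper-case every record into one output."""
    return {"upper": [record.upper() for record in batch]}


engine = AsyncEngine()
engine.deploy("Foo", uppercase)
materialized = engine.process("Foo", [b"a", b"b"])
unrouted = engine.process("Bar", [b"x"])  # no transform subscribes to Bar
```

In this sketch the engine holds no record state between calls: everything it needs arrives with the request, and everything it produces goes back in the response.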
We consider this an active area of research. Today, we only support the “async” Wasm sidecar process, but we are also experimenting with other potential solutions. One of them is embedding V8 and running Wasm functions inline, which we will discuss in a future post.
Routing streams - the pacemaker
At the heart of the implementation is something we call the “pacemaker.” Simply put, it is a pacing mechanism that routes data to each transform as fast as it can, within certain limits. For example, reading large amounts of data from disk if the coprocessors are lagging would have a negative impact on performance, so we researched the best ways to maximize total system throughput.
We’ve found that the best way to perform the work done within each coprocessor’s read/map/write event fiber is to maximize the level of parallelism up until a limiting factor, which we’ve determined is a single transform’s apply() method. To maintain ordering guarantees (and the sanity of the developer) we guarantee that no single coprocessor’s apply() method will be called concurrently.
Performing all required reads and sending one big batch to the Wasm engine was worse for performance because it did not exploit the concurrency that the Seastar reactor provides. The main fiber would wait for a response from multiple coprocessors, wasting cycles that could have been spent performing reads for coprocessors that had already completed their apply() transforms. That is why, per shard, there is a fiber of execution for each coprocessor.
The workflow for each fiber is simple:
1. Poll for data across all inputs in parallel.
2. Perform an RPC to the Wasm engine, passing all of the record batches and any additional metadata needed to pair each individual sub-request with a particular coprocessor.
3. Interpret the response by writing responses to resultant topics and incrementing the respective offsets.
Note that Step 1 cannot proceed for a single coprocessor until Step 3 has completed. If it did, out-of-order results would be written to materialized topics. To maximize concurrency, there is one of these fibers per coprocessor-shard combination.
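The three-step loop above can be sketched with Python's asyncio standing in for Seastar fibers. This is a minimal model under stated assumptions, not Redpanda's implementation: `read_inputs`, `wasm_engine_apply`, `fiber`, the integer "records", and the coprocessor names are all hypothetical. The `active` and `peak` counters exist only to show that no coprocessor has more than one apply call in flight, while different coprocessors still overlap.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Tuple

active: Dict[str, int] = defaultdict(int)  # in-flight apply calls per coprocessor
peak: Dict[str, int] = defaultdict(int)    # highest value of `active` ever seen


async def read_inputs(inputs: Dict[str, List[int]], batch_size: int) -> Dict[str, List[int]]:
    """Step 1: poll every input in parallel, taking up to batch_size records."""
    async def poll(name: str) -> Tuple[str, List[int]]:
        await asyncio.sleep(0)  # stand-in for a disk read
        taken = inputs[name][:batch_size]
        del inputs[name][:batch_size]
        return name, taken
    return dict(await asyncio.gather(*(poll(name) for name in inputs)))


async def wasm_engine_apply(script_id: str, batches: Dict[str, List[int]]) -> Dict[str, List[int]]:
    """Step 2: stand-in for the RPC to the Wasm engine (doubles each record)."""
    active[script_id] += 1
    peak[script_id] = max(peak[script_id], active[script_id])
    await asyncio.sleep(0.001)  # simulated round trip over localhost
    active[script_id] -= 1
    return {topic: [r * 2 for r in records] for topic, records in batches.items()}


async def fiber(script_id: str, inputs: Dict[str, List[int]], outputs: Dict[tuple, List[int]]) -> None:
    """One read/map/write loop per coprocessor; steps never overlap within it."""
    while any(inputs.values()):
        batches = await read_inputs(inputs, batch_size=2)       # Step 1
        response = await wasm_engine_apply(script_id, batches)  # Step 2
        for topic, records in response.items():                 # Step 3
            outputs[(script_id, topic)].extend(records)


async def main() -> Dict[tuple, List[int]]:
    outputs: Dict[tuple, List[int]] = defaultdict(list)
    work = {
        "copro-a": {"Foo": [1, 2, 3, 4, 5]},
        "copro-b": {"Foo": [1, 2, 3], "Bar": [10, 20]},
    }
    # One fiber per coprocessor, all running concurrently.
    await asyncio.gather(*(fiber(sid, inputs, outputs) for sid, inputs in work.items()))
    return dict(outputs)


outputs = asyncio.run(main())
```

Because each fiber awaits its own response before reading again, results for a given coprocessor are written in input order, and a slow coprocessor only stalls its own loop.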
Replay and error handling
One challenge we encountered during development was getting message replay right. Replay needs special handling because requests and responses are batched: an error may occur while a batch is being handled, leaving work partially committed. On replay, additional state must therefore be retained to track which responses should be used and which can be thrown away. This state holds the highest processed input offset for each output topic, per input. The read head is the lower bound of all materialized offsets, so a response for an already processed request can be identified by looking at its respective materialized offset: if that offset is higher than the lower bound, the response can be skipped, as it has already been processed.
In the example above, a coprocessor’s fiber is depicted with two inputs: Bar can be considered fully processed, while input Foo will immediately be detected as having to replay. Redpanda Data Transforms will read from the lowest recorded offset, 450 in this case.
Furthermore, after receiving a response from the coprocessor, Redpanda must recognize this was a case where work was partially committed, so it can ignore writing responses to materialized topic Foo._result2_ and only proceed with writing data to materialized topic
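The replay bookkeeping described in this section can be sketched as follows. It is a simplified model, not Redpanda's code: the `ReplayTracker` class is hypothetical, the output name `Foo._lagging_` and the offset 500 are invented for illustration (only the 450 and `Foo._result2_` come from the text), and the tracker stores the next input offset to apply for each output rather than the highest processed one, purely to keep the arithmetic simple.

```python
from typing import Dict, List, Tuple


class ReplayTracker:
    """Per-input state: the next input offset to apply for each output topic."""

    def __init__(self, next_offsets: Dict[str, int]) -> None:
        self.next_offsets = dict(next_offsets)

    def read_head(self) -> int:
        """Replay starts from the lower bound across all materialized topics."""
        return min(self.next_offsets.values())

    def commit(self, response: Dict[str, List[Tuple[int, bytes]]]) -> Dict[str, List[bytes]]:
        """Write only records an output has not seen yet; skip the rest."""
        written: Dict[str, List[bytes]] = {}
        for topic, records in response.items():
            fresh = [(off, val) for off, val in records if off >= self.next_offsets[topic]]
            if fresh:
                written[topic] = [val for _, val in fresh]
                self.next_offsets[topic] = fresh[-1][0] + 1
        return written


# Hypothetical state after a partially committed batch on input Foo:
# one output is behind at 450, Foo._result2_ already reached 500.
foo = ReplayTracker({"Foo._lagging_": 450, "Foo._result2_": 500})
start = foo.read_head()

# The replayed response covers input offsets 450..499 for both outputs.
response = {
    topic: [(off, b"v%d" % off) for off in range(start, 500)]
    for topic in foo.next_offsets
}
written = foo.commit(response)
```

After the commit, both outputs sit at the same offset, so the read head advances and normal processing resumes without duplicate writes to the output that was already ahead.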
What we’ve done is create a system that allows users to asynchronously map one topic to another, solving a common problem in streaming. The idea to keep the Wasm engine itself simple and stateless provided a solid foundation. Keeping state within the Wasm engine would have meant synchronizing this with Redpanda somehow. Moving to this design shifted the bulk of the complexity into Redpanda itself, where the problems above were eventually discovered and addressed.
This implementation will be iterated on, and we’re interested in hearing your feedback on our coprocessor API. Wasm engine performance will be addressed by a redesign where it will support concurrent execution of transforms, and full Wasm support will be addressed in the future, as well. We hope you enjoy using our first iteration of Data Transforms as much as we did developing it.
Getting started with Redpanda Data Transforms
Data Transforms is going live first in a special tech preview of Redpanda. The first version to contain all of the changes needed to run data transforms is v21.12.1-wasm-beta1. You can get the code here.
To start, you’ll need to modify your Redpanda configuration file to have the enable_coproc flag set to true. You’ll also need to start the Wasm engine. If you’re trying this out via rpk container, the Wasm engine starts up automatically once you’ve set the enable_coproc flag to true. For users trying this out via our deb/rpm install packages, you can start the Wasm engine with the command sudo systemctl start wasm_engine.
To make your own coprocessors, you can start with rpk wasm generate to create a sample project. Build your coprocessor and deploy it with rpk wasm deploy to launch your custom transform.
For help, join our Slack community or check out the documentation here, or review documentation about installing tech previews here.