Structured stream processing with Redpanda and Apache Spark

How to build a streaming application with Docker, Apache Spark, and Redpanda.

By
on
January 18, 2022

Redpanda is an Apache Kafka®-compatible streaming data platform for mission critical workloads where performance, data safety, and transactionality are of essence. This compatibility makes it very simple to use Redpanda with your existing infrastructure, and that includes Apache Spark®.

Spark is the ever-popular analytics engine for large-scale data processing. Born out of UC Berkeley in 2014 and later donated to the Apache Software Foundation, Spark has become synonymous with big data analytics. It was originally built to address the limitations of the MapReduce algorithm by allowing functions to be executed iteratively on a Resilient Distributed Dataset (RDD), which is an abstraction of a dataset that is logically partitioned and cached in memory on many machines. Over the years Spark has evolved to include further abstractions on top of the core RDD concept, including Spark SQL, MLLib, and Spark Streaming®. This brings us nicely to the topic of this blog.

Spark Streaming extends the core Spark API to support integrations with live data streams generated by a variety of sources, including Redpanda. Internally, a stream is divided into micro-batches (sequence of RDDs) that are continuously processed by the Spark engine to generate batches of results.

Spark Streaming

Furthermore, Spark’s Structured Streaming extends the Spark SQL API to allow computation to be expressed in the same way as you would query a static RDD, but for streams. The engine takes care of dividing the stream into micro-batches and executing SQL queries as a series of small batch jobs.

Streaming data from Redpanda to Spark

Spark Streaming and Structured Streaming are well integrated with Apache Kafka, and if you didn’t already know, Redpanda fully supports the Kafka API so these integrations work seamlessly with Redpanda too. It’s as simple as creating a SparkSession and a streaming DataFrame to read messages from a Redpanda topic. Here is an example in Scala, but it’s just as easy in Java or Python if you prefer:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
    .builder
    .appName("RedpandaSparkStream")
    .getOrCreate()
import spark.implicits._

val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", <brokers>)
    .option("subscribe", <topics>)
    .option("startingOffsets", "earliest")
    .load()

stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Redpanda and Structured Streaming demo

Let’s walk through an example to demonstrate how to stream data from Redpanda into a Structured Streaming application, execute a simple transformation on the stream, and write the results back to Redpanda. You can find all of the code and configuration used in this blog on Github here.

Docker Desktop

The easiest way to get started is to use this docker-compose.yml file to spin up an environment on your laptop with Docker containers for the Spark Master, Spark Worker, and Redpanda:

$ git clone https://github.com/redpanda-data)/redpanda-examples.git
$ cd redpanda-examples/docker-compose
$ docker-compose -f redpanda-spark.yml up -d
[+] Running 4/4
 ⠿ Network docker-compose_redpanda_network  Created
 ⠿ Container spark-master                   Started
 ⠿ Container redpanda                       Started
 ⠿ Container spark-worker                   Started

There are a few things to note here. Redpanda is designed to consume all of the available resources on a host, so to make sure it plays nicely on your laptop the Compose file starts Redpanda with a single core (--smp 1), 1GB of memory (--memory 1G), and is instructed to not pin its threads, which will help to lower CPU utilisation (--overprovisioned). On the Spark side, the web UI ports are published to localhost for the Spark master (8080) and Spark worker (8081).

Streaming queries

Write messages to Redpanda

Before streaming data out of Redpanda into Spark we must first put some data in. ProducerExample is a simple producer implementation in Scala that writes stock market activity data to a Redpanda topic in JSON format. The data used in this example is historical S&P 500 (SPX) data downloaded from the Nasdaq website.

{
    "Date":"12/10/2021",
    "Close/Last":"4712.02",
    "Volume":"--",
    "Open":"4687.64",
    "High":"4713.57",
    "Low":"4670.24"
}

The producer can be run with sbt or scala, and it accepts a producer configuration file redpanda.config and topic name as arguments:

$ cd redpanda-examples/scala
$ sbt "run redpanda.config spx_history"
Multiple main classes detected. Select one to run:
[1] com.redpanda.examples.clients.ConsumerExample
[2] com.redpanda.examples.clients.ProducerExample *
[3] com.redpanda.examples.spark.RedpandaSparkStream
...
$ sbt clean assembly
$ scala -classpath target/scala-2.12/redpanda-examples-assembly-1.0.0.jar com.redpanda.examples.clients.ProducerExample redpanda.config spx_history

Run the Structured Streaming application

The Structured Streaming application RedpandaSparkStream reads the JSON formatted messages from Redpanda into a Spark SQL DataFrame. A simple function is applied to the DataFrame to derive a new column (the difference between the high and low stock price), before writing the output to another Redpanda topic. The modified stream is also printed to the console so that you can see what's going on.

To avoid having to install Spark on your local machine, spin up another Docker container with the necessary libraries already installed. To make it even easier, bind mount the local directory on the container for access to the .jar file, and publish port 4040 so you can browse to the application’s web UI from localhost:

$ docker run --rm -it --user=root \
  -e SPARK_MASTER="spark://spark-master:7077" \
  -v `pwd`:/project \
  -p 4040:4040 \
  --network docker-compose_redpanda_network \
  docker.io/bitnami/spark:3 /bin/bash

Use spark-submit to run the application from within the container, passing the Redpanda address, input topic, and output topic as parameters. Note that because this client container is running within the Docker network you must use the internal Redpanda address to connect (172.24.1.4:9092) as opposed to the external address (localhost:19092):

# spark-submit \
  --master $SPARK_MASTER \
  --class com.redpanda.examples.spark.RedpandaSparkStream \
  /project/target/scala-2.12/redpanda-examples-assembly-1.0.0.jar \
  172.24.1.4:9092 spx_history spx_history_diff

Read results from Redpanda

To complete the end-to-end data flow, consume the output from the Structured Streaming application using ConsumerExample:

$ cd redpanda-examples/scala
$ sbt "run redpanda.config spx_history_diff"
Multiple main classes detected. Select one to run:
 [1] com.redpanda.examples.clients.ConsumerExample *
 [2] com.redpanda.examples.clients.ProducerExample
 [3] com.redpanda.examples.spark.RedpandaSparkStream
...
$ sbt clean assembly
$ scala -classpath target/scala-2.12/redpanda-examples-assembly-1.0.0.jar com.redpanda.examples.clients.ConsumerExample redpanda.config spx_history_diff

Or simply use rpk:

$ rpk topic consume spx_history_diff \
  --brokers localhost:19092 \
  --offset "start"

What About Wasm?

Thanks to the maturity of the Spark API it’s easy to connect to Redpanda as the data source and destination for your streaming analytics. This allows you to build complex data processing pipelines that can deliver high performance, scalability, and durability from end-to-end.

You may be wondering where Redpanda’s Wasm Data Transforms fit into this architecture. Transforms enable Redpanda users to create WebAssembly (Wasm) functions to perform simple transformations on topics. Wasm functions run inside Redpanda, avoiding the need to send data to a stream processing engine to perform common transformation tasks — “data ping-pong.” Transforms are well suited to one-shot transformations, such as redacting data for GDPR compliance, or mapping from one format to another. In the majority of cases, you can use WASM functions as replacements for map() or filter() functions in Spark. Tools like Spark and Apache Flink® can be used to supplement Transforms when your processing tasks involve complex aggregations, windowing, and joining to other streams.

No items found.

Related articles

VIEW ALL POSTS
Real-time product recommendation AI inferencing
Tyler Rockwood
&
&
&
January 14, 2025
Text Link
Real-time analytics with MongoDB and Redpanda Connect
Aykut Bulgu
&
&
&
January 7, 2025
Text Link
Flag non-compliant content in real time with AI and Pinecone
Keanan Koppenhaver
&
&
&
December 24, 2024
Text Link