Build an ETL pipeline for streaming data with Apache Beam and Redpanda

Get an overview of ETL pipelines, where to use them, and how to build an ETL pipeline for streaming data with Apache Beam and Redpanda.

November 23, 2023

Apache Beam® is a versatile open-source framework for designing and executing data processing pipelines. It can handle both batch and streaming data, and allows you to create a seamless data processing workflow, encompassing everything from reading data from source systems to applying transformations and writing the output to target systems.

One of Apache Beam's most notable features is its language-specific SDKs, which include Python, Java, and Go SDKs. These allow developers to write pipeline code in their preferred programming language. Beam also boasts strong portability; the same pipeline code can be executed on various execution engines such as Apache Flink™, Apache Spark™, and Google Cloud Dataflow. In addition, Apache Beam offers a Direct Runner feature to execute pipelines on a single node or machine.

Beam's flexibility enhances collaboration and allows you to maximize the use of existing skill sets within your team. Its unified programming model simplifies the development of data processing pipelines, saving time and reducing complexity.
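
To get a feel for this unified model before starting the tutorial, here's a minimal, hypothetical Beam pipeline in Java (not part of the tutorial code; the class name MinimalPipeline is made up for illustration). It turns a small in-memory list into a PCollection, transforms each element, and prints the result, and the same code runs on the Direct Runner, Flink, Spark, or Dataflow simply by changing the --runner option:

package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalPipeline {
    public static void main(String[] args) {
        // Build pipeline options from command-line arguments (for example, --runner=DirectRunner)
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Turn an in-memory list into a PCollection, upper-case each element, and print it
        pipeline.apply("CreateInput", Create.of("beam", "redpanda"))
                .apply("UppercaseAndPrint", MapElements.into(TypeDescriptors.strings())
                        .via((String s) -> {
                            String upper = s.toUpperCase();
                            System.out.println(upper);
                            return upper;
                        }));

        // Execute the pipeline and block until it finishes
        pipeline.run().waitUntilFinish();
    }
}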

In this tutorial, you'll learn how to build a streaming ETL pipeline using Apache Beam and Redpanda. Through this practical example, you'll become more comfortable with Beam and develop the skills needed for your own data processing pipelines.

How to build a streaming ETL pipeline with Apache Beam and Redpanda

Imagine you’re building an e-commerce application that captures product orders and customer geolocation information. Your objective is to analyze real-time data on product orders from specific regions based on a predefined mapping of state codes to regions.

To filter and analyze the data, you use the corresponding state codes to enrich the incoming data with descriptive information about the states. The demo data set used in this tutorial consists of only two regions, North and South, each comprising five states. The mapping between state codes and regions is provided later through a class called StateAndRegion.java.

The tutorial example uses the following data processing pipeline structure:

  • A real-time streaming ETL data processing pipeline reads data from an input topic in Redpanda.
  • Data from incoming data streams is filtered based on a specific region (in this case, the southern region).
  • The data is then enriched through a lookup operation. This is a transformation that involves performing a lookup on mapping information that contains a state code and its corresponding description.
  • After the lookup, the retrieved state description information is added to the data stream.
  • Finally, the filtered and enriched data is written to an output topic in Redpanda.

The following diagram illustrates how Beam and Redpanda work together for this solution:

Architecture diagram

Prerequisites

To complete this tutorial, you'll need the following:

  • Docker installed and running on your machine
  • A single-node Redpanda cluster running in Docker, set up per the Redpanda quick start instructions
  • A recent Java Development Kit (JDK) and Apache Maven to build the pipeline application
  • An IDE of your choice for editing the Java project

Confirm the Redpanda container is running and create topics

To start, check that you have a running Redpanda container using the following command:

docker ps

If you’re running a single-node Redpanda cluster per the quick start instructions, then you should see an output similar to this:

CONTAINER ID   IMAGE                                                COMMAND                   CREATED         STATUS         PORTS                                                                                                              NAMES
cec1bdff492c   docker.redpanda.com/redpandadata/console:v2.2.4      "/bin/sh -c 'echo \"$…"   6 minutes ago   Up 6 minutes   0.0.0.0:8080->8080/tcp                                                                                             redpanda-console
ebccfc877212   docker.redpanda.com/redpandadata/redpanda:v23.1.11   "/entrypoint.sh redp…"    6 minutes ago   Up 6 minutes   8081-8082/tcp, 0.0.0.0:18081-18082->18081-18082/tcp, 9092/tcp, 0.0.0.0:19092->19092/tcp, 0.0.0.0:19644->9644/tcp   redpanda-0

You can execute the following command and check the status of the Redpanda cluster:

docker exec -it redpanda-0 rpk cluster info

You should see an output similar to the one below:

CLUSTER
=======
redpanda.25f35248-fddb-4316-89bb-88a7cfba6242

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME      PARTITIONS  REPLICAS
_schemas  1           1


Execute the following command to connect to the Redpanda container's terminal session:

docker exec -it redpanda-0 bash

Then, execute the following CLI command inside the Redpanda container's terminal session to list the topics:

rpk topic list

By default, there won't be any topics listed in response to the rpk topic list command. Execute the following commands inside the Redpanda container's terminal session to create some topics for the tutorial:

rpk topic create user_activity
rpk topic create events_from_south

user_activity is the source topic where the e-commerce application publishes user activity events, including user IDs, product IDs, and state codes. The events_from_south topic serves as the destination topic for the filtered and enriched data.
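
As the rpk topic list output below shows, both topics are created with one partition and one replica, which is enough for this tutorial. If you later want to experiment with more parallelism, rpk accepts partition and replica counts at creation time; for example (optional, not needed here):

rpk topic create user_activity -p 3 -r 1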

Execute the rpk topic list command again to check the created topics. You should see an output as shown below:

NAME               PARTITIONS  REPLICAS
events_from_south  1           1
user_activity      1           1

Alternatively, you can use the Redpanda console UI to check the Redpanda cluster status and to explore or interact with your topics.

Keep this Redpanda container's terminal session open, as you'll use it again after creating the Apache Beam application.

Create an Apache Beam data streaming pipeline application in Java

Now that the Redpanda container instance is up and running and the topics are ready, you can start building the Beam data streaming pipeline application in Java.

1. Create a Maven project

Create a project directory called streaming-pipeline-with-redpanda-and-apache-beam on your machine. Use an IDE of your choice to create a Java Maven project with the following structure:

streaming-pipeline-with-redpanda-and-apache-beam
|-- pom.xml
|-- src
    |-- main
    |   |-- java
    |       |-- org
    |           |-- example
    |               |-- App.java
    |-- test
    |   |-- java
    |       |-- org
    |           |-- example
    |               |-- AppTest.java

The final project structure and tutorial code can be seen in this GitHub repository.
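
If you prefer the command line over an IDE, the standard Maven quickstart archetype generates this same layout (an App.java and AppTest.java under the org.example package). This is optional and just one way to create the skeleton:

mvn archetype:generate -DgroupId=org.example \
    -DartifactId=streaming-pipeline-with-redpanda-and-apache-beam \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false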

2. Add project dependencies

Edit the pom.xml file and include the following code to add the necessary dependencies:

 <dependencies>
    <!-- Apache Beam -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <!-- SLF4J for logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

Then, add the following plugins:

<build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.2.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>org.example.App</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>org.example.App</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

The Beam JAR files are included as dependencies in this XML file. The file also includes the Kafka client JARs that allow the application to interact with the Redpanda cluster. Additionally, the full pom.xml in the tutorial repository includes a <properties> section that pins the versions of the libraries and build plugins required for the project, ensuring that the application is built with the correct version of each component.
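
For reference, a <properties> block along those lines might look like the following. The version numbers here are illustrative placeholders only; use the versions pinned in the tutorial repository:

  <properties>
    <!-- Illustrative versions only; match these to the tutorial repository -->
    <beam.version>2.48.0</beam.version>
    <kafka.version>3.4.0</kafka.version>
    <slf4j.version>2.0.7</slf4j.version>
  </properties>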

3. Define the main application class

Override the contents of the autogenerated App.java file with the following code to implement the main application class functionality for this tutorial's use case:

package org.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;

public class App {

    private static final Logger LOG = LoggerFactory
            .getLogger(App.class);

    /**
     * Specific pipeline options
     */
    public interface Options extends PipelineOptions {
        @Description("Kafka Bootstrap Servers")
        @Default.String("localhost:9092")
        String getKafkaServer();

        void setKafkaServer(String value);

        @Description("Kafka Input Topic Name")
        @Default.String("user_activity")
        String getInputTopic();

        void setInputTopic(String value);

        @Description("Kafka Output Topic Name")
        @Default.String("events_from_south")
        String getOutputTopic();

        void setOutputTopic(String value);

        @Description("Duration to wait in seconds")
        @Default.Long(-1)
        Long getDuration();

        void setDuration(Long duration);

    }

    /**
     * Filter only the events from the southern region and enrich the data with the state description
     */
    private static EnrichedUserActivity filterAndEnrichEvents(String row) throws JsonProcessingException {
        StateAndRegion stateAndRegion = new StateAndRegion();
        EnrichedUserActivity enrichedUserActivity = new EnrichedUserActivity();
        ObjectMapper objectMapper = new ObjectMapper();
        UserActivity userActivity = objectMapper.readValue(row, UserActivity.class);
        if (stateAndRegion.getRegionByStateCode(userActivity.getStateCode()).equals("South")) {
            System.out.println("South");
            enrichedUserActivity.setUserId(userActivity.getUserId());
            enrichedUserActivity.setProductId(userActivity.getProductId());
            enrichedUserActivity.setStateDescription(stateAndRegion.getStateDescriptionByStateCode(
                    userActivity.getStateCode()));
            enrichedUserActivity.setRegion("South");
        }
        System.out.println("enrichedUserActivity is : " + enrichedUserActivity);
        return enrichedUserActivity;
    }

    public static void main(String[] args) throws Exception {
        // PipelineOptionsFactory.fromArgs(args) creates an instance of PipelineOptions from the
        // command line arguments passed to the application.
        // PipelineOptions is a configuration interface that provides a way to set options for a pipeline,
        // such as the runner to use, the number of workers, and any pipeline-specific options.
        Options options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(Options.class);
        LOG.info("Pipeline options are: ");
        LOG.info(options.toString());
        LOG.info("Printed Pipeline options");
        Pipeline pipeline = Pipeline.create(options);

        // The pipeline connects to the Redpanda input topic and processes every event.
        // The pipeline.apply() method reads data from the Redpanda topic using the KafkaIO.read() method.
        PCollection<String> data = pipeline.apply(
                "ReadFromKafka",
                KafkaIO.<String, String> read()
                        .withBootstrapServers(options.getKafkaServer())
                        .withTopics(
                                Collections.singletonList(options
                                        .getInputTopic()))
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata()).apply("ExtractPayload",
                // The Values.create() method extracts the values from the Kafka records read from the topic.
                Values.<String> create());

        data.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println(String.format("** element |%s| **",
                        c.element()));
            }
        }));

        // The code below filters the events coming from the states that belong to the southern region, then
        // enriches the event information by transforming the state code into the state description.
        // Finally, it writes the southern region's events to their own topic.
        PCollection<String> enrichedAndSegregatedEvents = data.apply("Filter and Enrich Event Information",
                ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws JsonProcessingException {
                        EnrichedUserActivity enrichedUserActivity = filterAndEnrichEvents(c.element());
                        if (enrichedUserActivity.getUserId() != null) {
                            c.output(enrichedUserActivity.toString());
                        }
                    }
                }));

        // The following transformation snippet processes each element in enrichedAndSegregatedEvents by creating a
        // new KV element where the key is the string "South" and the value is the original element from the
        // earlier computed enrichedAndSegregatedEvents variable.
        PCollection<KV<String, String>> eventsKV = enrichedAndSegregatedEvents
                .apply("Prepare Events for the Output Topic",
                        ParDo.of(new DoFn<String, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c)
                                    throws Exception {
                                /*
                                System.out.println("c.element is : " + c.element());
                                System.out.println("c.element KV is : " + KV.of("South", c.element()));
                                System.out.println("c.element KV is : " + KV.of("region", c.element()));
                                */
                                c.output(KV.of("South", c.element()));
                            }
                        }));

        // Publish the above filtered and enriched events to the destination topic
        eventsKV
                .apply("WriteToKafka",
                        KafkaIO.<String, String> write()
                                .withBootstrapServers(
                                        options.getKafkaServer())
                                .withTopic(options.getOutputTopic())
                                .withKeySerializer(
                                        org.apache.kafka.common.serialization.StringSerializer.class)
                                .withValueSerializer(
                                        org.apache.kafka.common.serialization.StringSerializer.class));

        // Initiate the pipeline execution
        PipelineResult run = pipeline.run();
        // The waitUntilFinish method is used to block the main thread until the pipeline execution is complete or
        // until the specified duration has elapsed. In this case, the duration is set to -1 and hence the pipeline
        // will continue running until it is explicitly terminated or encounters an error.
        run.waitUntilFinish(Duration.standardSeconds(options.getDuration()));
    }
}

There are inline comments in the code above to help you understand the multiple actions in the App class.

In general, PipelineOptions plays a crucial role in configuring and parameterizing a Beam pipeline. withValidation() applies validation to the options to ensure that they conform to the expected properties, and as(Options.class) casts the options to the custom Options interface defined as part of the App class. The Options interface specifies the configuration options for the pipeline. Using this interface, the pipeline can be configured with different options at runtime, like Kafka topics, Kafka brokers, and other pipeline-specific options.
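
Because PipelineOptionsFactory derives the option names from the getter methods (getKafkaServer becomes --kafkaServer, and so on), each of these can be overridden on the command line when you run the packaged JAR in step 5. For example, assuming your Redpanda broker is exposed to the host on port 19092 as in the docker ps output above (adjust to your setup):

java -jar target/streaming-pipeline-with-redpanda-and-apache-beam-1.0-SNAPSHOT-jar-with-dependencies.jar \
    --runner=DirectRunner \
    --kafkaServer=localhost:19092 \
    --inputTopic=user_activity \
    --outputTopic=events_from_south \
    --duration=-1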

The code connects to the Redpanda cluster and reads the events from the configured input topic. The ReadFromKafka transformation step reads the data from a Kafka topic using the KafkaIO connector and returns a PCollection<String> object named data. PCollection is a fundamental data structure of Apache Beam that represents a distributed data set that the pipeline operates on.

As part of the Filter and Enrich Event Information pipeline transformation, you filter only the events from the southern region and enrich the data with the state description. To achieve this, you'll use other methods like filterAndEnrichEvents and classes like StateAndRegion, EnrichedUserActivity, and UserActivity. You'll create these helper classes in the next steps.

The filtered and enriched southern region events are then written to the output topic in Redpanda as part of the WriteToKafka transformation.

You've now finished implementing the App class functionality.

4. Create helper classes

Next, you'll create the other helper classes mentioned above. Create a class called EnrichedUserActivity.java in the org.example package to hold the enriched event information that will be written to the output topic, and paste in the code below:

package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class EnrichedUserActivity {

    @JsonProperty("user_id")
    private String userId;
    @JsonProperty("product_id")
    private int productId;
    @JsonProperty("state_description")
    private String stateDescription;

    @JsonProperty("region")
    private String region;

}

Create a class named StateAndRegion.java in the same package and paste in the following code to define the mapping information related to the states and regions of the country:

package org.example;

import java.util.HashMap;

public class StateAndRegion {

    private HashMap<String, String> stateRegionMap = new HashMap<>();
    private HashMap<String, String> stateCodeAndDescriptionMap = new HashMap<>();
    StateAndRegion() {
        stateRegionMap.put("TN", "South");
        stateRegionMap.put("AP", "South");
        stateRegionMap.put("KL", "South");
        stateRegionMap.put("KA", "South");
        stateRegionMap.put("PY", "South");
        stateRegionMap.put("DL", "North");
        stateRegionMap.put("RJ", "North");
        stateRegionMap.put("UK", "North");
        stateRegionMap.put("UP", "North");
        stateRegionMap.put("HP", "North");

        stateCodeAndDescriptionMap.put("TN", "Tamil Nadu");
        stateCodeAndDescriptionMap.put("AP", "Andhra Pradesh");
        stateCodeAndDescriptionMap.put("KL", "Kerala");
        stateCodeAndDescriptionMap.put("KA", "Karnataka");
        stateCodeAndDescriptionMap.put("PY", "Pondicherry");
        stateCodeAndDescriptionMap.put("DL", "Delhi");
        stateCodeAndDescriptionMap.put("RJ", "Rajasthan");
        stateCodeAndDescriptionMap.put("UK", "Uttarakhand");
        stateCodeAndDescriptionMap.put("UP", "Uttar Pradesh");
        stateCodeAndDescriptionMap.put("HP", "Himachal Pradesh");
    }
    public String getRegionByStateCode(String stateCode) {
        return stateRegionMap.get(stateCode);
    }

    public String getStateDescriptionByStateCode(String stateCode) {
        return stateCodeAndDescriptionMap.get(stateCode);
    }
}

This class contains two HashMap variables: one mapping each state code to its region, and one mapping each state code to its description. These mappings are used in two ways during the data enrichment operation. First, they're used to filter data based on region. Second, they're used in a lookup operation that maps state codes to their corresponding state descriptions.
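
As a quick sanity check on the lookup logic, you could add a small unit test. The following is a hypothetical sketch (not part of the tutorial code) that uses the JUnit dependency already declared in pom.xml and would live under src/test/java/org/example:

package org.example;

import junit.framework.TestCase;

public class StateAndRegionTest extends TestCase {

    public void testStateLookups() {
        StateAndRegion stateAndRegion = new StateAndRegion();
        // "TN" maps to the southern region with the description "Tamil Nadu"
        assertEquals("South", stateAndRegion.getRegionByStateCode("TN"));
        assertEquals("Tamil Nadu", stateAndRegion.getStateDescriptionByStateCode("TN"));
        // An unknown state code simply returns null from the HashMap lookups
        assertNull(stateAndRegion.getRegionByStateCode("XX"));
    }
}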

Next, create a class named UserActivity.java in the org.example package to map the incoming data stream and paste in the following code:

package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class UserActivity {

    @JsonProperty("user_id")
    private String userId;
    @JsonProperty("product_id")
    private int productId;
    @JsonProperty("state_code")
    private String stateCode;

}

You've created an entire Beam streaming data pipeline application in Java. Now for the moment of truth.

5. Build, execute, and test the application

Use the Maven build tool from your IDE or from the command line to build your project. If you're using the source code from the cloned repository, you can build it with the included Maven wrapper (use mvnw.cmd on Windows):

./mvnw clean install

Once the application JAR is built, you can execute the application with the following command:

"<path_to_java_binary>/java" -jar target/streaming-pipeline-with-redpanda-and-apache-beam-1.0-SNAPSHOT-jar-with-dependencies.jar --runner=DirectRunner

Next, access the Redpanda container's terminal and execute the following command to produce some data to test the streaming application:

rpk topic produce user_activity

The console will now listen for input data, which you can feed in by pasting the following JSON records, one per line:

{"user_id": "user1", "product_id": 1, "state_code": "TN"}
{"user_id": "user2", "product_id": 2, "state_code": "KL"}
{"user_id": "user3", "product_id": 3, "state_code": "DL"}
{"user_id": "user4", "product_id": 4, "state_code": "AP"}
{"user_id": "user5", "product_id": 5, "state_code": "HP"}

Note that the input data contains geolocation data from states belonging to both the South (TN, KL, AP) and North (DL, HP) regions.

Next, open another terminal, connect to the Redpanda container, and execute the following commands:

docker exec -it redpanda-0 bash
rpk topic consume events_from_south

The first command opens a terminal session inside the Redpanda container, and the second starts a consumer on the output topic events_from_south, where the Apache Beam application has published the filtered and enriched data.

You should see the following output:

{
  "topic": "events_from_south",
  "key": "South",
  "value": "EnrichedUserActivity(userId=user1, productId=1, stateDescription=Tamil Nadu, region=South)",
  "timestamp": 1681468886805,
  "partition": 0,
  "offset": 0
}
{
  "topic": "events_from_south",
  "key": "South",
  "value": "EnrichedUserActivity(userId=user2, productId=2, stateDescription=Kerala, region=South)",
  "timestamp": 1681468886810,
  "partition": 0,
  "offset": 1
}
{
  "topic": "events_from_south",
  "key": "South",
  "value": "EnrichedUserActivity(userId=user4, productId=4, stateDescription=Andhra Pradesh, region=South)",
  "timestamp": 1681468886810,
  "partition": 0,
  "offset": 2
}

The output above shows that only the southern region events were enriched with their corresponding state descriptions and written to the output topic.

Conclusion

Congratulations! You've successfully built a streaming ETL pipeline with Apache Beam and Redpanda and learned how to execute the data pipeline that you built. Apache Beam is an ideal choice for integrating with existing data infrastructure due to its support for a wide range of data sources and sinks, including Apache Cassandra, Google Cloud Pub/Sub, Apache Solr, Elasticsearch, Google BigQuery, Amazon S3, and Azure Blob Storage.

It simplifies the process of building data processing pipelines with a unified programming model that works seamlessly for both streaming and batch use cases. With Beam, developers can focus on the pipeline logic rather than the underlying infrastructure.

You can find the source code for this tutorial in this repository. Make sure to check out the documentation and browse the Redpanda blog for more tutorials. If you get stuck, just ask in the Redpanda Community on Slack.
