Streamline data processing with Redpanda, Apache Spark, and Amazon S3

Build a data processing pipeline for audio files in a few simple steps

September 28, 2023

All human interactions lead to the generation of data. Digital data, in particular, is generated when people interact with computers in various forms, knowingly or unknowingly. This data is captured as text, images, and audio. While this data is valuable in its raw form, it becomes even more useful when you process it. The transformative journey of this data into actionable insights is known as data processing.

As the digital landscape continues to evolve, the volume and speed of data being processed have surged to unprecedented levels, revolutionizing virtually every industry. For instance, within Internet of Things (IoT) frameworks, data processing manages data from remote sensors. In the realm of stock trading, data processing extracts valuable insights from financial data. In location tracking systems and music streaming platforms, it enhances experiences through personalized recommendations, efficient searches, and much more.

In this post, you'll learn how to use Redpanda, Apache Spark™, and Amazon S3 to implement a data processing pipeline. But first, let’s get familiar with what they are and what they’re used for.

Get to know Apache Spark, Amazon S3, and Redpanda

To build a data processing pipeline, you need a local or cloud-based storage system. You also need an engine to analyze the data and a way to communicate between different services.

Apache Spark

Spark is a widely used open-source analytics engine that allows you to run data processing tasks of varying complexity on a single machine or a cluster of machines. It's built on an advanced distributed SQL engine. With Spark, you can stream data in a pipeline, carry out SQL analytics, and run data science and machine learning operations at scale.

In data processing, its high computational speed allows you to quickly execute tasks in batches or in real time, depending on your preferences. Spark also offers APIs in several languages, including Python, SQL, Scala, Java, and R.

It also gives you access to libraries for various data processing tasks, which include GraphX for graph processing, MLlib for machine learning tasks, and Structured Streaming for processing real-time data streams. Furthermore, it's compatible with data streaming platforms like Redpanda and Apache Kafka®, which makes integrating it into your projects super easy.

Amazon Simple Storage Service (Amazon S3)

Amazon S3 is an object storage service that is secure, scalable, durable, economical, and efficient. It's built to enable you to back up and archive your data online for any use case through a user-friendly management interface. With S3, you can manage secure access to your application, optimize costs, and satisfy business requirements.

S3 can power the storage system in your data processing workflow, allowing you to store and access your data securely from anywhere in the world. It can hold most forms of data and can also be interacted with using various programming languages like Python, JavaScript, and Java. This enables you to interact with the data stored in it without having to worry much about the tools to use.

S3 also offers several storage classes, which include S3 Standard for frequently accessed data, S3 Intelligent-Tiering for data with unknown or changing access patterns, and S3 Glacier for long-term data archiving.

Redpanda

Redpanda is a powerful streaming data platform designed to optimize your data processing and distribution tasks. It offers high performance with a straightforward setup and no external dependencies. Since Redpanda is API-compatible with Kafka, you can use all your preferred Kafka client libraries with it. You can think of it as Kafka, but simpler, faster, and without the complexity.

Now that you’re familiar with the tools, let’s dig into the tutorial. To get you up and running faster, we’ve added the source code in this GitHub repository.

Build a data processing pipeline with Redpanda, Apache Spark, and Amazon S3

To demonstrate how these tools work together, this tutorial uses a scenario involving a hypothetical music streaming organization called PandaMusic. This organization wants to build a data processing pipeline that takes in audio files in WAV format and extracts features from them called mel spectrograms.

Mel spectrograms contain information about the track's amplitude at different frequencies, and they'll be used by other engineers to carry out downstream data analysis and modeling tasks.

System architecture

Based on this scenario, you'll create a data processing pipeline comprising two applications. The first application has a Streamlit interface that ingests audio files and produces messages to a Redpanda topic. The second application consumes these messages with the help of Spark and produces another message to a different Redpanda topic. It will also save the generated features to an Amazon S3 bucket as an image.

Prerequisites

Before moving on, make sure you have the following applications and accounts set up:

Note: Hadoop is included here for a more stable Apache Spark environment. You can also install standalone Apache Spark for this tutorial.

Setting up

To get started, create a project directory and navigate to it in your terminal. Then, run the following commands to create and activate a virtual environment in Python:

python -m venv .venv
source .venv/bin/activate

Note: If you use a Windows OS, run .venv\Scripts\activate.bat instead.

Run the command below to install the necessary Python libraries via pip:

pip install kafka-python boto3 librosa matplotlib streamlit python-dotenv pyspark==3.4.1

kafka-python and pyspark v3.4.1 will be used to interact with your Redpanda cluster. boto3, on the other hand, is the Python client for interacting with AWS services like S3. librosa and matplotlib will be used to process the audio file to generate a mel spectrogram. Finally, streamlit will be used to build a demo user interface that you can use to ingest the audio file.

Go to your AWS account and copy your AWS access key ID, secret access key, and default region name. Add these secrets to a .env file as environment variables for accessing AWS services, as shown below:

AWS_DEFAULT_REGION=<aws_default_region>
AWS_SECRET_ACCESS_KEY=<aws_secret_access_key>
AWS_ACCESS_KEY_ID=<aws_access_key_id>
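
Before running the later scripts, it can help to confirm that all three variables are actually set. The sketch below is one way to do that with only the standard library. The helper name missing_aws_settings is hypothetical and not part of the tutorial's code, and it assumes the variables have already been loaded into the process environment (for example, by load_dotenv from python-dotenv).

```python
import os

# Names taken from the .env file shown above.
REQUIRED_AWS_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
)


def missing_aws_settings(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_settings()
    if missing:
        print("Missing settings:", ", ".join(missing))
    else:
        print("All AWS settings are present.")
```

Passing a plain dictionary instead of os.environ makes the check easy to try out without touching your real credentials.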

Navigate to your running Redpanda instance and open a shell in it by running the command below:

docker exec -it redpanda-0 /bin/bash

Note: Make sure you replace redpanda-0 with the name of your instance.

Next, run the command below to create two Redpanda topics named music_ingested and music_processed:

$ rpk topic create music_ingested music_processed

The output generated is shown below:

TOPIC            STATUS
music_ingested   OK
music_processed  OK

Ingesting audio

To ingest an audio file, you'll create an interface to upload the file, save it to a temporary directory, and publish a message to the music_ingested Redpanda topic. But first, you'll need to create a Python script named app.py in the project directory. Add the following code to import the required libraries:

import os
import streamlit as st
from kafka import KafkaProducer

This code snippet imports the os module, streamlit, and the KafkaProducer class from kafka. Next, add the code below to app.py to send a message via kafka-python:

def produce(file_path, topic):
    try:
        print("Creates a Producer instance and send message...", end="")
        producer = KafkaProducer(bootstrap_servers='127.0.0.1:19092')

        producer.send(topic, value=file_path.encode())

        # Ensure all messages have been sent
        producer.flush()
        print("Done.")
        return True
    except Exception as e:
        print(f"An error occurred: {e}")
        return False

This code block defines a function named produce, which takes in a file path and a Redpanda topic and returns a Boolean. Within it, you create a KafkaProducer instance by passing in the address of your Redpanda broker (update the code accordingly). Next, you pass the topic and the encoded file path to the producer.send method. This sends the file path as a message.

To ensure that the message is sent, you add the producer.flush() method, after which you return True. All of this is done in a try block to catch errors that may occur. If errors do occur, they are printed in the except block, in which case you return False.
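
If you want confirmation that the broker acknowledged a specific message, one option is to wait on the future that producer.send returns. The following is a minimal sketch of that idea, not a replacement for the tutorial's produce function. The name produce_with_ack is hypothetical, and the producer argument is assumed to behave like a kafka-python KafkaProducer, whose send method returns a future with a get(timeout=...) method.

```python
def produce_with_ack(producer, file_path, topic, timeout=10):
    """Send file_path to topic and wait for the broker to acknowledge it.

    `producer` is assumed to behave like kafka-python's KafkaProducer:
    send() returns a future, and future.get(timeout=...) blocks until the
    record is acknowledged or raises an exception on failure.
    """
    try:
        future = producer.send(topic, value=file_path.encode())
        metadata = future.get(timeout=timeout)
        # The record metadata reports where the message was written.
        return metadata.partition, metadata.offset
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
```

Taking the producer as a parameter also means you can create it once and reuse it across uploads rather than building a new one for every message.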

Next, add the code block below to the app.py script to initialize your Streamlit app with a title and create a file uploader that expects only WAV audio files:

st.title('PandaMusic File Uploader')

uploaded_file = st.file_uploader("Choose an audio file (wav)", type=['wav'])

Finally, add the following code block to the app.py file to save the uploaded file and publish a message to Redpanda:

if uploaded_file is not None:
    try:
        dir_path = 'filestore'
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)

        with open(os.path.join(dir_path, uploaded_file.name), 'wb') as f:
            f.write(uploaded_file.getbuffer())
        st.success("Saved File")
    except Exception as e:
        st.error(f"An error occurred: {e}")

    if produce(f"filestore/{uploaded_file.name}", topic='music_ingested'):
        st.success("Sent file path to Redpanda")
    else:
        st.error("Failed to send file path to Redpanda")

In the code block above, you save the uploaded audio file to a folder called filestore. If this folder does not exist, the code creates it with the os.makedirs method. Then, you send the file path as a message by passing it to the produce function created earlier, alongside the Redpanda topic music_ingested.

When these operations are successful or unsuccessful, you call the st.success or st.error methods, respectively, to inform the user via the interface.
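
Because the file name comes from the user, you may want to reduce it to its final path component before writing to disk. The sketch below shows one way to do that with the standard library. The helper name safe_upload_path is hypothetical, and whether Streamlit already normalizes uploaded_file.name is not something this post establishes, so the sketch simply treats the name as untrusted.

```python
import os


def safe_upload_path(dir_path, filename):
    """Build a path inside dir_path from a user-supplied file name."""
    # Normalize Windows separators, then keep only the final component so
    # a name such as "../../secret.wav" cannot escape dir_path.
    name = os.path.basename(filename.replace("\\", "/"))
    if not name or name in (".", ".."):
        raise ValueError(f"Invalid file name: {filename!r}")
    return os.path.join(dir_path, name)


print(safe_upload_path("filestore", "track.wav"))
```

As a related simplification, os.makedirs(dir_path, exist_ok=True) does the same job as checking os.path.exists before calling os.makedirs.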

The contents of the app.py file should be as follows:

import os
import streamlit as st
from kafka import KafkaProducer

def produce(file_path, topic):
    try:
        print("Creates a Producer instance and send message...", end="")
        producer = KafkaProducer(bootstrap_servers='127.0.0.1:19092')

        producer.send(topic, value=file_path.encode())

        # Ensure all messages have been sent
        producer.flush()
        print("Done.")
        return True
    except Exception as e:
        print(f"An error occurred: {e}")
        return False

st.title('PandaMusic File Uploader')

uploaded_file = st.file_uploader("Choose an audio file (wav)", type=['wav'])

if uploaded_file is not None:
    try:
        dir_path = 'filestore'
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)

        with open(os.path.join(dir_path, uploaded_file.name), 'wb') as f:
            f.write(uploaded_file.getbuffer())
        st.success("Saved File")
    except Exception as e:
        st.error(f"An error occurred: {e}")

    if produce(f"filestore/{uploaded_file.name}", topic='music_ingested'):
        st.success("Sent file path to Redpanda")
    else:
        st.error("Failed to send file path to Redpanda")

With this done, you can start the Streamlit application by running the command below:

streamlit run app.py

This should open up a browser tab at http://localhost:8501:

Streamlit upload interface

Go back to your AWS console, search for "S3", and click the first option to open the S3 home page:

S3 home page

On this page, click the Create bucket button, enter the name of the bucket as shown below, scroll down, and click the final Create bucket button:

S3 create bucket

With the bucket now created, click the bucket name to see the empty S3 bucket:

S3 empty bucket

Creating helper functions

With your S3 bucket set up, create a new Python script named helpers.py in the project directory to define some functions that will be used to create and run the consumer. To get started, add the following code block to the script:

import os
import boto3
from botocore.exceptions import NoCredentialsError
from kafka import KafkaProducer
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
from dotenv import load_dotenv

load_dotenv()

s3_client = boto3.client('s3')

In this code block, you import all the libraries and functions required for this script. Following this, you call the load_dotenv function to load the environment variables in the .env file and create an S3 client with boto3.client, which picks up the just-loaded variables.

Next, add the generate_spectrogram function, which takes in the audio file path to be read from and the image path to be written to:

def generate_spectrogram(file_path, image_path):
    y, sr = librosa.load(file_path, duration=10.0)
    spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
    log_spectrogram = librosa.power_to_db(spectrogram, ref=np.max)
    plt.figure(figsize=(12,8))
    plt.axis('off')
    librosa.display.specshow(log_spectrogram, sr=sr, x_axis='time', y_axis='mel', fmax=8000)

    plt.savefig(image_path)
    plt.close()

In this code block, you use the librosa.load method to open the audio file and load the first ten seconds of it as a time series. This method returns both the audio time series y and the sampling rate sr. Then, you use the librosa.feature.melspectrogram and librosa.power_to_db methods to generate the mel spectrogram and convert it to decibels.

Following this, you create a Matplotlib figure, turn the axis off, and add the spectrogram to the figure using the librosa.display.specshow method. Finally, you save the figure to the image path and close the plot using the plt.close method.
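
To make the decibel conversion step more concrete, here is a simplified, standard-library sketch of what a power-to-decibel conversion relative to the maximum value does: each value becomes 10 * log10(value / reference), and very quiet values are clipped. It works on a flat list rather than a 2D spectrogram, and it is an illustration rather than librosa's implementation. The amin and top_db defaults are chosen to mirror librosa's documented defaults.

```python
import math


def power_to_db(power, amin=1e-10, top_db=80.0):
    """Convert power values to decibels relative to the largest value."""
    ref = max(power)
    log_ref = 10.0 * math.log10(max(amin, ref))
    # amin guards against taking the logarithm of zero.
    db = [10.0 * math.log10(max(amin, p)) - log_ref for p in power]
    # Clip very quiet values so the range spans at most top_db decibels.
    floor = max(db) - top_db
    return [max(value, floor) for value in db]


print(power_to_db([1.0, 0.1, 0.01, 0.0]))
```

The loudest value maps to 0 dB, each tenfold drop in power subtracts 10 dB, and silence is clipped to the floor instead of becoming negative infinity.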

In the same helpers.py script, add the function below to upload a file to S3 and clean up:

def upload_to_s3(image_path, bucket):
    object_name = image_path.split('/')[1]

    try:
        s3_client.upload_file(image_path, bucket, object_name)
    except FileNotFoundError:
        print("The file was not found")
        return False
    except NoCredentialsError:
        print("Credentials not available")
        return False
    finally:
        if os.path.isfile(image_path):
            os.remove(image_path[:-3]+'wav')
            os.remove(image_path)
    return object_name

In this code block, you define a function named upload_to_s3, which takes in an image path and a bucket name. You then create the object name by removing the folder name from the image path using the str.split method.

Then, you call the s3_client.upload_file function in a try block to upload the image at the specified path to the provided bucket with the specified object name. You also define two except blocks to handle possible errors and a finally block to delete both the image file and the audio file.
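
One consequence of putting the clean-up in a finally block is that the local files are deleted whether or not the upload succeeded. The sketch below isolates that control flow so you can see it without an AWS account. The name upload_with_cleanup is hypothetical, and upload_fn is a stand-in for a call such as s3_client.upload_file.

```python
import os


def upload_with_cleanup(upload_fn, image_path):
    """Call upload_fn(image_path) and always delete the local file afterwards."""
    try:
        upload_fn(image_path)
    except FileNotFoundError:
        print("The file was not found")
        return False
    finally:
        # Runs after a successful upload, after a handled error (even though
        # the except block returns), and after an unhandled error.
        if os.path.isfile(image_path):
            os.remove(image_path)
    return True
```

If you would rather keep the local files for a retry when the upload fails, one alternative is to move the os.remove calls out of the finally block and run them only after the upload call returns without raising.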

Following this, define a function to check if a file exists as an S3 object by adding this code to helpers.py:

def check_file(key, bucket='panda-spectrogram'):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except Exception as e:
        return False

In this function, you call the s3_client.head_object method to load metadata for an object in the bucket in a try block. If it runs, the object exists, and you return True. Otherwise, an exception is raised, and you return False. This function helps you avoid processing the same file multiple times.
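
Note that catching every exception means a credentials or network problem also looks like "the file does not exist". A possible refinement, sketched below under stated assumptions, is to return False only when S3 reports the object as missing and to re-raise anything else. The name object_exists is hypothetical. The sketch assumes the raised error exposes a response dictionary with an Error code, as botocore's ClientError does, and that a missing object is reported with the code "404".

```python
def object_exists(s3_client, bucket, key):
    """Return True if the object exists, False if S3 reports it missing."""
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except Exception as e:
        # Assumption: the error carries the service response, as botocore's
        # ClientError does; errors without one yield code None.
        code = getattr(e, "response", {}).get("Error", {}).get("Code")
        if code in ("404", "NoSuchKey", "NotFound"):
            return False
        # Credentials, permissions, or network problems surface here.
        raise
```

With this variant, a misconfigured environment stops the pipeline with a visible error instead of silently regenerating and re-uploading spectrograms.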

The last function in this file is upload_and_produce, which takes in the path of the image to be uploaded, the Redpanda topic to be published to, and the S3 bucket name:

def upload_and_produce(image_path, topic, bucket):
    producer = KafkaProducer(
        bootstrap_servers='127.0.0.1:19092'
    )
    object_name = upload_to_s3(image_path, bucket)
    if isinstance(object_name, str):
        producer.send(topic, key=bucket.encode(), value=object_name.encode())
        producer.flush()

The code block above first creates a KafkaProducer instance that points at your Redpanda broker address (again, update the code accordingly). Then, it passes the image path and bucket name to the upload_to_s3 function, which returns the object name.

If the object name variable returned is a string, you encode it using the str.encode method and send it to the Redpanda topic using the producer.send method. Finally, the producer.flush method is called to ensure that the message is sent. That topic can then be consumed to carry out another downstream task on the mel spectrogram image.
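
Since the message key holds the bucket name and the value holds the object name, a downstream consumer can rebuild the object's location from the two byte strings. The following sketch shows that round trip. Both function names are hypothetical, and the s3:// form is just one common way to write the location.

```python
def encode_message(bucket, object_name):
    """Return the (key, value) byte pair sent to the music_processed topic."""
    return bucket.encode(), object_name.encode()


def s3_uri_from_message(key, value):
    """Rebuild an s3:// style location from a consumed key and value."""
    return f"s3://{key.decode()}/{value.decode()}"


key, value = encode_message("panda-spectrogram", "track.png")
print(s3_uri_from_message(key, value))  # s3://panda-spectrogram/track.png
```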

Creating the consumer

You'll now create a file named consumer.py to define the consumer. Enter the following lines of code to import the required packages and create a Spark session:

from pyspark.sql import SparkSession
from helpers import *

spark = SparkSession.builder \
    .appName("Redpanda Consumer") \
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .getOrCreate()

This code block imports the SparkSession class from pyspark.sql and the functions defined in helpers.py. Then, it creates a SparkSession named Redpanda Consumer and sets the spark.jars.packages configuration to org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1. The end of this string is the Spark version, 3.4.1, which matches the pyspark version you installed earlier. This matters because the configuration tells Spark which Kafka connector package to download when the session starts.
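
If you install a different pyspark version, the package string needs to change with it. A small helper like the one below can build the coordinate from the version numbers. The function name is hypothetical, and the Scala version default of 2.12 is an assumption taken from the string used in this tutorial.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate for Spark's Kafka SQL connector."""
    return (
        "org.apache.spark:"
        f"spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    )


print(kafka_connector_package("3.4.1"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1
```

In a real script, you could pass pyspark.__version__ as spark_version so that the connector follows whatever version is installed.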

Next, you define the consume function, which takes in the Redpanda topic to be consumed:

def consume(topic):
    redpanda_brokers = '127.0.0.1:19092'

    df = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", redpanda_brokers) \
        .option("subscribe", topic) \
        .load()

    df = df.selectExpr("CAST(value AS STRING)")

    single_row = df.select("value").collect()[-1]
    file_path = single_row.value
    image_path = file_path.split(".")[0] + '.png'
    print(image_path.split('/')[1])
    if not check_file(image_path.split('/')[1]):
        generate_spectrogram(file_path, image_path)

        return image_path

In this function, you define your Redpanda broker address as a Python variable. Then, you use spark.read to create a batch reader, set the format to kafka, point the kafka.bootstrap.servers and subscribe options at your Redpanda broker and the provided Redpanda topic, and call the .load method. This creates a PySpark DataFrame that contains the messages sent to that topic in the value column. To access the most recent message, you first cast the column to a string data type using the df.selectExpr method. Then, you select the value column from the DataFrame, collect the rows, and take the last element.
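
Taking the last collected row works for this demo, but it relies on row order and raises an IndexError when the topic is empty. As a hedged alternative, you could keep the record metadata and pick the newest message explicitly. The sketch below does this on plain dictionaries so it runs without Spark. The function name latest_value is hypothetical, and the field names assume the partition, offset, timestamp, and value columns that Spark's Kafka source exposes.

```python
def latest_value(rows):
    """Return the value of the most recent record, or None if there are none."""
    if not rows:
        return None
    # Order by timestamp first, then break ties by partition and offset.
    newest = max(
        rows,
        key=lambda r: (r["timestamp"], r["partition"], r["offset"]),
    )
    return newest["value"]


rows = [
    {"partition": 0, "offset": 0, "timestamp": 1, "value": "filestore/a.wav"},
    {"partition": 1, "offset": 0, "timestamp": 5, "value": "filestore/c.wav"},
    {"partition": 0, "offset": 1, "timestamp": 3, "value": "filestore/b.wav"},
]
print(latest_value(rows))  # filestore/c.wav
```

To use the same idea with the DataFrame, you would need to keep those metadata columns in the selectExpr call instead of selecting only the value.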

You use the value from that row, which is the audio file path, to build the matching image file path by changing the extension from .wav to .png. You then use the image file name to check whether the image has already been saved to Amazon S3. If it has not, you call the generate_spectrogram function with the audio file path and the image path, which generates the image, and you return the image path. If the image already exists in S3, the function returns None.
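
Splitting on "." and "/" works for simple names such as filestore/track.wav, but it truncates file names that contain extra dots. A more forgiving way to derive the two names, sketched here with hypothetical helper names, is to use os.path.splitext and os.path.basename.

```python
import os


def image_path_for(audio_path):
    """Swap the audio file's extension for .png."""
    root, _ext = os.path.splitext(audio_path)
    return root + ".png"


def object_key_for(path):
    """Use the file name, without its folder, as the S3 object key."""
    return os.path.basename(path)


audio_path = "filestore/my.song.wav"
print(audio_path.split(".")[0] + ".png")  # filestore/my.png
print(image_path_for(audio_path))  # filestore/my.song.png
print(object_key_for(image_path_for(audio_path)))  # my.song.png
```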

Creating a streaming application

To run the consumer and produce a new message, create a file named streaming.py in the project directory and add the following code to import the consumer file and set the Spark log level to ERROR, making the terminal output more readable:

from consumer import spark, consume, upload_and_produce

spark.sparkContext.setLogLevel("ERROR")

while True:
    bucket = "panda-spectrogram"
    topic = "music_ingested"
    image_path = consume(topic=topic)

    if image_path is not None:
        topic = "music_processed"
        upload_and_produce(
            image_path=image_path, topic=topic, bucket=bucket
        )
    else:
        print("File already exists.")
        break

The code defines a while True loop. In the loop, you define your S3 bucket as bucket and the first Redpanda topic, music_ingested, as topic. This topic is passed to the consume method in consumer.py, which returns the image path.

If the image has not been processed before, the image_path variable will not be None. In that case, you call the upload_and_produce function from the helpers.py file. If image_path is None, you print a message saying that the file already exists and terminate the loop.

This works for demonstration purposes, but ideally, you may want to design your consumer differently and terminate this loop under different conditions.
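
As one example of a different termination condition, the loop could keep polling and stop only after several consecutive polls find nothing new. The sketch below shows that pattern in isolation. The name run_pipeline and its parameters are hypothetical, consume_fn stands in for a call like consume(topic=...), and handle_fn stands in for upload_and_produce.

```python
import time


def run_pipeline(consume_fn, handle_fn, max_idle_polls=3,
                 poll_interval=1.0, sleep=time.sleep):
    """Poll consume_fn until it returns None max_idle_polls times in a row.

    Returns the number of items passed to handle_fn.
    """
    idle = 0
    processed = 0
    while idle < max_idle_polls:
        image_path = consume_fn()
        if image_path is None:
            # Nothing new: wait a little before polling again.
            idle += 1
            sleep(poll_interval)
        else:
            idle = 0
            handle_fn(image_path)
            processed += 1
    return processed
```

The sleep parameter exists only so the waiting can be swapped out when trying the function with stand-in callables.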

Running the streaming application

To run the application you built, execute the streaming.py file with the python streaming.py command.

If everything works, it creates an object in your S3 bucket, as seen in the image below. The loop then runs once more, and if you haven't uploaded another file by then, it recognizes the just-uploaded file and terminates itself.

Mel spectrogram in S3 bucket

Download this image and open it. It should look similar to the image below:

Mel spectrogram

Conclusion

In this post, you learned about data processing, its applications, and the different tools that can be used to implement it, like Amazon S3, Spark, and Redpanda. You also gained practical knowledge on how to use these tools together by building a small-scale music streaming pipeline. Remember, you can find the source code for this tutorial in this GitHub repository.

To learn more, browse the Redpanda blog for tutorials and dive into the free courses at Redpanda University. If you have any questions or just want to chat with the team and fellow Redpanda users, join the Redpanda Community on Slack.
