Bouncing Panda
Loading...
AI Spotlight Result:

AI Spotlight: Toggle on and select any text on this page to learn more about it.

Build a data pipeline for supply chain management with Redshift

Pair Redpanda with Amazon Redshift for scalable data storage, reporting, and analysis of supply chain data

By
on
November 12, 2024

Amazon Redshift is a data warehouse service that performs data analysis by using standard SQL and business intelligence tools. Redshift is used in many different fields. For example, Redshift can manage and analyze high volumes of transactional data to optimize inventory levels and predict supply needs in supply chain management. This can help you react more dynamically to challenges in the supply chain, among other things.

In real-time analytics, Redpanda is a proven Apache Kafka® alternative that can handle high-throughput data to provide insights as new events occur. This is particularly handy in finance, gaming, and online retail, where immediate data insights can supercharge business decisions.

Additionally, in business intelligence, companies can use Redshift to combine data from different sources into a centralized repository, which can be used for comprehensive reporting, trend analysis, and strategic planning. Lastly, Redshift can also be used for log analysis, as it offers the required computational power to parse and analyze large sets of log data to identify patterns and troubleshoot problems.

In this tutorial, you'll learn how to integrate Redpanda with Amazon Redshift for scalable data storage, reporting, and analysis of supply chain data.

What is Redshift, and how can it be used with Redpanda?

With Redshift, Amazon addressed a demand for large-scale data processing and warehousing. The service is a fully managed, cloud-based data warehouse service built to handle large amounts of data and complex queries. Redshift allows organizations to run high-performance databases and query large data sets quickly.

Redshift is especially useful in supply chain management as it can analyze large volumes of different data from every stage of the supply chain. This provides insights into the supplier's performance, inventory levels, and customer demands. Supply chain organizations can optimize their operations, reduce costs, and enhance service delivery with this information.

Integrating Redpanda with Amazon Redshift can further enhance supply chain management by enabling real-time data streaming capabilities. In particular, Redpanda facilitates the immediate ingestion of live data from various supply chain touchpoints, such as sensors on production equipment, RFID tags on inventory items, GPS trackers on delivery vehicles, and point-of-sale systems in retail locations. This data can then be streamed through Redpanda and immediately ingested into Redshift. This setup allows supply chain organizations to perform live data analysis and identify trends and issues as they occur or before, preemptively addressing potential disruptions before they can impact the supply chain.

To illustrate, imagine a scenario where a sudden spike in demand for a popular product is detected across various retail locations. With Redpanda, this sales data is immediately streamed in real time to Amazon Redshift, and the supply chain management system can analyze these trends in real time and adjust production schedules and distribution plans accordingly. Another example scenario could involve logistics and delivery. If GPS data streamed with Redpanda to Redshift shows that certain delivery trucks are delayed due to unexpected road conditions, Redshift can analyze this information alongside weather data and traffic reports. The system could then reroute other vehicles to avoid the same scenario or adjust delivery schedules to maintain service levels.

Using Redshift and Redpanda to build a supply chain management data pipeline

Imagine you're a data engineer at a large manufacturing and distribution company that faces daily challenges due to delivery delays, inconsistent supplier reliability, and inventory mismanagement.

These issues undermine the competitiveness of the company and customer satisfaction. To address these problems, the company asks you to implement a real-time data pipeline that uses Amazon Redshift and Redpanda. This new system should bring more visibility and responsiveness to the company's operations. As part of the setup, supplier locations and production lines are equipped with IoT sensors to monitor the delivery of raw materials and track operational efficiency in real time. Logistics vehicles are outfitted with GPS systems and environmental sensors to continuously update their status and conditions.

As this real-time data streams in, Redpanda should process and forward this data to Amazon Redshift. Here, sophisticated analytics can be performed to gain insights from the data and to anticipate and address problems before they escalate.

Architecture diagram

Prerequisites

To follow along, you'll need:

  • A Python virtual environment created and activated. All Python-related commands should be run in this environment.
  • Python 3.11 or higher. The code is written in Python, so you need to have Python installed on your system.
  • A recent version of Docker installed on your machine (this tutorial uses Docker 24.0.6).
  • An AWS account for Redshift. To set up an AWS account, feel free to follow these instructions.
  • A project folder with a suitable structure. You should set up a folder structure to organize your configuration files and plugins that are required to integrate Redpanda with Redshift. This example uses the folder structure below, which is recommended if you want to follow the tutorial:
redpanda_redshift
├── src
├── configuration
├── plugins
├── resources

Step 1. Run a Redpanda cluster

In this tutorial, you'll run a Redpanda cluster using Docker Compose. For this, download a docker-compose.yml file and save it locally in the resources folder of the project directory.

Open a terminal in the resources folder and run the following command:

docker compose up -d

You should get the following output in the terminal if the Redpanda cluster started successfully:

[+] Running 3/3
 ✔ Network redpanda-quickstart-one-broker_redpanda_network  Created                                	0.1s
 ✔ Container redpanda-0                                 	Started                                	0.6s
 ✔ Container redpanda-console                           	Started 

Step 2. Create Redpanda topics

You can create topics in Redpanda using the rpk command line tool, which you can use inside a running Redpanda container.

Open a terminal and execute the following command to create a topic called supply_chain_data inside the running Redpanda container redpanda-0:

docker exec -it redpanda-0 \
rpk topic create supply_chain_data

You should see the output below in the console, which indicates that you successfully created a topic:

TOPIC              STATUS
supply_chain_data  OK

Step 3. Generate the data set

You'll now generate a data set for the supply chain management use case. The example data set represents typical information streams you might find in supply chain management. This includes data from IoT sensors at supplier locations, production lines, and logistics vehicles. Create a file called generate_data.py in the src folder and paste the following code into it:

import csv
import random
from datetime import datetime

# Define file name
file_name = 'supply_chain_data.csv'

# Define headers for different data types
headers = {
    'delivery': ['supplier_id', 'delivery_timestamp', 'material_delivered', 'quantity', 'delivery_status'],
    'production': ['production_line_id', 'production_timestamp', 'operation_efficiency', 'downtime_minutes'],
    'vehicle': ['vehicle_id', 'vehicle_timestamp', 'latitude', 'longitude', 'environmental_conditions']
}

def delivery_data():
    return [
        random.randint(1000, 9999),
        datetime.now().isoformat(),
        random.choice(['steel', 'plastic', 'rubber', 'aluminum']),
        random.randint(100, 1000),
        random.choice(['on_time', 'delayed', 'advanced'])
    ]

def production_line_data():
    return [
        random.randint(1, 10),
        datetime.now().isoformat(),
        random.uniform(0.7, 1.0),  # 70% to 100% efficiency
        random.randint(0, 120)
    ]

def vehicle_gps_data():
    return [
        random.randint(100, 500),
        datetime.now().isoformat(),
        round(random.uniform(-90, 90), 6),
        round(random.uniform(-180, 180), 6),
        random.choice(['clear', 'rainy', 'snowy', 'foggy'])
    ]

def write_data_to_csv():
    with open(file_name, mode='w', newline='') as file:
        writer = csv.writer(file)
        # Write headers
        writer.writerow(headers['delivery'] + headers['production'] + headers['vehicle'])

        for _ in range(1000):
            # Combine data from all functions
            data = delivery_data() + production_line_data() + vehicle_gps_data()
            writer.writerow(data)

if __name__ == "__main__":
    write_data_to_csv()

This Python script generates simulated data for a supply chain management system that focuses on three main areas: deliveries, production lines, and vehicle GPS tracking. It writes 1,000 rows of data into a CSV file called supply_chain_data.csv. The data for the deliveries include supplier_id, delivery_timestamp, material_delivered, quantity, and delivery_status, which represent details like the identity of the supplier, time of delivery, type of materials delivered, the quantity of materials, and the delivery timing status. For production lines, the data is production_line_id, production_timestamp, operation_efficiency, and downtime_minutes, detailing the specific production line, operational efficiency as a percentage, and any downtime in minutes. The vehicle data includes vehicle_id, vehicle_timestamp, latitude, longitude, and environmental_conditions. This captures the vehicle identifier, geolocation data, and the prevailing weather conditions. Each row combines randomly generated data from these areas, which simulates real-time inputs that might be captured from IoT sensors in a supply chain environment.

To generate the data, navigate to the folder where the script is saved, open a terminal, and execute the following code in the virtual Python environment created and activated beforehand:

python generate_data.py

After the data set is generated, the first rows in the generated supply_chain_data.csv file should look similar to this:

supplier_id,delivery_timestamp,material_delivered,quantity,delivery_status,production_line_id,production_timestamp,operation_efficiency,downtime_minutes,vehicle_id,vehicle_timestamp,latitude,longitude,environmental_conditions
6020,2024-04-15T10:50:15.793909,aluminum,933,advanced,1,2024-04-15T10:50:15.793937,0.8032760248939593,112,124,2024-04-15T10:50:15.793945,7.890849,-86.368215,rainy

Step 4. Produce data to the topic

You'll now produce the data from the created data set (supply_chain_data.csv) to the Redpanda topic (supply_chain_data). Before that, make sure to install the Confluent Kafka Python client by opening a terminal in your project's Python virtual environment and executing the following command:

pip install kafka-python-ng 

After installing the client, create a file called produce_data.py in the redpanda_redshift/src directory and paste the code below into the file:

import csv
import json
from kafka import KafkaProducer

# Kafka producer configuration
config = {
    'bootstrap_servers': ['localhost:19092'],  # Update with your Redpanda server address
    'client_id': 'supply-chain-producer'
}

# Initialize KafkaProducer using kafka-python-ng
producer = KafkaProducer(
    bootstrap_servers=config['bootstrap_servers'],
    client_id=config['client_id'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize the messages as JSON
)

topic_name = 'supply_chain_data'  # Update with your topic name

# Define your schema for the supply chain data
schema = {
    "type": "struct",
    "fields": [
        {"field": "supplier_id", "type": "string"},
        {"field": "delivery_timestamp", "type": "string"},
        {"field": "production_timestamp", "type": "string"},
        {"field": "vehicle_timestamp", "type": "string"},
        {"field": "material_delivered", "type": "string"},
        {"field": "quantity", "type": "string"},
        {"field": "delivery_status", "type": "string"},
        {"field": "production_line_id", "type": "string"},
        {"field": "operation_efficiency", "type": "string"},
        {"field": "downtime_minutes", "type": "string"},
        {"field": "vehicle_id", "type": "string"},
        {"field": "latitude", "type": "string"},
        {"field": "longitude", "type": "string"},
        {"field": "environmental_conditions", "type": "string"}
    ]
}

def produce_message(record):
    message = {
        "schema": schema,
        "payload": record
    }
    # Send the message to Kafka
    producer.send(topic_name, value=message)
    producer.flush()

def produce_from_csv(file_path):
    with open(file_path, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            produce_message(row)

csv_file_path = 'supply_chain_data.csv'  # Update this with the path to your CSV file
produce_from_csv(csv_file_path)

The script defines a schema that corresponds to the columns in the supply chain data CSV file. This schema ensures that each message sent to the topic retains a consistent structure.

The script then reads the CSV file using Python's csv.DictReader, which converts each row in the CSV file into a dictionary. The produce_message function takes these dictionaries and wraps them into a message format that includes both the schema and the payload (data from the CSV file). Each message is then produced to the specified Redpanda topic using the Kafka producer class.

After constructing the message, the script sends it to Redpanda with a call to the produce method of the producer object. This process is repeated for each row in the CSV file. As a result, the static file data is transformed into a stream of messages that can be dynamically processed by systems subscribed to the Redpanda topic.

To run the script, open a terminal in the redpanda_redshift/src directory and execute the following command:

python produce_data.py

To validate that the data was produced successfully to the topic, you can execute the following command in the terminal:

docker exec -it redpanda-0 \
rpk topic consume supply_chain_data --num 1

The output should show the first data entry that was produced to the topic, which looks something like this:

{
  "topic": "supply_chain_data",
  "value": "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"field\": \"supplier_id\", \"type\": \"string\"}, {\"field\": \"delivery_timestamp\", \"type\": \"string\"}, {\"field\": \"production_timestamp\", \"type\": \"string\"}, {\"field\": \"vehicle_timestamp\", \"type\": \"string\"}, {\"field\": \"material_delivered\", \"type\": \"string\"}, {\"field\": \"quantity\", \"type\": \"string\"}, {\"field\": \"delivery_status\", \"type\": \"string\"}, {\"field\": \"production_line_id\", \"type\": \"string\"}, {\"field\": \"operation_efficiency\", \"type\": \"string\"}, {\"field\": \"downtime_minutes\", \"type\": \"string\"}, {\"field\": \"vehicle_id\", \"type\": \"string\"}, {\"field\": \"latitude\", \"type\": \"string\"}, {\"field\": \"longitude\", \"type\": \"string\"}, {\"field\": \"environmental_conditions\", \"type\": \"string\"}]}, \"payload\": {\"supplier_id\": \"7093\", \"delivery_timestamp\": \"2024-04-15T10:50:15.812918\", \"material_delivered\": \"aluminum\", \"quantity\": \"975\", \"delivery_status\": \"on_time\", \"production_line_id\": \"6\", \"production_timestamp\": \"2024-04-15T10:50:15.812921\", \"operation_efficiency\": \"0.942470385781474\", \"downtime_minutes\": \"102\", \"vehicle_id\": \"194\", \"vehicle_timestamp\": \"2024-04-15T10:50:15.812924\", \"latitude\": \"0.027321\", \"longitude\": \"138.870067\", \"environmental_conditions\": \"rainy\"}}",
  "timestamp": 1713171291237,
  "partition": 0,
  "offset": 1
}

Step 5. Set up Redpanda for Redshift connection

In the following sections, you'll learn how to integrate Redpanda with Amazon Redshift. This integration involves Kafka Connect, a component of the Apache Kafka ecosystem that connects Kafka-compatible systems, such as Redpanda, to different databases and cloud services. The process involves two main steps. First, you configure Kafka Connect to interface with Redpanda, and then you set up a sink connector for Amazon Redshift. For the latter, the JDBC sink connector will be implemented to enable this connection.

Setting up the Kafka Connect cluster

Download the Apache Kafka Connect package and store the binary in your redpanda_redshift directory.

Next, configure Kafka Connect by creating a configuration file called connect.properties and saving it in the directory redpanda_redshift/configuration, then pasting the following into the file:

# Kafka broker addresses
bootstrap.servers=localhost:19092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Where to keep the Connect topic offset configurations
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# Plugin path to put the connector binaries
plugin.path=../plugins/

In this file, you must assign the bootstrap.servers parameter to the Redpanda cluster address (in this case, localhost:19092). This links Kafka Connect to the Redpanda cluster. You also need to make sure that plugin.path is directed to the path where the connector binaries will be saved (in this case, redpanda_redshift/plugins).

Your redpanda_redshift directory should now reflect the following structure:

redpanda_redshift
├── src
    ├── generate_data.py
    ├── produce_data.py
├── configuration
    ├── connect.properties
├── plugins
├── resources
└── kafka_2.13-3.1.0

Configuring the JDBC sink connector

Next, download the JDBC sink connector. Unzip the downloaded files into the redpanda_redshift/plugins/confluent-kafka-connect directory, then configure the sink connector to connect with Amazon Redshift. For this, you must create a configuration file called redshift-sink-connector.properties and save the file in the directory redpanda_redshift/configuration. Paste the following configuration into the file:

name=redshift-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=supply_chain_data

connection.url=<YOUR AMAZON REDSHIFT DATABASE URL>
connection.user=<YOUR AMAZON REDSHIFT USERNAME>
connection.password=<YOUR AMAZON REDSHIFT PASSWORD>
# Insert mode configurations
insert.mode=insert

Make sure to add some inputs according to your project configuration:

  • name: The name that you want to call the JDBC sink connector
  • topics: The name of the Redpanda topic (in this tutorial, it's supply_chain_data)
  • connection.url: The unique URL of your database that you obtain from the Redshift console
  • connection.user: Your unique database username that you obtain from the Redshift console
  • connection.password: The password that you have set for your Redshift cluster

To send data from a Redpanda topic to Amazon Redshift using the JDBC sink connector, you need to install a Redshift JDBC driver. The JDBC driver is required for the JDBC sink connector in Kafka Connect because it enables the connector to communicate with the database. The driver is a translator that converts Kafka Connect calls into database-specific operations needed to insert, update, or query the data in Redshift. The driver can be downloaded directly from AWS. Unzip the downloaded files into the directory redpanda_redshift/plugins/confluent-kafka-connect and ensure that the JDBC driver is in the same directory as the JDBC sink connector.

Your redpanda_redshift directory should now reflect the following structure:

redpanda_redshift
├── src
    ├── generate_data.py
    ├── produce_data.py
├── configuration
    ├── connect.properties
    ├── redshift-sink-connector.properties
├── plugins
    ├── confluent-kafka-connect
    ├── [JDBC sink connector binaries]
    ├── [JDBC driver binaries]
├── resources
└── kafka_2.13-3.1.0

Starting the Kafka Connect cluster

You can now set up the Kafka Connect connectors to establish a connection between Redpanda and the Amazon Redshift database.

Navigate to the redpanda_redshift/configuration directory, open a terminal, and run the command below:

../kafka_2.13-3.1.0/bin/connect-standalone.sh \
connect.properties \
redshift-sink-connector.properties

This command activates the connector configured in the connect.properties and redshift-sink-connector.properties files that establishes a data pipeline between Redpanda and Redshift. Once activated, the connector begins the transfer of all data from the Redpanda topic supply_chain_data to a specified database within an Amazon Redshift cluster. The connector maintains real-time synchronization between the Redpanda topic and the Redshift database. As soon as new data enters the supply_chain_data topic, it's directly sent to the Redshift cluster. This makes sure that the data in Redshift is always up to date with the data produced to the Redpanda topic.

The data in Amazon Redshift can be examined in the Redshift console in the section labeled query editor v2:

The data in Amazon Redshift from the Redpanda topic

Step 6. Query the data on Redshift

Once the data has been injected from the Redpanda topic into Amazon Redshift, you can start analyzing this data to get some insights. In particular, you can use SQL queries to analyze the data. To query the data, navigate to the query editor v2 in the Amazon Redshift console and enter the SQL query below:

WITH WeatherConditions AS (
    SELECT
        vehicle_id,
        DATE(vehicle_timestamp) AS date,
        environmental_conditions
    FROM
        supply_chain_data
    GROUP BY
        vehicle_id, date, environmental_conditions
),
ProductionData AS (
    SELECT
        production_line_id,
        DATE(production_timestamp) AS date,
        AVG(operation_efficiency) AS average_efficiency
    FROM
        supply_chain_data
    GROUP BY
        production_line_id, date
),
DeliveryData AS (
    SELECT
        supplier_id,
        material_delivered,
        DATE(delivery_timestamp) AS date,
        COUNT(*) AS total_deliveries,
        COUNT(CASE WHEN delivery_status = 'delayed' THEN 1 ELSE NULL END) AS delayed_deliveries
    FROM
        supply_chain_data
    GROUP BY
        supplier_id, material_delivered, date
),
CombinedAnalysis AS (
    SELECT
        wc.date,
        wc.environmental_conditions,
        COALESCE(pd.average_efficiency, 0) AS production_efficiency,
        dd.material_delivered,
        COALESCE(dd.total_deliveries, 0) AS total_deliveries,
        COALESCE(dd.delayed_deliveries, 0) AS delayed_deliveries
    FROM
        WeatherConditions wc
    LEFT JOIN ProductionData pd ON wc.date = pd.date
    LEFT JOIN DeliveryData dd ON wc.date = dd.date
)
SELECT
    environmental_conditions,
    material_delivered,
    SUM(total_deliveries) AS sum_total_deliveries,
    SUM(delayed_deliveries) AS sum_delayed_deliveries,
    CAST(SUM(delayed_deliveries) AS DECIMAL) / NULLIF(SUM(total_deliveries), 0) AS percentage_delayed_deliveries
FROM
    CombinedAnalysis
GROUP BY
    environmental_conditions, material_delivered
ORDER BY
    environmental_conditions, material_delivered;

This SQL query analyzes the impact of environmental conditions on production efficiency and delivery performance within a supply chain, broken down by material type. It pulls data from a unified data set that includes vehicle GPS logs, production metrics, and delivery records, distinguishing each by environmental conditions recorded on specific dates. The query calculates the number of delayed deliveries for each type of material, all in relation to varying weather conditions. It then outputs the total deliveries, delayed deliveries, and the percentage of delayed deliveries for each material under each environmental condition. This provides insights into how different weather scenarios might influence supply chain efficiency and reliability.

Once you have entered the SQL query into the Redshift console, click the Run button to execute it. The results will then be displayed in the Result tab:

The result of the query

Conclusion

This tutorial explained how to implement a data pipeline for supply chain management using Amazon Redshift and Redpanda. This combination supports real-time data ingestion from different data sources (like IoT devices, sensors, and GPS trackers) and allows you to perform sophisticated analytics on Amazon Redshift. As a result, you can efficiently handle large data sets, react to supply chain dynamics, and enhance decision-making processes based on real-time data.

For hands-on experience and access to all the code samples in this tutorial, visit this GitHub repository. This will allow you to replicate and modify the setup as needed.

To learn more, browse the free courses at Redpanda University. If you have questions or want to chat with the team, join the Redpanda Community on Slack.

Can I use something other than YAML?
Does it use an S3 bucket for cross-zone replication?
Does Serverless feature pre-built data connectors?
Does Redpanda Serverless support Schema Registry?

Related articles

VIEW ALL POSTS
Real-time graph analytics with Redpanda Iceberg Topics and PuppyGraph
Danfeng Xu
&
Sa Wang
&
&
March 4, 2025
Text Link
High availability deployment: Multi-region stretch clusters in Redpanda
Kavya Shivashankar
&
&
&
February 11, 2025
Text Link
Real-time product recommendation AI inferencing
Tyler Rockwood
&
&
&
January 14, 2025
Text Link