Learn how to build a CDC pipeline with Redpanda that streams operational data from PostgreSQL to DuckDB for OLAP analytics.

ByDaniel PalmaonDecember 20, 2022
Data Lakehouse at home with Redpanda and DuckDB

Introduction

Having to load data from an operational database into a data warehouse/lake to prepare for analytical workloads is a common use case for data-driven organizations. Change data capture (CDC) is a pattern that enables this use-case by replicating the changes in the operational database into the warehouse in near real-time (or at least with low latency).

In this blog post, we will mimic a basic setup of CDC-ing an operational database into a data warehouse. We are going to showcase how simple it is to set up a streaming CDC pipeline using Redpanda & Debezium and how quickly you can get started with querying the data in the data lake using DuckDB.

Debezium is a fault tolerant, open-source, distributed platform for change data capture. Simply attaching it to a source database with some configuration will allow us to quickly start ingestion change events.

DuckDB is an in-process OLAP database management system. Think of it like a data warehouse that you can start on your machine just by running one process.

As usual, the code for this blog post is available on Github.

The use-case

Let's say we have a backend for an app with two tables: USER and USER_PAYMENT. These tables live in a PostgreSQL database and get updated frequently. The analytics team wants to use user & payment information to generate reports and the data science team would like to train their models on the data as soon as it's available.

Gone are the days when you would have to write a batch job to extract the data from the database and load it into a data warehouse (or lake) that is being managed by a separate team.

To enable both teams to work with the data we need to replicate the data from the operational database into a data lake where it can be queried and analyzed. We will use Redpanda as our streaming data platform and Debezium to replicate the change events from PostgreSQL into MinIO in parquet format. We will then query the data in the data lake using DuckDB, an in-memory SQL OLAP database.

MinIO is an open-source object storage that is S3 API-compatible. This compatibility makes it perfect to quickly spin up proof of concept projects which later can be easily moved to S3, if needed. For our use-case we will use MinIO as a data lake, to store our change events in parquet format.

If that was a bit too abstract, don't worry we'll go step-by-step! Let's dive in.

Humble beginnings

We will start by setting up a PostgreSQL database with our two source tables: USER and PAYMENT. We will then use a small python script to generate some data and insert it into the database. Everything will run in Docker containers, so you don't have to worry about setting up the database or a local development environment.

Source database schema

Our USER table will have the following schema:

CREATE TABLE users ( id VARCHAR(255) PRIMARY KEY, name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, address VARCHAR(255) NOT NULL );

Our USER_PAYMENT table will have the following schema:

CREATE TABLE user_payments ( id VARCHAR(255) NOT NULL, user_id VARCHAR(255) NOT NULL, amount INT NOT NULL );

The postgres/init.sql file contains the SQL statements to create the tables and will run on database startup, so we don't have to do anything manually.

The relevant part in the docker-compose.yml for the database looks like this:

postgres: image: postgres:13 container_name: postgres hostname: postgres environment: - POSTGRES_USER=postgres - POSTGRES_DB=postgres - POSTGRES_PASSWORD=postgres ports: - "5432:5432" volumes: - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql - postgres:/var/lib/postgresql/data command: - "postgres" - "-c" - "wal_level=logical"

We are mounting the init.sql file into the container so that the tables are created on startup. We are also setting the wal_level to logical so that we can use PostgreSQL's logical replication feature. This is the flag that allows Debezium to read the database's transaction log and generate change events.

If you would like to dive deeper into CDC with Debezium, check out this great article by Almas Maksotov.

Generating some data

We will use a small python script to generate some data and insert it into the database. The script will generate a new user and a random number of payments for that user every few seconds. The script will run in a Docker container, and we will use the same docker-compose.yml file to start the container.

The script looks like this:

while True: user_id = str(uuid.uuid4()) cur.execute( f"INSERT INTO public.user (id, name, email, address) VALUES ('{user_id}','{fake.name()}','{fake.email()}','{fake.address()}');" ) for i in range(random.randint(5, 15)): cur.execute( f"INSERT INTO public.payment (id, user_id, amount) VALUES ('{uuid.uuid4().hex}','{user_id}','{random.randint(1, 100)}');" ) sleep(1) conn.commit() sleep(2)

There are a few sleeps in there to make sure that we generate some data at a reasonable rate, and it's easier to follow when developing.

Add the following to the docker-compose.yml file (the Dockerfile is included in the repository!):

datagen: build: ./datagen container_name: datagen

Now we can start our database and the data generator:

docker-compose up -d postgres datagen

You should see some data being generated in the database:

psql -h localhost -U postgres -d postgres postgres=# select * from public.user limit 3; id | name | email | address --------------------------------------+-------------------+-------------------------+-------------------------------- 53938687-affa-45b8-a22c-3c4ecf2069b4 | Michael Rodriguez | qsmith@example.org | 5479 Coleman Glens Suite 836 + | | | Annatown, OH 35759 aaf3a630-eae7-4558-9857-2640c89b172c | Madison Schultz | shelbybaker@example.net | 3034 Michael Streets Suite 452+ | | | East Shawnburgh, PW 56434 914a3215-7357-4d62-ba8f-f1c4fdb6743b | Thomas Smith | melissa85@example.com | 5835 Guzman Lights Suite 954 + | | | Brianbury, TX 18767 (3 rows)

Cool, let's move on to the next step.

Setting up Redpanda

Redpanda will be our streaming data platform of choice. Because it's Kafka API-compatible, we can seamlessly integrate it with Debezium and start replicating our data. Redpanda is the perfect choice to store database change events because it's a log-based data store, and the incoming records only have to be appended to the end of the log, as if a record in the source database gets deleted or modified, we will just get a new record with the updated state and the operation type that triggered the event.

We will deploy three services in one go; Redpanda, Redpanda Console, and Debezium. The docker-compose.yml file looks like this:

redpanda: image: docker.vectorized.io/vectorized/redpanda:latest container_name: redpanda command: - redpanda start - --overprovisioned - --smp 1 - --memory 1G - --reserve-memory 0M - --node-id 0 - --check=false - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092 - --pandaproxy-addr 0.0.0.0:8082 - --advertise-pandaproxy-addr redpanda:8082 - --set redpanda.enable_transactions=true - --set redpanda.enable_idempotence=true - --set redpanda.auto_create_topics_enabled=true ports: - "9092:9092" - "8081:8081" - "8082:8082" - "9644:9644" redpanda-console: container_name: redpanda-console image: docker.redpanda.com/vectorized/console:latest entrypoint: /bin/sh command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console" environment: CONFIG_FILEPATH: /tmp/config.yml CONSOLE_CONFIG_FILE: | kafka: brokers: ["redpanda:29092"] schemaRegistry: enabled: true urls: ["http://redpanda:8081"] redpanda: adminApi: enabled: true urls: ["http://redpanda:9644"] connect: enabled: true clusters: - name: local-connect-cluster url: http://connect:8083 ports: - "8080:8080" depends_on: - redpanda connect: container_name: connect build: context: ./connect dockerfile: Dockerfile depends_on: - redpanda ports: - "8083:8083" environment: BOOTSTRAP_SERVERS: "redpanda:29092" GROUP_ID: "1" CONFIG_STORAGE_TOPIC: "users.configs" OFFSET_STORAGE_TOPIC: "users.offset" STATUS_STORAGE_TOPIC: "users.status" KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081 volumes: - ./connect:/connectors

We will only use the Console to verify that our connectors are running properly. The configuration for our Redpanda and Redpanda Console service is pretty straightforward. For the Debezium service (the connect service in the docker-compose.yml) we are setting 4 important environment variables:

KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081

These parameters tell Debezium to serialize the data (both record key and message) in Avro format. We will use the Redpanda Schema Registry to store the schemas, which is available from the same Redpanda service as the broker, on port 8081. This will be very helpful when we are going to save our data on the other end in parquet format, which requires a schema definition.

The connect service also has a volume attached to it, which is used to store the connector configuration. We will use this to store the configurations for our Debezium source connector and our S3 sink connector. Because the folder is mapped as a volume, we can easily add/modify configs without having to restart the service.

Now that the foundations are in place, we can start setting up Debezium. But first, we have to start our new services:

docker-compose up -d redpanda redpanda-console connect

This is a great opportunity to explore the Redpanda Console. It's a web-based UI that allows you to monitor your Redpanda cluster and Kafka Connect cluster. You can find it at http://localhost:8080.

Setting up Debezium

After waiting for a few seconds so every service has time to initialize, we can exec into the connect container and create our new connectors:

docker exec -it connect /bin/bash curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/pg-src/config -d '@/connectors/pg-src.json' curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink/config -d '@/connectors/s3-sink.json'

The source connector configuration looks like this:

{ "name": "pg-src", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.server.name": "postgres-docker", "database.dbname": "postgres", "database.include.list": "postgres", "topic.prefix": "debezium", "schema.include.list": "public", "plugin.name": "pgoutput", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://redpanda:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://redpanda:8081" }

and the sink connector like this:

{ "name": "s3-sink", "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector", "aws.access.key.id": "minio", "aws.s3.bucket.name": "user-payments", "aws.s3.endpoint": "http://minio:9000", "aws.s3.region": "us-east-1", "aws.secret.access.key": "minio123", "format.output.type": "parquet", "topics": "debezium.public.user,debezium.public.payment", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://redpanda:8081", "file.compression.type": "none", "format.output.fields": "key,value,offset,timestamp" }

The source connector is configured to connect to the postgres service, which is running in the same docker network, so we can use the service name as the hostname. The source connector is configured to use Avro for both the key and the value of the records.

The sink connector is configured to use the AivenKafkaConnectS3SinkConnector class, which is provided by Aiven. This is a custom connector that we have to manually install in our Kafka Connect service. Don't worry, the provided Dockerfile takes care of this for us. The connector is configured to use the minio service as the S3 endpoint, and the user-payments bucket to store the data. The key and value converters are configured to use the same Avro converters as the source connector. We also specify that we want to store the data in parquet format, so we can ingest the data into DuckDB without any trouble.

If the connector creation is successful the API will just return the configuration for both. Also, to verify let's head over to the Redpanda Console (available at localhost:8080) and check if the connectors are running:

Redpanda console connectors

Looks like both of our connectors are in the RUNNING state, which means that they are ready to replicate our data.

Quick peek at the topics page, and we can quickly make sure data is flowing into Redpanda properly:

Redpanda console topics

Now the last service we haven't started yet is our data lake, MinIO. MinIO is an open source S3 compatible object storage service. We will use it to store our data in parquet format. We can start it with the following command:

docker-compose up -d minio mc

The mc service is a small container that we use to automatically create the user-payments bucket in MinIO and set its permissions to public. This is just for convenience, so we can easily access the data from our local machine.

After a few seconds, we can verify that the bucket is created and accessible by navigating to localhost:9000 in our web browser:

Minio Homepage

If we click Browse we can see that the bucket already has some data in it:

Minio Bucket

Great, that means our data is flowing from Postgres to Redpanda and from Redpanda to MinIO. This means the hard part is done, and we can start playing around with our data. Our data lake is ready, now we just need something to query it with so we can call it a Data Lakehouse!

Setting up DuckDB

While DuckDB is designed to be embedded in other applications, it also provides a command line interface that we can use to query our data lake. We can start DuckDB with the following command:

duckdb --init duckdb/init.sql -- Loading resources from duckdb/init.sql v0.5.1 7c111322d Enter ".help" for usage hints. D

This will start an in-memory datawarehouse for us in under a second! The --init flag is used to specify a file that will be executed when DuckDB starts. In our case, we use it to load the required modules and configurations for querying our data lake:

load 'httpfs'; set s3_endpoint='localhost:9000'; set s3_access_key_id='minio'; set s3_secret_access_key='minio123'; set s3_use_ssl= false; set s3_region='us-east-1'; set s3_url_style='path';

After initializing DuckDB, we can start querying our data lake. Let's start with a simple query to see how many users we have:

SELECT count(value.after.id) as user_count FROM read_parquet('s3://user-payments/debezium.public.user-*'); ┌────────────┐ │ user_count │ ├────────────┤ 152└────────────┘

Note that the schema of our tables is based on the Debezium envelope format, so we need to access the data through the value.after field, which is a STRUCT in DuckDB. To get a glimpse of the whole schema, we can take a look at the Redpanda Console, under schema registry.

Also note the read_parquet function, which is used to read data from the S3 bucket. The s3:// prefix is used to tell DuckDB that we want to read data from S3 or, in our case, MinIO. In the object name, we can use the * symbol to specify a wildcard, which will be expanded to all the objects that match the pattern. This is useful if we want to read data from multiple files, which is the case for our tables that are being replicated.

To showcase the power of DuckDB, let's join our two tables and query the 3 latest payments for each user:

WITH latest_payments_per_user AS ( SELECT p.* FROM (SELECT p.*, row_number() over (partition by value.after.user_id order by timestamp desc) as rn FROM read_parquet('s3://user-payments/debezium.public.payment-*') p) p WHERE rn <= 3 ) SELECT u.value.after.name, p.value.after.amount FROM read_parquet('s3://user-payments/debezium.public.user-*') AS u JOIN latest_payments_per_user AS p ON u.value.after.id = p.value.after.user_id LIMIT 9; ┌────────────────────┬────────┐ │ name │ amount │ ├────────────────────┼────────┤ │ Michael Green │ 8│ Michael Green │ 52│ Michael Green │ 99│ Christopher Holden │ 51│ Christopher Holden │ 21│ Christopher Holden │ 77│ Kristen Smith │ 5│ Kristen Smith │ 75│ Kristen Smith │ 4└────────────────────┴────────┘

As you can see, we can query our data lake with a simple SQL query, and we can even join multiple tables without having to even use any DDL - although it's definitely recommended to do so in a real-world scenario.

Conclusion

Let’s take a look at the concepts and tools we learned about in this tutorial: CDC to ingest database change events from PostgreSQL without putting extra load on the database. How to spin up and configure Debezium as our CDC platform. How to utilize Redpanda as the data streaming platform backbone of the pipeline. How to deploy a proof of concept data lake and processing engine with MinIO and DuckDB.

Not bad for a short blog post! Even though the actual tools can vary in real life scenarios; you probably won’t use MinIO as a data lake in production and your source database/service might not be (just) PostgreSQL, the core concepts of a streaming CDC pipeline will stay the same.

This stack is very powerful, and getting up and running is very easy, as you can see. If you didn't want to follow the tutorial step-by-step, you can find the full code in this GitHub repository. After cloning the repository all you have to do is docker-compose up and you are good to go.

Let's keep in touch

Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.