Introduction
Loading data from an operational database into a data warehouse or data lake to prepare it for analytical workloads is a common need for data-driven organizations. Change data capture (CDC) is a pattern that enables this use case by replicating the changes made in the operational database into the warehouse in near real time (or at least with low latency).
In this blog post, we will mimic a basic setup of CDC-ing an operational database into a data warehouse. We are going to showcase how simple it is to set up a streaming CDC pipeline using Redpanda & Debezium and how quickly you can get started with querying the data in the data lake using DuckDB.
Debezium is a fault-tolerant, open-source, distributed platform for change data capture. Attaching it to a source database with a small amount of configuration lets us start ingesting change events quickly.
DuckDB is an in-process OLAP database management system. Think of it like a data warehouse that you can start on your machine just by running one process.
As usual, the code for this blog post is available on GitHub.
The use-case
Let's say we have a backend for an app with two tables: USER and PAYMENT. These tables live in a PostgreSQL database and are updated frequently. The analytics team wants to use user and payment information to generate reports, and the data science team would like to train their models on the data as soon as it's available.
Gone are the days when you would have to write a batch job to extract the data from the database and load it into a data warehouse (or lake) that is being managed by a separate team.
To enable both teams to work with the data, we need to replicate it from the operational database into a data lake where it can be queried and analyzed. We will use Debezium to capture change events from PostgreSQL, Redpanda as the streaming data platform that carries them, and a Kafka Connect S3 sink connector to write them into MinIO in Parquet format. We will then query the data in the data lake using DuckDB, an in-process SQL OLAP database.
MinIO is an open-source object store with an S3-compatible API. This compatibility makes it ideal for quickly spinning up proof-of-concept projects that can later be moved to S3 if needed. For our use case, we will use MinIO as a data lake to store our change events in Parquet format.
If that was a bit too abstract, don't worry, we'll go step by step! Let's dive in.
Humble beginnings
We will start by setting up a PostgreSQL database with our two source tables: USER and PAYMENT. We will then use a small Python script to generate some data and insert it into the database. Everything will run in Docker containers, so you don't have to worry about setting up the database or a local development environment.
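Our USER table will have the following schema (USER is a reserved word in PostgreSQL, so we always refer to the table by its schema-qualified name, public.user):
CREATE TABLE public.user
(
    id      VARCHAR(255) PRIMARY KEY,
    name    VARCHAR(255) NOT NULL,
    email   VARCHAR(255) NOT NULL,
    address VARCHAR(255) NOT NULL
);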
Our PAYMENT table will have the following schema:
CREATE TABLE public.payment
(
    id      VARCHAR(255) NOT NULL,
    user_id VARCHAR(255) NOT NULL,
    amount  INT NOT NULL
);
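The postgres/init.sql file contains the SQL statements that create the tables. It runs automatically when the database is initialized for the first time (the official postgres image only executes scripts in /docker-entrypoint-initdb.d when the data directory is empty), so we don't have to do anything manually.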
The relevant part of the docker-compose.yml for the database looks like this:
postgres:
  image: postgres:13
  container_name: postgres
  hostname: postgres
  environment:
    - POSTGRES_USER=postgres
    - POSTGRES_DB=postgres
    - POSTGRES_PASSWORD=postgres
  ports:
    - "5432:5432"
  volumes:
    - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
    - postgres:/var/lib/postgresql/data
  command:
    - "postgres"
    - "-c"
    - "wal_level=logical"
We are mounting the init.sql file into the container so that the tables are created on startup. We are also setting wal_level to logical so that we can use PostgreSQL's logical replication feature. This setting is what allows Debezium to read the database's write-ahead log (WAL) through logical decoding and turn it into change events.
If you would like to dive deeper into CDC with Debezium, check out this great article by Almas Maksotov.
Generating some data
We will use a small Python script to generate data and insert it into the database. The script repeatedly generates a new user and a random number of payments for that user, pausing for a few seconds between rounds. It runs in a Docker container, which we start from the same docker-compose.yml file.
The script looks like this:
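import random
import uuid
from time import sleep
import psycopg2  # assumed PostgreSQL driver
from faker import Faker
fake = Faker()
# Connection settings assumed to match the postgres service in docker-compose.yml
conn = psycopg2.connect(host="postgres", dbname="postgres", user="postgres", password="postgres")
cur = conn.cursor()
while True:
    user_id = str(uuid.uuid4())
    # Parameterized queries keep names or addresses containing quotes from breaking the SQL
    cur.execute(
        "INSERT INTO public.user (id, name, email, address) VALUES (%s, %s, %s, %s);",
        (user_id, fake.name(), fake.email(), fake.address()),
    )
    for _ in range(random.randint(5, 15)):
        cur.execute(
            "INSERT INTO public.payment (id, user_id, amount) VALUES (%s, %s, %s);",
            (uuid.uuid4().hex, user_id, random.randint(1, 100)),
        )
        sleep(1)
    conn.commit()  # the user and its payments become visible (and replicated) together
    sleep(2)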
The sleeps keep data generation at a modest rate, which makes the pipeline easier to follow while developing.
Add the following to the docker-compose.yml file (the Dockerfile is included in the repository!):
datagen:
  build: ./datagen
  container_name: datagen
Now we can start our database and the data generator:
docker-compose up -d postgres datagen
You should see some data being generated in the database:
psql -h localhost -U postgres -d postgres
postgres=# select * from public.user limit 3;
id | name | email | address
--------------------------------------+-------------------+-------------------------+--------------------------------
53938687-affa-45b8-a22c-3c4ecf2069b4 | Michael Rodriguez | qsmith@example.org | 5479 Coleman Glens Suite 836 +
| | | Annatown, OH 35759
aaf3a630-eae7-4558-9857-2640c89b172c | Madison Schultz | shelbybaker@example.net | 3034 Michael Streets Suite 452+
| | | East Shawnburgh, PW 56434
914a3215-7357-4d62-ba8f-f1c4fdb6743b | Thomas Smith | melissa85@example.com | 5835 Guzman Lights Suite 954 +
| | | Brianbury, TX 18767
(3 rows)
Cool, let's move on to the next step.
Setting up Redpanda
Redpanda will be our streaming data platform of choice. Because it's Kafka API-compatible, we can integrate it seamlessly with Debezium and start replicating our data. Redpanda is a natural fit for storing database change events because it's a log-based data store: incoming records are only ever appended to the end of the log. If a record in the source database is modified or deleted, we simply get a new event carrying the updated state and the operation type that triggered it.
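The append-only model is easy to see in code. Below is a minimal Python sketch, not part of the pipeline itself, that replays Debezium-style change events in log order and rebuilds the current contents of a table. It assumes the standard Debezium envelope fields before, after and op (c = create, u = update, d = delete, r = snapshot read) and a primary key named id; the sample events are made up and apply_change_events is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def apply_change_events(events: List[Optional[Dict[str, Any]]], key: str = "id") -> Dict[Any, Row]:
    """Replay change events in log order and return the latest state of each row, keyed by primary key."""
    state: Dict[Any, Row] = {}
    for event in events:
        if event is None:
            # Tombstone (null value) that Debezium emits after a delete by default; nothing to apply
            continue
        op = event["op"]
        if op in ("c", "r", "u"):
            # Creates, snapshot reads and updates carry the new row state in "after"
            row = event["after"]
            state[row[key]] = row
        elif op == "d":
            # Deletes carry the old row (at least its primary key) in "before" and a null "after"
            state.pop(event["before"][key], None)
        else:
            raise ValueError(f"unknown operation type: {op!r}")
    return state


# Made-up events for the user table, in the order they were appended to the log
events = [
    {"op": "c", "before": None, "after": {"id": "u1", "name": "Ada", "email": "ada@example.org"}},
    {"op": "c", "before": None, "after": {"id": "u2", "name": "Bob", "email": "bob@example.org"}},
    {"op": "u", "before": None, "after": {"id": "u1", "name": "Ada", "email": "ada@example.net"}},
    {"op": "d", "before": {"id": "u2", "name": None, "email": None}, "after": None},
    None,  # tombstone following the delete
]

current = apply_change_events(events)
print(current)  # {'u1': {'id': 'u1', 'name': 'Ada', 'email': 'ada@example.net'}}
```

Nothing in the log is rewritten: the update and the delete are simply new entries, and the current state falls out of replaying them in order.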
We will deploy three services in one go: Redpanda, Redpanda Console, and Debezium (running on Kafka Connect). The docker-compose.yml file looks like this:
redpanda:
  image: docker.vectorized.io/vectorized/redpanda:latest
  container_name: redpanda
  command:
    - redpanda start
    - --overprovisioned
    - --smp 1
    - --memory 1G
    - --reserve-memory 0M
    - --node-id 0
    - --check=false
    - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
    - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
    - --pandaproxy-addr 0.0.0.0:8082
    - --advertise-pandaproxy-addr redpanda:8082
    - --set redpanda.enable_transactions=true
    - --set redpanda.enable_idempotence=true
    - --set redpanda.auto_create_topics_enabled=true
  ports:
    - "9092:9092"
    - "8081:8081"
    - "8082:8082"
    - "9644:9644"
redpanda-console:
  container_name: redpanda-console
  image: docker.redpanda.com/vectorized/console:latest
  entrypoint: /bin/sh
  command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
  environment:
    CONFIG_FILEPATH: /tmp/config.yml
    CONSOLE_CONFIG_FILE: |
      kafka:
        brokers: ["redpanda:29092"]
        schemaRegistry:
          enabled: true
          urls: ["http://redpanda:8081"]
      redpanda:
        adminApi:
          enabled: true
          urls: ["http://redpanda:9644"]
      connect:
        enabled: true
        clusters:
          - name: local-connect-cluster
            url: http://connect:8083
  ports:
    - "8080:8080"
  depends_on:
    - redpanda
connect:
  container_name: connect
  build:
    context: ./connect
    dockerfile: Dockerfile
  depends_on:
    - redpanda
  ports:
    - "8083:8083"
  environment:
    BOOTSTRAP_SERVERS: "redpanda:29092"
    GROUP_ID: "1"
    CONFIG_STORAGE_TOPIC: "users.configs"
    OFFSET_STORAGE_TOPIC: "users.offset"
    STATUS_STORAGE_TOPIC: "users.status"
    KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
  volumes:
    - ./connect:/connectors
We will only use the Console to verify that our connectors are running properly. The configuration for the Redpanda and Redpanda Console services is pretty straightforward. For the Debezium service (the connect service in the docker-compose.yml), we are setting four important environment variables:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
These parameters tell Debezium to serialize the data (both the record key and the value) in Avro format. The schemas are stored in the Redpanda Schema Registry, which is served by the same Redpanda service as the broker, on port 8081. This will be very helpful when we save our data on the other end in Parquet format, which requires a schema definition.
The connect service also has a volume attached to it, which holds the configuration files for our Debezium source connector and our S3 sink connector. Because the folder is mounted as a volume, we can add or modify configs without rebuilding the image or restarting the service.
Now that the foundations are in place, we can start setting up Debezium. But first, we have to start our new services:
docker-compose up -d redpanda redpanda-console connect
This is a great opportunity to explore Redpanda Console, a web-based UI for monitoring your Redpanda cluster and Kafka Connect cluster. You can find it at http://localhost:8080.
Setting up Debezium
After waiting a few seconds so that every service has time to initialize, we can exec into the connect container and create our new connectors:
docker exec -it connect /bin/bash
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/pg-src/config -d '@/connectors/pg-src.json'
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink/config -d '@/connectors/s3-sink.json'
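If you'd rather drive this from the host than exec into the container, the same two calls can be scripted. The sketch below uses only the Python standard library: it polls the published Connect port instead of waiting a fixed number of seconds, then sends the same PUT /connectors/<name>/config requests. The helper names are hypothetical; it assumes port 8083 is published as in the compose file and that it runs from the repository root, where connect/pg-src.json and connect/s3-sink.json live. An open TCP port is only a coarse readiness signal, so the REST API may still return errors for a short while after the port opens.

```python
import json
import socket
import time
import urllib.request
from pathlib import Path


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to host:port succeeds; return False once the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


def build_config_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build the same PUT /connectors/<name>/config call that the curl commands send."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def register_connectors(host: str = "localhost", port: int = 8083, config_dir: str = "connect") -> None:
    """Wait for the Connect REST API, then create or update both connectors from their JSON files."""
    # Assumes port 8083 is published on the host and that this runs from the repository root
    if not wait_for_port(host, port):
        raise RuntimeError(f"Kafka Connect is not reachable on {host}:{port}")
    for name in ("pg-src", "s3-sink"):
        config = json.loads(Path(config_dir, f"{name}.json").read_text())
        request = build_config_request(f"http://{host}:{port}", name, config)
        with urllib.request.urlopen(request) as response:
            print(name, response.status)  # HTTP status returned by Kafka Connect
```

Calling register_connectors() once the containers are up should have the same effect as the two curl commands.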
The source connector configuration looks like this:
{
"name": "pg-src",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.server.name": "postgres-docker",
"database.dbname": "postgres",
"database.include.list": "postgres",
"topic.prefix": "debezium",
"schema.include.list": "public",
"plugin.name": "pgoutput",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://redpanda:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://redpanda:8081"
}
and the sink connector like this:
{
"name": "s3-sink",
"connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
"aws.access.key.id": "minio",
"aws.s3.bucket.name": "user-payments",
"aws.s3.endpoint": "http://minio:9000",
"aws.s3.region": "us-east-1",
"aws.secret.access.key": "minio123",
"format.output.type": "parquet",
"topics": "debezium.public.user,debezium.public.payment",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://redpanda:8081",
"file.compression.type": "none",
"format.output.fields": "key,value,offset,timestamp"
}
The source connector is configured to connect to the postgres service, which is running in the same Docker network, so we can use the service name as the hostname. The source connector is configured to use Avro for both the key and the value of the records.
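The source and sink connectors meet at the topic names. By default, Debezium's PostgreSQL connector writes each table's changes to a topic named after topic.prefix, the schema and the table, joined with dots, which is why the sink connector lists debezium.public.user and debezium.public.payment in its topics setting. A tiny Python sketch of that mapping, assuming the default naming (no topic-routing transforms configured); debezium_topics is a hypothetical helper:

```python
from typing import Iterable, List


def debezium_topics(topic_prefix: str, schema: str, tables: Iterable[str]) -> List[str]:
    """Default Debezium PostgreSQL topic names: <topic.prefix>.<schema>.<table>."""
    return [f"{topic_prefix}.{schema}.{table}" for table in tables]


# topic.prefix and the schema come from pg-src.json; the table names from init.sql
topics = debezium_topics("debezium", "public", ["user", "payment"])

# The sink connector's "topics" setting is a comma-separated list of those names
print(",".join(topics))  # debezium.public.user,debezium.public.payment
```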
The sink connector uses the AivenKafkaConnectS3SinkConnector class, which is provided by Aiven. This is a third-party connector that we have to install in our Kafka Connect service manually. Don't worry, the provided Dockerfile takes care of this for us. The connector uses the minio service as its S3 endpoint and stores the data in the user-payments bucket. The value converter is the same Avro converter, backed by the same Schema Registry, as in the source connector, while the record key is read with a StringConverter. We also specify that we want to store the data in Parquet format, so we can query it with DuckDB without any trouble.
If the connectors are created successfully, the API simply returns the configuration of each. To double-check, let's head over to Redpanda Console (available at localhost:8080) and see whether the connectors are running:
Looks like both of our connectors are in the RUNNING state, which means that they are ready to replicate our data.
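The Console is the easiest way to check, but the same information is available from the Kafka Connect REST API at GET /connectors/<name>/status, which reports the state of the connector and of each of its tasks. A connector can report RUNNING while one of its tasks has FAILED, so a scripted check should look at both. A minimal Python sketch; the function names are hypothetical and it assumes the Connect port is published on localhost:8083:

```python
import json
import urllib.request
from typing import Any, Dict, List


def fetch_status(base_url: str, name: str) -> Dict[str, Any]:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def not_running(status: Dict[str, Any]) -> List[str]:
    """Describe every part of a status document (the connector and each task) that is not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f"connector {status['name']}: {status['connector']['state']}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


# Usage against the published port (assumed to be localhost:8083):
# for name in ("pg-src", "s3-sink"):
#     print(name, not_running(fetch_status("http://localhost:8083", name)) or "all RUNNING")
```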
A quick peek at the Topics page confirms that data is flowing into Redpanda properly:
The last service we haven't started yet is our data lake, MinIO, where the sink connector stores our data in Parquet format. We can start it with the following command:
docker-compose up -d minio mc
The mc service is a small container that automatically creates the user-payments bucket in MinIO and sets its access policy to public. This is just for convenience, so we can easily access the data from our local machine.
After a few seconds, we can verify that the bucket has been created and is accessible by navigating to localhost:9000 in a web browser:
If we click Browse, we can see that the bucket already has some data in it:
Great, that means our data is flowing from Postgres to Redpanda and from Redpanda to MinIO. The hard part is done, and we can start playing around with our data. Our data lake is ready; now we just need something to query it with so we can call it a data lakehouse!
Setting up DuckDB
While DuckDB is designed to be embedded in other applications, it also provides a command line interface that we can use to query our data lake. We can start DuckDB with the following command:
duckdb --init duckdb/init.sql
-- Loading resources from duckdb/init.sql
v0.5.1 7c111322d
Enter ".help" for usage hints.
D
This starts an in-memory data warehouse for us in under a second! The --init flag specifies a file that DuckDB executes on startup. In our case, we use it to load the required extension and configure access to our data lake:
load 'httpfs';
set s3_endpoint='localhost:9000';
set s3_access_key_id='minio';
set s3_secret_access_key='minio123';
set s3_use_ssl= false;
set s3_region='us-east-1';
set s3_url_style='path';
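Two of these settings are easy to gloss over. s3_use_ssl=false is needed because our MinIO container speaks plain HTTP, and s3_url_style='path' tells DuckDB to put the bucket name in the URL path rather than in the host name. The Python sketch below (s3_object_url is a hypothetical helper, and the object key is made up) shows the two URL shapes for our bucket: with virtual-hosted style, the request would go to user-payments.localhost:9000, a host name that a local MinIO setup is usually not configured to serve, whereas path style keeps talking to localhost:9000.

```python
def s3_object_url(endpoint: str, bucket: str, key: str, url_style: str, use_ssl: bool) -> str:
    """Build the URL an S3 client requests for an object in path or virtual-hosted ("vhost") style."""
    scheme = "https" if use_ssl else "http"
    if url_style == "path":
        # Path style: the bucket is the first path segment; the host is the endpoint itself
        return f"{scheme}://{endpoint}/{bucket}/{key}"
    if url_style == "vhost":
        # Virtual-hosted style: the bucket becomes a subdomain of the endpoint
        return f"{scheme}://{bucket}.{endpoint}/{key}"
    raise ValueError(f"unsupported url_style: {url_style!r}")


# Endpoint and bucket from duckdb/init.sql and the sink config; the object key is made up
print(s3_object_url("localhost:9000", "user-payments", "some-object.parquet", "path", False))
# http://localhost:9000/user-payments/some-object.parquet
print(s3_object_url("localhost:9000", "user-payments", "some-object.parquet", "vhost", False))
# http://user-payments.localhost:9000/some-object.parquet
```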
After initializing DuckDB, we can start querying our data lake. Let's start with a simple query to see how many users we have:
SELECT count(value.after.id) as user_count FROM read_parquet('s3://user-payments/debezium.public.user-*');
┌────────────┐
│ user_count │
├────────────┤
│        152 │
└────────────┘
Note that the schema of our tables follows the Debezium envelope format, so we need to access the row data through the value.after field, which is a STRUCT in DuckDB. To see the whole schema, we can take a look at the Schema Registry page in Redpanda Console.
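If you prefer a script to the Console, the same schemas can be read from the Schema Registry's REST API, which Redpanda serves on port 8081. The sketch below builds the request for the latest value schema of a topic and lists the top-level fields of the Avro record, which for a Debezium envelope should include before, after, source and op. It assumes the default TopicNameStrategy for subject names (<topic>-value) and that port 8081 is published on localhost; the function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List


def latest_schema_url(registry_url: str, topic: str, part: str = "value") -> str:
    """Schema Registry URL for the latest schema of a topic's key or value."""
    # Assumes the default TopicNameStrategy, where subjects are named <topic>-key and <topic>-value
    subject = urllib.parse.quote(f"{topic}-{part}", safe="")
    return f"{registry_url}/subjects/{subject}/versions/latest"


def fetch_latest_schema(registry_url: str, topic: str) -> Dict[str, Any]:
    """Fetch the registry's JSON response (subject, version, id and schema)."""
    with urllib.request.urlopen(latest_schema_url(registry_url, topic)) as response:
        return json.load(response)


def top_level_fields(registry_response: Dict[str, Any]) -> List[str]:
    """Field names of the Avro record schema contained in a registry response."""
    # The Avro schema itself is returned as a JSON-encoded string in the "schema" field
    schema = json.loads(registry_response["schema"])
    return [field["name"] for field in schema["fields"]]


# Usage against the published registry port (assumed to be localhost:8081):
# print(top_level_fields(fetch_latest_schema("http://localhost:8081", "debezium.public.user")))
```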
Also note the read_parquet function, which we use to read data from the S3 bucket. The s3:// prefix tells DuckDB that we want to read from S3 or, in our case, MinIO. In the object name, we can use the * symbol as a wildcard, which is expanded to all objects matching the pattern. This is useful for reading multiple files at once, which we need here because the sink connector writes each replicated table's events into multiple objects.
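To illustrate how the wildcard selects one table's files, the Python sketch below applies the same patterns to a list of object names with fnmatch. It is a stand-in, not DuckDB's implementation: the object names are made up (the real ones depend on the sink connector's file naming), and fnmatch's rules differ in details (for example, its * also matches /), which doesn't matter for flat names like these. The helper name expand is hypothetical.

```python
from fnmatch import fnmatchcase
from typing import List

# Made-up object names; the real names depend on the sink connector's file naming
objects = [
    "debezium.public.user-0-0",
    "debezium.public.user-0-120",
    "debezium.public.payment-0-0",
    "debezium.public.payment-0-1400",
]


def expand(pattern: str, names: List[str]) -> List[str]:
    """Return the names matched by a shell-style pattern, where * matches any run of characters."""
    return [name for name in names if fnmatchcase(name, pattern)]


print(expand("debezium.public.user-*", objects))
# ['debezium.public.user-0-0', 'debezium.public.user-0-120']
print(expand("debezium.public.payment-*", objects))
# ['debezium.public.payment-0-0', 'debezium.public.payment-0-1400']
```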
To showcase the power of DuckDB, let's join our two tables and query the 3 latest payments for each user:
WITH latest_payments_per_user AS (
    SELECT p.*
    FROM (
        SELECT p.*,
               row_number() OVER (PARTITION BY value.after.user_id ORDER BY timestamp DESC) AS rn
        FROM read_parquet('s3://user-payments/debezium.public.payment-*') p
    ) p
    WHERE rn <= 3
)
SELECT
    u.value.after.name,
    p.value.after.amount
FROM read_parquet('s3://user-payments/debezium.public.user-*') AS u
JOIN latest_payments_per_user AS p ON u.value.after.id = p.value.after.user_id
LIMIT 9;
┌────────────────────┬────────┐
│        name        │ amount │
├────────────────────┼────────┤
│ Michael Green      │      8 │
│ Michael Green      │     52 │
│ Michael Green      │     99 │
│ Christopher Holden │     51 │
│ Christopher Holden │     21 │
│ Christopher Holden │     77 │
│ Kristen Smith      │      5 │
│ Kristen Smith      │     75 │
│ Kristen Smith      │      4 │
└────────────────────┴────────┘
As you can see, we can query our data lake with plain SQL and even join multiple tables without writing any DDL, although defining tables or views is definitely recommended in a real-world scenario.
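To spell out what the window function and the join compute, here is the same logic as a plain-Python sketch over records shaped like the Parquet rows the sink connector writes. Only the value and timestamp fields are modeled, the sample data is made up, and the function names are hypothetical. One detail it makes explicit: the SQL query has no ORDER BY, so which 9 rows LIMIT returns is not guaranteed to be the same from run to run.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


def latest_payments_per_user(payments: List[Record], n: int = 3) -> Dict[str, List[Record]]:
    """Equivalent of row_number() OVER (PARTITION BY user_id ORDER BY timestamp DESC) <= n."""
    by_user: Dict[str, List[Record]] = defaultdict(list)
    for record in payments:
        after = record["value"]["after"]
        if after is not None:  # delete events have no "after" row and cannot match the join
            by_user[after["user_id"]].append(record)
    return {
        user_id: sorted(records, key=lambda r: r["timestamp"], reverse=True)[:n]
        for user_id, records in by_user.items()
    }


def names_and_amounts(users: List[Record], payments: List[Record], n: int = 3) -> List[Tuple[str, int]]:
    """Inner join of user events with each user's n latest payment events, keeping (name, amount)."""
    latest = latest_payments_per_user(payments, n)
    rows = []
    for record in users:
        user = record["value"]["after"]
        if user is None:
            continue
        for payment in latest.get(user["id"], []):
            rows.append((user["name"], payment["value"]["after"]["amount"]))
    return rows


# Made-up records shaped like the Parquet rows (only the fields the query uses)
users = [{"value": {"after": {"id": "u1", "name": "Ada"}}, "timestamp": 1}]
payments = [
    {"value": {"after": {"user_id": "u1", "amount": amount}}, "timestamp": ts}
    for ts, amount in [(2, 10), (3, 20), (4, 30), (5, 40)]
]
print(names_and_amounts(users, payments))  # [('Ada', 40), ('Ada', 30), ('Ada', 20)]
```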
Conclusion
Let's take a look at the concepts and tools we learned about in this tutorial:
- CDC to ingest database change events from PostgreSQL with minimal extra load on the database, since changes are read from its write-ahead log rather than by repeatedly querying the tables.
- How to spin up and configure Debezium as our CDC platform.
- How to use Redpanda as the data streaming backbone of the pipeline.
- How to deploy a proof-of-concept data lake and query engine with MinIO and DuckDB.
Not bad for a short blog post! Even though the actual tools will vary in real-life scenarios (you probably won't use MinIO as a data lake in production, and your source database or service might not be just PostgreSQL), the core concepts of a streaming CDC pipeline stay the same.
This stack is very powerful, and as you can see, getting it up and running is easy. If you'd rather not follow the tutorial step by step, you can find the full code in this GitHub repository. After cloning the repository, all you have to do is run docker-compose up and you are good to go.