Integrating MongoDB with Debezium and Redpanda for CDC

Learn how to use MongoDB with Debezium and Redpanda for real-time change data capture.

September 8, 2022

Data sensitivity is becoming increasingly important in the cloud-native era. Data is key to success for any business, whether it's private user information that must be secured or metrics that make it possible to predict business trends. Being able to access data in real time is therefore particularly valuable, because you can use, consume, and process “fresh” data.

Redpanda, with its Apache Kafka-compatible API and high-performance engine, provides a data-pipeline backbone for streaming real-time data so it can be processed reliably.

Real-time data streaming generally needs a fast, scalable persistence layer, such as a NoSQL database. MongoDB is a modern NoSQL database that supports transactions, search, and analytics. It's horizontally scalable and easy to manage and configure, making it one of the best NoSQL technologies.

Note: In this tutorial we will be integrating MongoDB with Redpanda via Debezium. This may be preferred in cases where users already have operational patterns in place with Debezium. If you prefer to use MongoDB’s Kafka source connector, this is also an option.

Redpanda’s features and MongoDB complement each other well in many use cases, especially those that require real-time data streaming. Managing the data is easy with MongoDB while Redpanda simplifies the streaming. You can keep your important, actively growing data like transactional logs, user-behavior data, or audit data on MongoDB and stream it to Redpanda to expose the data to other platforms for monitoring, tracing, or event triggering.

In this tutorial, you’ll learn how to do the following:

  • Run a Redpanda cluster in a containerized way by using Docker
  • Create a topic within Redpanda by using its rpk CLI
  • Run MongoDB as a Docker container
  • Run Kafka Connect as a Docker container and configure it with the MongoDB Debezium connector

You can refer to this GitHub repo for all code used in this demo at any time throughout the tutorial.

Prerequisites

You’ll need the following to get started:

  • A recent version of Docker installed on your machine. (Docker Desktop 4.6.1 was used for this article)
  • A command-line interface or terminal to run Docker commands
  • A Docker network called pandonline (you can create it by running docker network create pandonline in your terminal)

Scenario: Tracking users’ classified ad visits

Through an example scenario with Pandonline Corp., a fictional web-based classified ad company, this tutorial demonstrates how you can integrate Redpanda with Debezium for CDC in MongoDB.

In this scenario, Pandonline’s website has a huge number of daily visitors, and they want to track and log these visitors’ classified ad visiting activity.

They save each user’s classified ad visit activity in MongoDB, and they want to expose this activity to tracking services by streaming it through Redpanda. They prefer to use CDC with Debezium to capture the changes in MongoDB and send them to Redpanda.

Architecture diagram: change events captured from MongoDB by Debezium (running on Kafka Connect) and streamed to Redpanda

You are one of Pandonline’s experienced developers and must implement this architecture. They need a prototype from you before discussing any further implementation points with the architecture team, so they have asked you to produce a quick prototype that runs in Docker containers.

Running Redpanda

There are many options for preparing a Redpanda instance. For information on installing or running Redpanda on a variety of platforms, refer to this documentation.

For the purposes of this tutorial, Redpanda runs in a Docker container, which you can learn how to do here.

An example command for running Redpanda on Docker in the pandonline network is as follows:

docker run -d --name=redpanda-1 --rm \
    --network pandonline \
    -p 9092:9092 \
    -p 9644:9644 \
    docker.vectorized.io/vectorized/redpanda:latest \
    redpanda start \
    --advertise-kafka-addr redpanda-1 \
    --overprovisioned \
    --smp 1  \
    --memory 1G \
    --reserve-memory 500M \
    --node-id 0 \
    --check=false

The Docker network is required so that the containers can communicate with one another.
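
To confirm that the broker is up before moving on, you can run Redpanda's rpk CLI inside the container (the same command is used again later in this tutorial to inspect topics):

docker exec -it redpanda-1 \
    rpk cluster info

A healthy cluster lists a single broker with ID 0 on redpanda-1:9092.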

Running and configuring MongoDB

Execute the following command to run MongoDB with the container instance name mongo-1 and replica set pandonline-mongo-rs:

docker run -d --rm --network pandonline -p 27017:27017 --name mongo-1 mongo mongod --replSet pandonline-mongo-rs

Because the Debezium connector reads MongoDB's operation log (oplog), you must configure the MongoDB instance as a replica set; the oplog is only enabled in replica-set mode.

To configure the MongoDB instance, you need to access the MongoDB shell, which you can do by running the following command:

docker exec -it mongo-1 mongo

Execute the following commands on the MongoDB shell:

config = {"_id":"pandonline-mongo-rs","members":[{"_id":0,"host":"mongo-1:27017"}]}
rs.initiate(config)

The first command assigns a variable called config that has the MongoDB replica set configuration. Then with the second command, you initiate the replica set using the configuration variable you set.

Type exit to exit the Mongo shell.
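
If you want to confirm the replica set configuration without reopening an interactive shell, a quick sanity check (assuming the legacy mongo shell that ships with the image) is:

docker exec -it mongo-1 mongo --eval "rs.status()"

The returned document should report the set name pandonline-mongo-rs and a single member in the PRIMARY state.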

Running Kafka Connect for Debezium

Debezium provides a Kafka Connect container image that can be used with container technologies such as Docker. It is preconfigured, so it has all the needed connector binaries, including the one for Debezium’s MongoDB connector.

Execute the following command on your terminal to run Kafka Connect on Docker as a container:

docker run -d -it --rm --name connect-1 \
--network pandonline \
-p 8083:8083 -e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=redpanda-1:9092 \
debezium/connect:1.9.5.Final

This Kafka Connect instance runs in the distributed mode, so it accepts connector configurations via a REST API.

To retrieve a list of connectors via a REST API, you can run the following command:

curl -H "Accept:application/json" localhost:8083/connectors

If the Connect worker is still starting up, you might see the message “curl: (52) Empty reply from server.” Once the worker is ready, the command returns an empty list ([]) because no connector has been configured yet.
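
To check whether the worker has finished starting, you can query the root of the Connect REST API, which returns a small JSON document with the worker's version information once it is ready:

curl -H "Accept:application/json" localhost:8083/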

Configuring the connector

To capture data changes on MongoDB and send them to any Redpanda topic, you must configure a Debezium connector for MongoDB. The following JSON is an example of a configured connector with the name classifieds-connector:

{
  "name": "classifieds-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "pandonline-mongo-rs/mongo-1:27017",
    "mongodb.name": "mongo-1",
    "collection.include.list": "testdb.classifieds"
  }
}

Note that the configuration includes io.debezium.connector.mongodb.MongoDbConnector as the connector class, which points to the Java binaries for the Debezium MongoDB connector in the Kafka Connect instance running in the Docker container.

To capture changes from a specific MongoDB database and collection, you must define them in mongodb.hosts and collection.include.list. The configuration above makes it possible to access a MongoDB instance on host mongo-1 and port 27017 in a replica set called pandonline-mongo-rs. The collection.include.list setting is an allow list of the [database_name].[collection_name] pairs to be captured.

Run the following curl command to apply the above configuration via Kafka Connect’s REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors \
-d '{"name":"classifieds-connector","config":{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","mongodb.hosts":"pandonline-mongo-rs/mongo-1:27017","mongodb.name":"mongo-1","collection.include.list": "testdb.classifieds"}}'

You can examine the created connectors using the following code:

curl -H "Accept:application/json" localhost:8083/connectors

The output should be as follows:

["classifieds-connector"]

If you see this output, your connector has been successfully created in the Kafka Connect instance.
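
You can also inspect the connector and its task in more detail through Kafka Connect's status endpoint; both should be reported as RUNNING:

curl -H "Accept:application/json" localhost:8083/connectors/classifieds-connector/status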

Creating the Redpanda topic and consuming data

By default, the Debezium MongoDB connector sends the captured data to a topic with a specific name: it is derived from mongodb.name and the entries in collection.include.list, which consist of [database_name].[collection_name] pairs. So the overall topic name is [mongodb_name].[database_name].[collection_name].

For the example in this tutorial, the topic name is mongo-1.testdb.classifieds, and you can use the following command to create this topic in the Redpanda cluster:

docker exec -it redpanda-1 \
    rpk topic create mongo-1.testdb.classifieds

Validate the mongo-1.testdb.classifieds topic with the following command:

docker exec -it redpanda-1 \
    rpk cluster info

This returns the following output:

BROKERS
=======
ID    HOST        PORT
0*    redpanda-1  9092

TOPICS
======
NAME                        PARTITIONS  REPLICAS
...output omitted...
mongo-1.testdb.classifieds  1           1
...output omitted...

In the same terminal window, run the following command to start consuming from the mongo-1.testdb.classifieds topic:

docker exec -it redpanda-1 \
rpk topic consume mongo-1.testdb.classifieds

Leave the terminal window open to watch the consumed messages for the next steps.

Capturing the change events

Pandonline has provided you with a simple containerized application for testing CDC. It creates random classified-ad records and saves them to your MongoDB instance at five-second intervals.

The following is an example classified-ad record:

{
   "classifiedId":2696,
   "userId":8649,
   "url":"www.pandonline-classified-ads.com/adlgfxzccr"
}
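
If you want to trigger a single change event manually instead of (or before) running the generator application, you can insert a test document into the testdb.classifieds collection yourself from the mongo shell. The field values below are only illustrative:

docker exec -it mongo-1 mongo testdb --eval \
  'db.classifieds.insertOne({classifiedId: 1234, userId: 5678, url: "www.pandonline-classified-ads.com/test"})'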

To run the containerized application, open a new terminal window and run the following command:

docker logs -f $(docker run -d -it --rm  \
--network pandonline \
quay.io/systemcraftsman/visited_classified_ads_gateway)

You should see the generated logs as the application saves the records to the classifieds collection in MongoDB:

...output omitted...
Classified data is saved: {'classifiedId': 3938, 'userId': 7575, 'url': 'www.pandonline-classified-ads.com/ixulfgglbv', '_id': ObjectId('62cba65993884cdcaea95d50')}
Classified data is saved: {'classifiedId': 2704, 'userId': 6630, 'url': 'www.pandonline-classified-ads.com/jpchradmuf', '_id': ObjectId('62cba65e93884cdcaea95d51')}
...output omitted...

Return to your Redpanda consumer terminal, where you should see the changes captured and streamed into the relevant Redpanda topic:

...output omitted...
{
   "topic":"mongo-1.testdb.classifieds",
   "key":"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"}],\"optional\":false,\"name\":\"mongo_1.testdb.classifieds.Key\"},\"payload\":{\"id\":\"{\\\"$oid\\\": \\\"62cba96293884cdcaea95deb\\\"}\"}}",
   "value":"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"after\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"patch\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"filter\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"rs\"},{\"type\":\"string\",\"optional\":false,\"field\":\"collection\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"ord\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"h\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"tord\"},{\"type\":\"string\",\"optional\":true,\"field\":\"stxnid\"}],\"optional\":false,\"name\":\"io.debezium.connector.mongo.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":true,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"mongo_1.testdb.classifieds.Envelope\"},\"payload\":{\"after\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"62cba96293884cdcaea95deb\\\"},\\\"classifiedId\\\": 3697,\\\"userId\\\": 8867,\\\"url\\\": \\\"www.pandonline-classified-ads.com/camywtnsft\\\"}\",\"patch\":null,\"filter\":null,\"source\":{\"version\":\"1.6.3.Final\",\"connector\":\"mongodb\",\"name\":\"mongo-1\",\"ts_ms\":1657514338000,\"snapshot\":\"false\",\"db\":\"testdb\",\"sequence\":null,\"rs\":\"my-mongo-set\",\"collection\":\"classifieds\",\"ord\":2,\"h\":null,\"tord\":null,\"stxnid\":\"cbcaccdd-579a-389f-88b8-a23cef4618fc:156\"},\"op\":\"c\",\"ts_ms\":1657514338571,\"transaction\":null}}",
   "timestamp":1657514338842,
   "partition":0,
   "offset":2097
}{
   "topic":"mongo-1.testdb.classifieds",
   "key":"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"}],\"optional\":false,\"name\":\"mongo_1.testdb.classifieds.Key\"},\"payload\":{\"id\":\"{\\\"$oid\\\": \\\"62cba96492bb209e6ce7ff1a\\\"}\"}}",
   "value":"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"after\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"patch\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"filter\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"rs\"},{\"type\":\"string\",\"optional\":false,\"field\":\"collection\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"ord\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"h\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"tord\"},{\"type\":\"string\",\"optional\":true,\"field\":\"stxnid\"}],\"optional\":false,\"name\":\"io.debezium.connector.mongo.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":true,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"mongo_1.testdb.classifieds.Envelope\"},\"payload\":{\"after\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"62cba96492bb209e6ce7ff1a\\\"},\\\"classifiedId\\\": 4354,\\\"userId\\\": 2,\\\"url\\\": \\\"www.pandonline-classified-ads.com/ackvkzggjv\\\"}\",\"patch\":null,\"filter\":null,\"source\":{\"version\":\"1.6.3.Final\",\"connector\":\"mongodb\",\"name\":\"mongo-1\",\"ts_ms\":1657514340000,\"snapshot\":\"false\",\"db\":\"testdb\",\"sequence\":null,\"rs\":\"my-mongo-set\",\"collection\":\"classifieds\",\"ord\":1,\"h\":null,\"tord\":null,\"stxnid\":\"b243f8ff-7aca-34e1-91ec-0ffb88f14468:207\"},\"op\":\"c\",\"ts_ms\":1657514340721,\"transaction\":null}}",
   "timestamp":1657514340842,
   "partition":0,
   "offset":2098
}
...output omitted...

This is the data captured by Debezium and sent to the Redpanda topic. Because each message embeds its JSON schema alongside the payload, it can be hard to spot the actual captured data at first sight in this large JSON output.

Schemas ensure that the data you transfer has a well-defined structure. However, depending on the use case, you might not need to send the schema along with each payload.

As you are only completing this tutorial for prototyping purposes for Pandonline Corp., you can skip using the JSON schema and just stream the payloads:

{
  "name": "classifieds-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "pandonline-mongo-rs/mongo-1:27017",
    "mongodb.name": "mongo-1",
    "collection.include.list": "testdb.classifieds",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false"
  }
}

In a new terminal window, run the following command to update the connector configuration with the preceding one:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/classifieds-connector/config \
-d '{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","mongodb.hosts":"pandonline-mongo-rs/mongo-1:27017","mongodb.name":"mongo-1","collection.include.list": "testdb.classifieds","key.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false"}'

After the connector configuration is updated, the messages arriving in your Redpanda consumer terminal should look like the following:

...output omitted...
{
  "topic": "mongo-1.testdb.classifieds",
  "key": "{\"id\":\"{\\\"$oid\\\": \\\"62cbba4e920e0661fc10e660\\\"}\"}",
  "value": "{\"after\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"62cbba4e920e0661fc10e660\\\"},\\\"classifiedId\\\": 2878,\\\"userId\\\": 6975,\\\"url\\\": \\\"www.pandonline-classified-ads.com/zsxzrqolgm\\\"}\",\"patch\":null,\"filter\":null,\"source\":{\"version\":\"1.6.3.Final\",\"connector\":\"mongodb\",\"name\":\"mongo-1\",\"ts_ms\":1657518670000,\"snapshot\":\"false\",\"db\":\"testdb\",\"sequence\":null,\"rs\":\"my-mongo-set\",\"collection\":\"classifieds\",\"ord\":1,\"h\":null,\"tord\":null,\"stxnid\":\"1e0e101d-13bd-39bd-a697-184cdd9e9ce6:380\"},\"op\":\"c\",\"ts_ms\":1657518670243,\"transaction\":null}",
  "timestamp": 1657518670395,
  "partition": 0,
  "offset": 5557
}
{
  "topic": "mongo-1.testdb.classifieds",
  "key": "{\"id\":\"{\\\"$oid\\\": \\\"62cbba4e4b1acb3174a70406\\\"}\"}",
  "value": "{\"after\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"62cbba4e4b1acb3174a70406\\\"},\\\"classifiedId\\\": 2163,\\\"userId\\\": 5195,\\\"url\\\": \\\"www.pandonline-classified-ads.com/nlkgrpbxiy\\\"}\",\"patch\":null,\"filter\":null,\"source\":{\"version\":\"1.6.3.Final\",\"connector\":\"mongodb\",\"name\":\"mongo-1\",\"ts_ms\":1657518670000,\"snapshot\":\"false\",\"db\":\"testdb\",\"sequence\":null,\"rs\":\"my-mongo-set\",\"collection\":\"classifieds\",\"ord\":2,\"h\":null,\"tord\":null,\"stxnid\":\"e1937986-4b8a-3560-8109-df94b3c3c770:382\"},\"op\":\"c\",\"ts_ms\":1657518670261,\"transaction\":null}",
  "timestamp": 1657518670395,
  "partition": 0,
  "offset": 5558
}
...output omitted...

As you can see, only the captured data and its metadata are sent to Redpanda. The value field contains the after state of the document, along with patch and filter fields; because these are newly created documents, patch and filter are null. For update events, the Debezium MongoDB connector populates the patch field with a description of the change rather than a full before image. You can selectively filter or reshape this data before it is streamed to the topic by using the Kafka Connect transformations that Debezium provides.
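
For example, Debezium ships a MongoDB-specific ExtractNewDocumentState single message transformation (SMT) that unwraps the after state into a flat record. The following sketch shows how the connector configuration could be extended with it; this step is optional and not required for the prototype:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/classifieds-connector/config \
-d '{"connector.class":"io.debezium.connector.mongodb.MongoDbConnector","mongodb.hosts":"pandonline-mongo-rs/mongo-1:27017","mongodb.name":"mongo-1","collection.include.list":"testdb.classifieds","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","transforms":"unwrap","transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"}'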

Conclusion

Congratulations! With the help of this tutorial, you’ve learned how to integrate Redpanda with Debezium for CDC in MongoDB. Having completed this tutorial, you can apply what you’ve learned here to do real-time streaming from NoSQL databases for various future applications.

MongoDB is an ideal NoSQL database for storing actively growing data such as transactional logs, user-behavior data, or audit data. Debezium's MongoDB connector for Kafka Connect makes it possible to stream that data to Redpanda and expose it to other platforms for monitoring, tracing, or event-triggering use cases. For more information about how to integrate Redpanda with different technologies, check out the tutorials on the Redpanda blog.

Take Redpanda's free Community edition for a test drive here or check out the documentation to understand the nuts and bolts of how the platform works. To ask our Solution Architects and Core Engineers questions and interact with other Redpanda users, join the Redpanda Community on Slack.

As a reminder, you can find the resources for this tutorial in this repository on GitHub.
