Introducing Kafka Migrator for Redpanda Connect

Move your workloads from any Kafka system to Redpanda with a single command

September 10, 2024

Moving existing workloads to a new system is just as important as its deployment, operation, and management. At Redpanda, we believe there’s power in simplicity, so we’re excited to share how we’re making end-to-end migration drastically easier.

Introducing Kafka Migrator, a tool designed to simplify migrations from any Apache Kafka® system to Redpanda. With fewer components to manage, this new tool has major benefits:

  • Single Go binary with no complex deployment or JVM tuning.
  • Single process to migrate topics, schemas, consumer groups, and ACLs. No herding connectors or manual coordination.
  • Single metrics endpoint with all you need to know about progress and health. No second-guessing.

Kafka Migrator is currently available in Redpanda Connect. Existing customers can try it with any Redpanda distribution: Cloud or Self-Managed. New users can try Kafka Migrator for free as part of Redpanda Serverless.

In this post, we’ll briefly cover what Kafka Migrator brings to the table and demonstrate a simple example of transferring data to Redpanda.

Why use Kafka Migrator?

With Kafka Migrator, you can move your workloads from any Kafka system to Redpanda using just one command.

While you can continue using MirrorMaker2 (MM2) and the Kafka Connect ecosystem with Redpanda clusters, we designed our new Kafka Migrator for Redpanda Connect to address common concerns about MM2 voiced by customers and internal Redpanda operators alike:

  • Requires complex deployment and setup: Running MM2 in distributed mode involves the complexities and infrastructure overhead of running a second distributed system alongside Kafka.
  • Prone to misconfiguration: Config is spread across multiple connectors, worker nodes, and the Kafka Connect runtime itself, making it hard to use for developers without deep Kafka expertise.
  • Incomplete metrics experience: The default log level in Kafka Connect makes it difficult to debug issues with MM2 and other connectors. Still, increasing log verbosity can flood users with noise, making it difficult to diagnose problems.
  • Difficult to tune performance: Between JVM garbage collection and years of bolt-on fixes, it's hard to tune MM2 to keep up with high-volume data flows while maintaining minimal replication lag.

Redpanda’s Kafka Migrator in action

Here’s a quick demo of transferring data from a Kafka cluster to a Redpanda cluster using Kafka Migrator. You can also check the Redpanda Connect Docs on GitHub for help getting started.

Kafka Migrator's functionality comes from a set of Redpanda Connect components, including the kafka_migrator and schema_registry inputs and outputs configured in step 4.

For convenience, these are bundled together in the kafka_migrator_bundle input and kafka_migrator_bundle output templates. For those who prefer to read, here's a step-by-step breakdown of the demo.

1. Create the Docker containers

First, you’ll need two clusters. To keep it simple, you can run the Bitnami Kafka and Schema Registry Docker containers for the source cluster and a Redpanda Docker container for the destination cluster via Docker Compose.

services:
 source:
   image: bitnami/kafka
   environment:
     KAFKA_CFG_NODE_ID: 0
     KAFKA_CFG_PROCESS_ROLES: controller,broker
     KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
     KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
     KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
     KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
     KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
     KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
     KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
   ports:
     - 9092:9092
     - 19092:19092
   healthcheck:
     test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
     start_period: 5s
     interval: 3s


 init_source:
   image: bitnami/kafka
   working_dir: /opt/bitnami/kafka/bin
   entrypoint: /bin/bash
   depends_on:
     source:
       condition: service_healthy
   command: >
     -c "kafka-topics.sh --create --if-not-exists --topic foo --replication-factor=1 --partitions=2 --bootstrap-server source:19092 &&
         kafka-topics.sh --create --if-not-exists --topic bar --replication-factor=1 --partitions=2 --bootstrap-server source:19092 &&
         echo 'Created topics:' &&
         kafka-topics.sh --list --exclude-internal --bootstrap-server source:19092 &&
         kafka-acls.sh --bootstrap-server source:19092 --add --allow-principal User:redpanda --operation Read --topic foo &&
         kafka-acls.sh --bootstrap-server source:19092 --add --deny-principal User:redpanda --operation Read --topic bar &&
         echo 'Created ACLs:' &&
         kafka-acls.sh --bootstrap-server source:19092 --list"


 source_schema_registry:
   image: bitnami/schema-registry
   environment:
     SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://source:19092
   ports:
     - 8081:8081
   depends_on:
     source:
       condition: service_healthy


 destination:
   image: redpandadata/redpanda
   command:
     - redpanda
     - start
     - --node-id 0
     - --mode dev-container
     - --set rpk.additional_start_flags=[--reactor-backend=epoll]
     - --set redpanda.auto_create_topics_enabled=false
     - --kafka-addr 0.0.0.0:9093
     - --advertise-kafka-addr localhost:9093
     - --schema-registry-addr 0.0.0.0:8081
   ports:
     - 8082:8081
     - 9093:9093
     - 9645:9644

$ docker-compose -f docker-compose.yml up --force-recreate -V

Note: We used an init container above to create two topics, foo and bar, each with two partitions and an associated ACL.

2. Create schemas

Once the demo clusters are up and running, use curl to create a schema for each topic in the source cluster.

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Bar\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
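
The hand-escaped JSON in the curl commands above is easy to get wrong. As a minimal sketch, a small shell helper can build the request body instead. Note that schema_payload is a hypothetical name, not part of any Redpanda or Schema Registry tooling, and it assumes a single-line schema.

```shell
# schema_payload: hypothetical helper (an assumption for illustration).
# Wraps a single-line Avro schema in the {"schema": "..."} envelope by
# escaping backslashes first, then double quotes.
schema_payload() {
  escaped=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"schema": "%s"}' "$escaped"
}

# Usage (assumes the source Schema Registry from step 1 on localhost:8081):
#   curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
#     --data "$(schema_payload '{"name": "Foo", "type": "record", "fields": [{"name": "data", "type": "int"}]}')" \
#     http://localhost:8081/subjects/foo/versions
```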

3. Generate messages

Let's simulate an application with a producer and consumer actively publishing and reading messages on the source cluster. Use Redpanda Connect to generate some Avro-encoded messages and push them to the two topics you just created.

# generate_data.yaml


http:
 enabled: false


input:
 sequence:
   inputs:
     - generate:
         mapping: |
           let msg = counter()
           root.data = $msg


           meta kafka_topic = match $msg % 2 {
             0 => "foo"
             1 => "bar"
           }
         interval: 1s
         count: 0
         batch_size: 1


       processors:
         - schema_registry_encode:
             url: "http://localhost:8081"
             subject: ${! metadata("kafka_topic") }
             avro_raw_json: true


output:
 kafka_franz:
   seed_brokers: [ "localhost:9092" ]
   topic: ${! @kafka_topic }
   partitioner: manual
   partition: ${! random_int(min:0, max:1) }

You can start this pipeline and leave it running:

$ redpanda-connect run generate_data.yaml

Next, start a Redpanda Connect consumer that reads messages from the source cluster topics, and leave it running as well. (This consumer uses the foobar consumer group, which will be important later.)

# read_data_source.yaml


http:
 enabled: false


input:
 kafka_franz:
   seed_brokers: [ "localhost:9092" ]
   topics:
     - '^[^_]' # Skip topics which start with `_`
   regexp_topics: true
   start_from_oldest: true
   consumer_group: foobar


 processors:
   - schema_registry_decode:
       url: "http://localhost:8081"
       avro_raw_json: true


output:
 stdout: {}
 processors:
   - mapping: |
       root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

$ redpanda-connect run read_data_source.yaml

4. Configure and start Kafka Migrator

At this point, you can start the new Kafka Migrator Bundle, which will do the following:

  • On startup, it reads all the schemas from the source cluster Schema Registry through the REST API and pushes them to the destination cluster Schema Registry using the same API. It needs to preserve the schema IDs, so the destination cluster must not have any schemas in it.
  • Once the schemas have been imported, Kafka Migrator starts migrating all the selected topics from the source cluster and any associated ACLs. After it finishes creating all the topics and ACLs that don’t exist in the destination cluster, it starts migrating messages and remaps consumer group offsets.
  • If any new topics are created while Kafka Migrator is running, it’s necessary to write some messages to those topics for them to be migrated to the destination cluster.

ACL migration adheres to the following principles:

  • ALLOW WRITE ACLs for topics are not migrated
  • ALLOW ALL ACLs for topics are downgraded to ALLOW READ
  • Only topic ACLs are migrated; group ACLs are not

Note: Changing topic configurations, such as partition count, isn’t currently supported.
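
The ACL migration principles listed above can be read as a small decision table. Here is a rough sketch of that table in shell. It is not Kafka Migrator's code: acl_migration_action is a hypothetical helper, and passing every other topic ACL through unchanged is an assumption based on the demo, where ALLOW READ and DENY READ are both migrated as-is.

```shell
# acl_migration_action: hypothetical helper mirroring the three principles.
# Arguments: resource type, permission, operation (e.g. TOPIC ALLOW READ).
# Prints the operation to create on the destination, or "skip".
acl_migration_action() {
  resource=$1
  permission=$2
  operation=$3

  # Only topic ACLs are migrated; group ACLs (and anything else) are not.
  if [ "$resource" != "TOPIC" ]; then
    echo "skip"
    return
  fi

  case "$permission:$operation" in
    ALLOW:WRITE) echo "skip" ;;        # ALLOW WRITE is not migrated
    ALLOW:ALL)   echo "READ" ;;        # ALLOW ALL is downgraded to ALLOW READ
    *)           echo "$operation" ;;  # assumption: everything else is kept
  esac
}
```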

Now, use the following Kafka Migrator Bundle configuration. Please refer to the Kafka Migrator docs for details.

# kafka_migrator_bundle.yaml


input:
 kafka_migrator_bundle:
   kafka_migrator:
     seed_brokers: [ "localhost:9092" ]
     topics:
       - '^[^_]' # Skip internal topics which start with `_`
     regexp_topics: true
     consumer_group: migrator_bundle
     start_from_oldest: true


   schema_registry:
     url: http://localhost:8081
     include_deleted: true
     subject_filter: ""


output:
 kafka_migrator_bundle:
   kafka_migrator:
     seed_brokers: [ "localhost:9093" ]
     max_in_flight: 1


   schema_registry:
     url: http://localhost:8082


metrics:
 prometheus: {}
 mapping: |
   meta label = if this == "input_kafka_migrator_lag" { "source" }

Note: The max_in_flight setting is important to preserve message ordering at the partition level. Please refer to the documentation for details.

Next, launch the migrator bundle with the example configuration:

$ redpanda-connect run kafka_migrator_bundle.yaml

5. Check the status of migrated topics

You’re ready to check which topics and ACLs have been migrated to the destination cluster.

Note: Roles are specific to Redpanda. For now, they have to be migrated manually.

$ rpk -X brokers=localhost:9093 -X admin.hosts=localhost:9645 topic list
NAME      PARTITIONS  REPLICAS
_schemas  1           1
bar       2           1
foo       2           1

$ rpk -X brokers=localhost:9093 -X admin.hosts=localhost:9645 security acl list
PRINCIPAL      HOST  RESOURCE-TYPE  RESOURCE-NAME  RESOURCE-PATTERN-TYPE  OPERATION  PERMISSION  ERROR
User:redpanda  *     TOPIC          bar            LITERAL                READ       DENY
User:redpanda  *     TOPIC          foo            LITERAL                READ       ALLOW
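
The commands above cover topics and ACLs. Schemas can be checked in a similar spirit by listing the subjects in both Schema Registries with GET /subjects and comparing the results. The sketch below only compares subject names, not schema IDs or versions. Both helper names are hypothetical, and the parsing assumes simple subject names without commas, brackets, quotes, or spaces.

```shell
# normalize_subjects: hypothetical helper. Turns the JSON array returned by
# GET /subjects into one sorted subject name per line.
normalize_subjects() {
  printf '%s' "$1" | tr -d '[]" ' | tr ',' '\n' | sort
}

# same_subjects: hypothetical helper. Succeeds when both JSON arrays contain
# the same subject names, regardless of order.
same_subjects() {
  [ "$(normalize_subjects "$1")" = "$(normalize_subjects "$2")" ]
}

# Usage (assumes the registries from step 1 on ports 8081 and 8082):
#   src=$(curl -s http://localhost:8081/subjects)
#   dst=$(curl -s http://localhost:8082/subjects)
#   same_subjects "$src" "$dst" && echo "subjects match"
```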

6. Check metrics to monitor progress

Redpanda Connect emits Prometheus metrics for monitoring and trending with your observability stack. Besides the standard Redpanda Connect metrics, the kafka_migrator input also emits an input_kafka_migrator_lag metric for each topic and partition, which can be used to monitor its progress.

$ curl http://localhost:4195/metrics
...
# HELP input_kafka_migrator_lag Benthos Gauge metric
# TYPE input_kafka_migrator_lag gauge
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0
input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1
input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0
...
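
To turn this output into a single number to watch, you could sum the lag samples. The following is a minimal sketch: total_lag is a hypothetical helper, and skipping topics whose names start with _ is an assumption about what matters for this demo.

```shell
# total_lag: hypothetical helper. Reads Prometheus text format on stdin and
# sums the input_kafka_migrator_lag samples, skipping topics whose names
# start with "_" (an assumption; drop that condition to count everything).
total_lag() {
  awk '
    /^input_kafka_migrator_lag\{/ && $0 !~ /topic="_/ { sum += $NF }
    END { print sum + 0 }
  '
}

# Usage (assumes the default metrics endpoint shown above):
#   curl -s http://localhost:4195/metrics | total_lag
```

A total of 0 suggests the migrator has caught up on the selected topics at the moment of the scrape. With the producer still running, the value is expected to fluctuate.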

7. Read from the migrated topics

Finally, stop the read_data_source.yaml consumer you started previously, then start a similar consumer against the destination cluster. Before starting the destination consumer, give the migrator bundle time to finish replicating the translated offsets.

# read_data_destination.yaml


http:
 enabled: false


input:
 kafka_franz:
   seed_brokers: [ "localhost:9093" ]
   topics:
     - '^[^_]' # Skip topics which start with `_`
   regexp_topics: true
   start_from_oldest: true
   consumer_group: foobar


 processors:
   - schema_registry_decode:
       url: "http://localhost:8082"
       avro_raw_json: true


output:
 stdout: {}
 processors:
   - mapping: |
       root = this.merge({"count": counter(), "topic": @kafka_topic, "partition": @kafka_partition})

$ redpanda-connect run read_data_destination.yaml

And you’re all done!

It’s worth clarifying that the source cluster consumer uses the foobar consumer group. We started the destination cluster consumer using the same consumer group; as you can see, it resumes reading messages from where the source consumer left off.

Due to the mechanics of the Kafka protocol, we need to perform offset remapping when migrating consumer group offsets to the destination cluster. While more sophisticated approaches are possible, we used a simple timestamp-based approach. So, for each migrated offset, we first query the destination cluster to find the latest offset before the received offset timestamp. We then use that as the destination consumer group offset for the corresponding topic and partition pair.

Although the timestamp-based approach doesn’t guarantee exactly-once delivery, it minimizes the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.
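
To make the timestamp-based lookup concrete, here is a simplified model of it in shell. It is not Kafka Migrator's actual implementation: remap_offset is a hypothetical helper, the input format is invented for illustration, and treating "before" as strictly earlier is an assumption.

```shell
# remap_offset: hypothetical, simplified model of timestamp-based remapping.
# stdin: one "offset timestamp_ms" pair per line for a destination partition.
# $1:    timestamp (ms) associated with the committed source offset.
# Prints the highest destination offset whose timestamp is strictly earlier
# than $1, or -1 when no such record exists.
remap_offset() {
  awk -v ts="$1" '
    BEGIN { best = -1 }
    ($2 + 0) < (ts + 0) && ($1 + 0) > best { best = $1 + 0 }
    END { print best }
  '
}

# Usage with made-up records:
#   printf '%s\n' '0 1000' '1 2000' '2 3000' | remap_offset 2500
```

In this model, a consumer resuming near the returned offset may re-read a message or two, which matches the trade-off described above: no exactly-once guarantee, but little duplication.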

Get started with Kafka Migrator

Kafka Migrator is a convenient new tool that simplifies migrations from any Kafka system to Redpanda. It allows developers to easily move Kafka messages, schemas, and ACLs without digging into Kafka or Redpanda internals. We've designed Kafka Migrator as part of Redpanda Connect to make it simple to use and powerful to run at scale.

Ready to migrate without the migraine? Try Kafka Migrator yourself in Redpanda Cloud!
