Learn how to use ksqlDB with Redpanda to create a real-time materialized cache.

ByIvy WalobwaonJuly 12, 2022
How to use ksqlDB and Redpanda to build a materialized cache

Organizations often need to build real-time data-processing applications. Specialized tools for stream processing can help build such applications. In another article, you learned how to process data streams with Apache Flink . This article will show you how to do something similar with ksqlDB.

ksqlDB is an event-streaming database that simplifies real-time application building with two Apache Kafka API components — Kafka Connect and Kafka Streams — into a single system. This makes it possible to integrate the stream-processing application with different source systems.

With ksqlDB, you can use SQL queries for processing streaming data. Examples of such use cases include identifying anomalies in real-time data, log monitoring, tracking, and alerting. Using ksqlDB on top of Redpanda, which is API-compatible with Kafka, allows you to explore topics, transform data within topics, copy existing topics from one format to another, and more.

What is ksqlDB?

ksqlDB differs from other popular data-processing tools like Flink or Apache Spark in its ability to build complete streaming applications with only a small set of SQL statements—you don’t need to write Java/Scala/Python in addition to SQL statements when using ksqlDB.

ksqlDB has a simplified architecture and is deployed as a separate, scalable cluster. The interface for event capturing, processing, and query serving is combined into a single system.

Let’s take a look at how Redpanda and ksqlDB can be used together to build a stream-processing application.

Integrating ksqlDB with Redpanda

To set the scene, imagine that you have a database that stores emergency calls made by residents of different locations. It contains their names, emergency type, and area code. You frequently make a few specific queries, and you want to move those out of the database, precompute them, and store the results for fast access. Here, you can leverage the power of ksqlDB (computing) with Redpanda (storage) to build a materialized cache for quick access to the data.

ksqldb and redpanda

This tutorial will walk you through creating this materialized cache using Redpanda, ksqlDB server, and ksqlDB CLI and show you how to query it.

Specifically, you’ll learn how to do the following:

  • Install Redpanda
  • Install ksqlDB
  • Configure ksqlDB to ingest data from Redpanda

Prerequisites

Before getting started, you’ll need to have the following:

  • Docker and Docker Compose installed
  • Familiarity with Apache Kafka or other messaging systems (recommended for Redpanda)
  • Familiarity with SQL syntax (recommended for ksqlDB)

Setting up the stack

The image below is a schematic view of data flow within the system you’d use to process external data. To connect to the external sources, you’d have to set up your connectors. This tutorial doesn’t connect to any external source and uses mock data for the sake of simplicity.

ksqldb 2

Installing Redpanda

You can follow the detailed steps for installing Redpanda from the official documentation on your platform of choice. In this tutorial, you’ll install Redpanda using Docker Compose.

First, add the following configurations to a docker-compose.yml file to install Redpanda from its Docker image:

version: '3.9' services: redpanda: command: - redpanda - start - --smp - '1' - --reserve-memory - 0M - --overprovisioned - --set - redpanda.cluster_id=turning-red - --set - redpanda.enable_idempotence=true - --set - redpanda.enable_transactions=true - --set - redpanda.auto_create_topics_enabled=true - --node-id - '0' - --kafka-addr - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 - --advertise-kafka-addr - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092 image: docker.vectorized.io/vectorized/redpanda:v21.11.11 container_name: redpanda ports: - 9092:9092 - 29092:29092

Next, run the command below in the root directory of your docker-compose file to start a local Redpanda cluster:

docker-compose up -d

Now that your Redpanda cluster is running, you can do some test streaming.

Starting Redpanda

Run the command below to access the Redpanda Docker container’s command line:

docker exec -it redpanda /bin/sh

Then run the following command to create a calls topic (note the use of the rpk command-line utility for Redpanda):

$ rpk topic create calls --brokers=localhost:9092

Next, produce a message on the topic:

$ rpk topic produce calls --brokers=localhost:9092

Input some text into the topic, and once you’re finished, press Ctrl+C to exit the prompt.

Finally, consume the messages on the topic:

$ rpk topic consume calls --brokers=localhost:9092

Below is a sample output when consuming the messages:

##Output { "topic": "calls", "value": "3", "timestamp": 1650692216007, "partition": 0, "offset": 2 }

That’s it! You now have enough Redpanda knowledge to leverage the power of ksqlDB. Before moving on, do some cleanup by running the following command:

docker compose down

This command stops and removes the Redpanda container.

Installing ksqlDB

First, add the services below in your docker-compose.yml file:

ksqldb-server: image: confluentinc/ksqldb-server:0.25.1 hostname: ksqldb-server container_name: ksqldb-server depends_on: - redpanda ports: - "8088:8088" environment: KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_BOOTSTRAP_SERVERS: "redpanda:29092" KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.25.1 container_name: ksqldb-cli depends_on: - redpanda - ksqldb-server entrypoint: /bin/sh tty: true

This code snippet creates two containers—the ksqlDB server and ksqlDB CLI—from their respective Docker images. The ksqlDB server is where your application runs, and the ksqlDB CLI allows you to interact with the server.

Then run the command below in the root directory of your docker-compose.yml file to start all three services:

docker-compose up -d

Finally, run the following command to check if the containers are running as expected:

docker stats

If everything is okay, you should have three containers running: ksqldb-cli, ksqldb-server, and redpanda.

Starting ksqlDB

To start ksqlDB and access its interface, run the command below:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

You should see something similar to this:

ksqldb 3

If the server isn’t responding, give it a while, exit the ksqlDB CLI, and then retry.

Configuring ksqlDB to ingest data from Redpanda

Now that your stack is running, it’s time to execute some ksqlDB code. You’ll use the ksqlDB CLI to interact with the server.

Creating a stream

Before you create your stream, enter the command below in the running instance of ksqlDB CLI:

SHOW TOPICS;

This displays a list of existing topics. At this point, you will see only default topics. You can now create a stream that matches the data in your database as shown below:

CREATE STREAM emergencies (name VARCHAR, reason VARCHAR, area VARCHAR) WITH (kafka_topic='call-center', value_format='json', partitions=1);

This command creates not only a stream but also a Redpanda topic named call-center, if it does not already exist. If the topic does already exist, the command defines the stream, which can then be selected from with SQL syntax.

Running the SHOW TOPICS; command displays the newly created topic.

Creating materialized views

To keep track of certain logic, you need to create a materialized view for the logic. Run the following commands in the ksqlDB CLI instance to do so.

The location_of_interest materialized view counts the number of distinct areas, identifies the latest area of the emergency call, and then groups the rows returned by the reason for the call:

// RUN 1

CREATE TABLE location_of_interest AS SELECT reason, count_distinct(area) AS distinct_pings, latest_by_offset(area) AS last_location FROM emergencies GROUP BY reason EMIT CHANGES;

The call_record materialized view counts the number of times a resident called based on the reason and groups them by the resident’s name:

// RUN 2

CREATE TABLE call_record AS SELECT name, count(reason) AS total_emergencies FROM emergencies GROUP BY name EMIT CHANGES;

Adding mock data

Now that you have a topic, a stream linked to your topic, and a materialized view to make your queries persistent, you can add some mock data to test your application.

First, open a new terminal and open the Redpanda terminal using the following command:

docker exec -it redpanda /bin/sh

Then you can produce messages on the topic:

$ rpk topic produce call-center --brokers=localhost:9092

Add the messages below in the terminal. Each message is produced to a partition and given a timestamp.

{"name":"Liam", "reason": "allergy", "area": "Florida"} {"name":"Fiona", "reason": "dizziness", "area": "Orlando"} {"name":"Mike", "reason": "pain", "area": "Florida"} {"name":"Louise", "reason": "allergy", "area": "Orlando"} {"name":"Steven", "reason": "stroke", "area": "New York"} {"name":"Liam", "reason": "pain", "area": "Florida"} {"name":"Louise", "reason": "dizziness", "area": "Hawai"} {"name":"Ivor", "reason": "choking", "area": "New York"} {"name":"Louise", "reason": "pain", "area": "Florida"} {"name":"Beckham", "reason": "allergy", "area": "New York"}

You are now ready to test your application by running some queries.

Running queries

Before you run any query, set the property below to ensure the queries run from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

To run a query that terminates immediately after it has returned the results, test with the following command:

SELECT * FROM location_of_interest WHERE reason = 'allergy';

To run a query that keeps running and updates the results as more data comes in, use this command (note the use of the “EMIT CHANGES” clause):

SELECT * FROM call_record WHERE name = 'Louise' EMIT CHANGES;

If you open a new ksqlDB server and add some more mock data, the query above will update with the new data.

You can view failed ksqlDB messages by adding the following statements in your docker-compose.yml file:

environment: KSQL_LOG4J_ROOT_LOGLEVEL: "ERROR" KSQL_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" KSQL_LOG4J_PROCESSING_LOG_BROKERLIST: kafka:29092 KSQL_LOG4J_PROCESSING_LOG_TOPIC: <ksql-processing-log-topic-name> KSQL_KSQL_LOGGING_PROCESSING_TOPIC_NAME: <ksql-processing-log-topic-name> KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"

To stop and remove the containers, run the command below:

docker-compose down

Conclusion

Now that you've been proprly introduced to ksqlDB and how to use it with Redpanda, you can take what you've learned in this tutorial and create a data stream-processing application with a materialized cache for any number of use cases.

As you saw, ksqlDB is easy to install and configure, and it lets you run standard SQL queries.

All the code in this tutorial can be found on GitHub. Try out Redpanda using the tutorial, interact with Redpanda’s developers directly in the Redpanda Community on Slack, or contribute to Redpanda’s source-available GitHub repo here. To learn more about everything you can do with Redpanda, check out our documentation here.

Let's keep in touch

Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.