Pandaproxy allows access to Redpanda via a REST API.

By Ben Pope on April 27, 2021
Pandaproxy: Bringing Kafka to the masses

At Redpanda, we like to make things simple. Redpanda is an Apache Kafka®-compatible event streaming platform that eliminates ZooKeeper and the JVM, autotunes itself for modern hardware, and ships in a single binary.

There are many high-quality Kafka clients for lots of languages, but wouldn't it be nice if you could just fire up your favorite HTTP CLI or library to produce and consume a stream of events?

Well, we're pleased to announce Pandaproxy, a new subsystem of Redpanda that allows access to your data through a REST API!

It's already available in Redpanda, so if you have Redpanda installed, make sure it's up to date. If not, follow the instructions in the Linux, macOS, Kubernetes, or Docker quick start guides.

If you want to leave the infrastructure issues to us, sign up for Redpanda Cloud for the simplest way to run Redpanda.

Example: produce and consume

Let's jump right in and start Redpanda using Docker on Linux:

docker network create redpanda
docker volume create redpanda
docker run \
  --pull=always \
  --name=redpanda \
  --net=redpanda \
  -v "redpanda:/var/lib/redpanda/data" \
  -p 8082:8082 \
  -p 9092:9092 \
  --detach \
  vectorized/redpanda start \
  --overprovisioned \
  --smp 1 \
  --memory 1G \
  --reserve-memory 0M \
  --node-id 0 \
  --check=false \
  --pandaproxy-addr 0.0.0.0:8082 \
  --advertise-pandaproxy-addr 127.0.0.1:8082 \
  --kafka-addr 0.0.0.0:9092 \
  --advertise-kafka-addr redpanda:9092

Create the topic my_topic:

docker run \
  --net=redpanda \
  vectorized/redpanda \
  --brokers=redpanda:9092 \
  topic create my_topic \
  --partitions=3 \
  --replicas=1

Now we're ready to start using Pandaproxy!

Endpoints are documented with Swagger at http://localhost:8082/v1.

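Each example that follows shows the curl command first, then the Python equivalent, followed by the response where there is one.

For the curl examples, I'm using jq to prettify and process the JSON responses.

For the Python examples, we'll use the popular requests module (pip install requests).

For the rest of the guide, we'll assume the following for an interactive Python session: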

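import json

import requests


def pretty(obj):
    """Print a parsed JSON response with two-space indentation."""
    print(json.dumps(obj, indent=2))


# Address of the Pandaproxy listener published by the Docker container.
base_uri = "http://localhost:8082"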

List topics

curl -s "localhost:8082/topics" | jq .
res = requests.get(f"{base_uri}/topics").json()
pretty(res)
[
  "my_topic"
]

Produce to a topic

We need to POST a list of records to the /topics/{topic} endpoint.

JSON and base64 encoded payloads are currently supported, specified with a Content-Type of application/vnd.kafka.json.v2+json or application/vnd.kafka.binary.v2+json, respectively. We'll use JSON:

curl -s \
  -X POST \
  "http://localhost:8082/topics/my_topic" \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{
  "records":[
      {
          "value":"Vectorized",
          "partition":0
      },
      {
          "value":"Pandaproxy",
          "partition":1
      },
      {
          "value":"JSON Demo",
          "partition":2
      }
  ]
}' | jq .
res = requests.post(
    url=f"{base_uri}/topics/my_topic",
    data=json.dumps(
        dict(records=[
            dict(value="Vectorized", partition=0),
            dict(value="Pandaproxy", partition=1),
            dict(value="JSON Demo", partition=2)
        ])),
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
{
  "offsets": [
    {
      "partition": 0,
      "offset": 0
    },
    {
      "partition": 1,
      "offset": 0
    },
    {
      "partition": 2,
      "offset": 0
    }
  ]
}
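
The binary format is used the same way, except that each value is base64 encoded. Here is a minimal sketch of building such a request body, assuming the record layout matches the JSON example above. The helper name binary_records is hypothetical and not part of Pandaproxy:

```python
import base64
import json


def binary_records(values, partition=None):
    """Build a produce request body whose values are base64-encoded bytes."""
    records = []
    for value in values:
        # base64 output is ASCII, so it can be embedded in JSON as a string.
        record = {"value": base64.b64encode(value).decode("ascii")}
        if partition is not None:
            record["partition"] = partition
        records.append(record)
    return {"records": records}


body = binary_records([b"Vectorized", b"\x00\x01\x02"])
print(json.dumps(body, indent=2))
```

To send it, POST the serialized body to /topics/{topic} with a Content-Type of application/vnd.kafka.binary.v2+json instead of the JSON one.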

If a partition is not specified, one is chosen based on a murmur2 hash of the key. If there is no key, partitions are chosen using a round-robin strategy.
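
To make the selection rule concrete, here is a rough sketch of it in Python. It assumes the same convention as the Kafka Java client's default partitioner (32-bit murmur2 with seed 0x9747b28c, sign bit masked off, then modulo the partition count). Pandaproxy's internals may differ in detail, and choose_partition is a hypothetical name:

```python
import itertools

MASK = 0xFFFFFFFF  # emulate 32-bit integer overflow


def murmur2(data: bytes) -> int:
    """32-bit murmur2 following the Kafka Java client's variant."""
    m = 0x5BD1E995
    length = len(data)
    h = (0x9747B28C ^ length) & MASK
    # Mix the input four bytes at a time, little-endian.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & MASK
        k ^= k >> 24
        k = (k * m) & MASK
        h = (h * m) & MASK
        h ^= k
    # Fold in the remaining one to three bytes, if any.
    tail = data[length - length % 4:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * m) & MASK
    h ^= h >> 13
    h = (h * m) & MASK
    h ^= h >> 15
    return h


_round_robin = itertools.count()


def choose_partition(key, num_partitions, partition=None):
    """Pick a partition: explicit first, then key hash, then round-robin."""
    if partition is not None:
        return partition
    if key is not None:
        return (murmur2(key) & 0x7FFFFFFF) % num_partitions
    return next(_round_robin) % num_partitions
```

The useful property is that records with the same key always land on the same partition, while keyless records are spread evenly.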

Create a consumer

Consumers belong to a consumer group. If a group has many consumers, messages are distributed among them.

The standard content type for interacting with the REST API is specified with the header Content-Type: application/vnd.kafka.v2+json.

We need to POST the consumer configuration to the /consumers/{consumer_group} endpoint. Let's call the consumer my_consumer and the consumer group my_group, with format set to json:

curl -s \
  -X POST \
  "http://localhost:8082/consumers/my_group" \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{
  "format":"json",
  "name":"my_consumer",
  "auto.offset.reset":"earliest",
  "auto.commit.enable":"false",
  "fetch.min.bytes": "1",
  "consumer.request.timeout.ms": "10000"
}' | jq .
res = requests.post(
    url=f"{base_uri}/consumers/my_group",
    data=json.dumps({
        "name": "my_consumer",
        "format": "json",
        "auto.offset.reset": "earliest",
        "auto.commit.enable": "false",
        "fetch.min.bytes": "1",
        "consumer.request.timeout.ms": "10000"
    }),
    headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
consumer_base_uri = res["base_uri"]
pretty(res)
{
  "instance_id": "my_consumer",
  "base_uri": "http://127.0.0.1:8082/consumers/my_group/instances/my_consumer"
}

We'll need the base_uri for further interaction with the consumer: it identifies the consumer and, when we're connecting to a Redpanda cluster, the particular Pandaproxy instance as well.
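
Because the base_uri pins requests to the right Pandaproxy instance, it is safer to derive the consumer endpoints from it than to rebuild them from a hard-coded host. Here is a small sketch, using the endpoint suffixes that appear in this guide. The helper consumer_endpoints is hypothetical:

```python
def consumer_endpoints(consumer_base_uri):
    """Derive the per-consumer endpoint URLs from the returned base_uri."""
    base = consumer_base_uri.rstrip("/")
    return {
        "subscription": f"{base}/subscription",
        "records": f"{base}/records",
        "offsets": f"{base}/offsets",
    }


endpoints = consumer_endpoints(
    "http://127.0.0.1:8082/consumers/my_group/instances/my_consumer")
print(endpoints["records"])
```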

Subscribe the consumer

Consumer groups listen to topics. To subscribe a consumer group to a list of topics, subscribe any of the consumers in the group to the topics.

We need to POST a list of topics to the /consumers/{consumer_group}/instances/{consumer}/subscription endpoint.

Subscribe my_consumer to topic my_topic:

curl -s -o /dev/null -w "%{http_code}" \
  -X POST \
  "http://localhost:8082/consumers/my_group/instances/my_consumer/subscription" \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{
  "topics": [
     "my_topic"
  ]
}'
res = requests.post(
    url=f"{consumer_base_uri}/subscription",
    data=json.dumps({"topics": ["my_topic"]}),
    headers={"Content-Type": "application/vnd.kafka.v2+json"})

Consume messages

To retrieve messages, send a GET to the /consumers/{consumer_group}/instances/{consumer}/records endpoint.

We'll consume JSON encoded messages, so we have to specify the Accept header: Accept: application/vnd.kafka.json.v2+json.

curl -s \
  "http://localhost:8082/consumers/my_group/instances/my_consumer/records?timeout=1000&max_bytes=100000" \
  -H "Accept: application/vnd.kafka.json.v2+json"  | jq .
res = requests.get(
    url=f"{consumer_base_uri}/records",
    params={"timeout":1000,"max_bytes":100000},
    headers={"Accept": "application/vnd.kafka.json.v2+json"}).json()
pretty(res)
[
  {
    "topic": "my_topic",
    "key": null,
    "value": "Vectorized",
    "partition": 0,
    "offset": 0
  },
  {
    "topic": "my_topic",
    "key": null,
    "value": "Pandaproxy",
    "partition": 1,
    "offset": 0
  },
  {
    "topic": "my_topic",
    "key": null,
    "value": "JSON Demo",
    "partition": 2,
    "offset": 0
  }
]
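
A single GET returns at most one batch, so a real consumer usually polls in a loop. The sketch below shows one way to do that. It assumes that a poll with nothing to deliver returns an empty list, which this guide does not confirm. The name drain is hypothetical, and fetch stands in for the requests.get(...).json() call above so that the example runs without a broker:

```python
def drain(fetch, max_polls=10):
    """Call fetch() until it returns an empty batch or max_polls is reached."""
    records = []
    for _ in range(max_polls):
        batch = fetch()
        if not batch:
            break
        records.extend(batch)
    return records


# Canned batches standing in for successive HTTP responses.
batches = iter([
    [{"topic": "my_topic", "key": None, "value": "Vectorized",
      "partition": 0, "offset": 0}],
    [{"topic": "my_topic", "key": None, "value": "Pandaproxy",
      "partition": 1, "offset": 0}],
    [],
])
consumed = drain(lambda: next(batches))
print([record["value"] for record in consumed])
```

Capping the number of polls keeps the loop from spinning forever if records keep arriving.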

Get consumer offsets

To get the offsets of consumers in the group, we need to send a GET to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint with the topics and partitions:

curl -s \
  -X 'GET' \
  'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \
  -H 'accept: application/vnd.kafka.v2+json' \
  -H 'Content-Type: application/vnd.kafka.v2+json' \
  -d '{
  "partitions": [
    {
      "topic": "my_topic",
      "partition": 0
    },
    {
      "topic": "my_topic",
      "partition": 1
    },
    {
      "topic": "my_topic",
      "partition": 2
    }
  ]
}' | jq .
res = requests.get(
    url=f"{consumer_base_uri}/offsets",
    data=json.dumps(
        dict(partitions=[
            dict(topic="my_topic", partition=p) for p in [0, 1, 2]
        ])),
    headers={"Content-Type": "application/vnd.kafka.v2+json"}).json()
pretty(res)
{
  "offsets": [
    {
      "topic": "my_topic",
      "partition": 0,
      "offset": -1,
      "metadata": ""
    },
    {
      "topic": "my_topic",
      "partition": 1,
      "offset": -1,
      "metadata": ""
    },
    {
      "topic": "my_topic",
      "partition": 2,
      "offset": -1,
      "metadata": ""
    }
  ]
}

Commit offsets

Once a consumer has handled messages, the offsets can be committed so the consumer group won't retrieve them again.

We need to POST offsets to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint:

curl -s -o /dev/null -w "%{http_code}" \
  -X 'POST' \
  'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \
  -H 'accept: application/vnd.kafka.v2+json' \
  -H 'Content-Type: application/vnd.kafka.v2+json' \
  -d '{
  "partitions": [
    {
      "topic": "my_topic",
      "partition": 0,
      "offset": 0
    },
    {
      "topic": "my_topic",
      "partition": 1,
      "offset": 0
    },
    {
      "topic": "my_topic",
      "partition": 2,
      "offset": 0
    }
  ]
}'
res = requests.post(
    url=f"{consumer_base_uri}/offsets",
    data=json.dumps(
        dict(partitions=[
            dict(topic="my_topic", partition=p, offset=0) for p in [0, 1, 2]
        ])),
    headers={"Content-Type": "application/vnd.kafka.v2+json"})
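
Rather than hard-coding the offsets, the commit body can be built from the records that were just consumed. Here is a minimal sketch, assuming the record fields shown in the consume response and following this guide's example in committing the highest offset seen for each partition. The helper offsets_to_commit is hypothetical:

```python
def offsets_to_commit(records):
    """Build a commit body from the highest offset seen per topic-partition."""
    highest = {}
    for record in records:
        key = (record["topic"], record["partition"])
        highest[key] = max(highest.get(key, -1), record["offset"])
    return {
        "partitions": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(highest.items())
        ]
    }


sample = [
    {"topic": "my_topic", "partition": 0, "offset": 0, "value": "Vectorized"},
    {"topic": "my_topic", "partition": 1, "offset": 0, "value": "Pandaproxy"},
    {"topic": "my_topic", "partition": 2, "offset": 0, "value": "JSON Demo"},
]
commit_body = offsets_to_commit(sample)
print(commit_body)
```

The result can be serialized with json.dumps and POSTed to the offsets endpoint exactly as in the example above.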

Remove consumer

To remove a consumer from a group, send DELETE to its base_uri:

curl -s -o /dev/null -w "%{http_code}" \
  -X 'DELETE' \
  'http://localhost:8082/consumers/my_group/instances/my_consumer' \
  -H 'Content-Type: application/vnd.kafka.v2+json'
res = requests.delete(
    url=f"{consumer_base_uri}",
    headers={"Content-Type": "application/vnd.kafka.v2+json"})

Now we can clean up:

docker stop redpanda
docker rm redpanda
docker volume remove redpanda
docker network remove redpanda

Pandaproxy Status

We'll be adding more endpoints and more encodings. For an up-to-date list of features and their status, see the Pandaproxy features meta-issue on GitHub.

Pandaproxy is built on the same principles as Redpanda, but has not yet been optimized for performance. We are continuing to work on Pandaproxy, so make sure you join our Slack Community to get updates on the progress!

To learn more about Pandaproxy, read the latest in our documentation.
