Pandaproxy allows access to Redpanda via a REST API.

ByBen PopeonApril 27, 2021
Pandaproxy: Bringing Kafka to the masses

At Redpanda, we like to make things simple. Redpanda is an Apache Kafka®-compatible event streaming platform that eliminates Zookeeper® and the JVM, autotunes itself for modern hardware, and ships in a single binary.

There are many high quality Kafka clients for lots of languages, but wouldn't it be nice if you could just fire up your favourite HTTP CLI or library to produce and consume a stream of events?

We're pleased to announce the beta release of Pandaproxy, a new subsystem of Redpanda that allows access to your data through a REST API.

It's already available in Redpanda, so if you have Redpanda installed, make sure it's up to date. If not, follow the instructions in the Linux, MacOS, Kubernetes, or Docker quick start guides

If you want to leave the infrastructure issues to us, sign up for Vectorized Cloud for the simplest way to run Redpanda.

Produce and Consume Example

Start Redpanda

Let's jump right in and start Redpanda using Docker on Linux:

docker network create redpanda docker volume create redpanda docker run \ --pull=always \ --name=redpanda \ --net=redpanda \ -v "redpanda:/var/lib/redpanda/data" \ -p 8082:8082 \ -p 9092:9092 \ --detach \ vectorized/redpanda start \ --overprovisioned \ --smp 1 \ --memory 1G \ --reserve-memory 0M \ --node-id 0 \ --check=false \ --pandaproxy-addr \ --advertise-pandaproxy-addr \ --kafka-addr \ --advertise-kafka-addr redpanda:9092

Create the topic my_topic:

docker run \ --net=redpanda \ vectorized/redpanda \ --brokers=redpanda:9092 \ topic create my_topic \ --partitions=3 \ --replicas=1

Now we're ready to start using Pandaproxy!

Endpoints are documented with Swagger at http://localhost:8082/v1.

I'm using jq to prettify and process the JSON responses.

We'll use the popular requests module (pip install requests).

For the rest of the guide, we'll assume the following for an interactive python session:

import requests import json def pretty(text): print(json.dumps(text, indent=2)) base_uri = "http://localhost:8082"

List topics

curl -s "localhost:8082/topics" | jq .
res = requests.get(f"{base_uri}/topics").json() pretty(res)
[ "my_topic" ]

Produce to a topic

We need to POST a list of records to the /topics/{topic} endpoint.

JSON and base64 encoded payloads are currently supported, specified with a Content-Type of application/vnd.kafka.json.v2+json or application/vnd.kafka.binary.v2+json, respectively. We'll use JSON:

curl -s \ -X POST \ "http://localhost:8082/topics/my_topic" \ -H "Content-Type: application/vnd.kafka.json.v2+json" \ -d '{ "records":[ { "value":"Vectorized", "partition":0 }, { "value":"Pandaproxy", "partition":1 }, { "value":"JSON Demo", "partition":2 } ] }' | jq .
res = url=f"{base_uri}/topics/my_topic", data=json.dumps( dict(records=[ dict(value="Vectorized", partition=0), dict(value="Pandaproxy", partition=1), dict(value="JSON Demo", partition=2) ])), headers={"Content-Type": "application/vnd.kafka.json.v2+json"}).json() pretty(res)
{ "offsets": [ { "partition": 0, "offset": 0 }, { "partition": 1, "offset": 0 }, { "partition": 2, "offset": 0 } ] }

If a partition is not specified, one is chosen based on a murmur2 hash of the key. If there is no key, partitions are chosen using a round-robin strategy.

Create a consumer

Consumers belong to a consumer group. If you have many consumers in a group, messages are distributed between all consumers.

The standard protocol for interacting with the REST api is specified with: Content-Type: application/vnd.kafka.v2+json.

We need to POST the consumer configuration to the /consumers/{consumer_group} endpoint, let's call the consumer my_consumer and call the consumer group my_group, with format = json:

curl -s \ -X POST \ "http://localhost:8082/consumers/my_group"\ -H "Content-Type: application/vnd.kafka.v2+json" \ -d '{ "format":"json", "name":"my_consumer", "auto.offset.reset":"earliest", "auto.commit.enable":"false", "fetch.min.bytes": "1", "": "10000" }' | jq .
res = url=f"{base_uri}/consumers/my_group", data=json.dumps({ "name": "my_consumer", "format": "json", "auto.offset.reset": "earliest", "auto.commit.enable": "false", "fetch.min.bytes": "1", "": "10000" }), headers={"Content-Type": "application/vnd.kafka.v2+json"}).json() consumer_base_uri = res["base_uri"] pretty(res)
{ "instance_id": "my_consumer", "base_uri": "" }

We'll need the base_uri for further interaction with the consumer, as it identifies the consumer, but also the particular Pandaproxy instance if we're connecting to a Redpanda cluster.

Subscribe the consumer

Consumer groups listen on topics. To subscribe a consumer group to a list of topics, subscribe any of the consumers in the group to the topics.

We need to POST a list of topics to the /consumers/{consumer_group}/instances/{consumer}/subscription endpoint.

Subscribe my_consumer to topic my_topic:

curl -s -o /dev/null -w "%{http_code}" \ -X POST \ "http://localhost:8082/consumers/my_group/instances/my_consumer/subscription"\ -H "Content-Type: application/vnd.kafka.v2+json" \ -d '{ "topics": [ "my_topic" ] }'
res = url=f"{consumer_base_uri}/subscription", data=json.dumps({"topics": ["my_topic"]}), headers={"Content-Type": "application/vnd.kafka.v2+json"})

Consume messages

To retrieve messages, send a GET to the /consumers/{consumer_group}/instances/{consumer}/records endpoint.

We'll consume JSON encoded messages, so we have to specify the Accept header: Accept: application/vnd.kafka.json.v2+json.

curl -s \ "http://localhost:8082/consumers/my_group/instances/my_consumer/records?timeout=1000&max_bytes=100000"\ -H "Accept: application/vnd.kafka.json.v2+json" | jq .
res = requests.get( url=f"{consumer_base_uri}/records", params={"timeout":1000,"max_bytes":100000}, headers={"Accept": "application/vnd.kafka.json.v2+json"}).json() pretty(res)
[ { "topic": "my_topic", "key": null, "value": "Vectorized", "partition": 0, "offset": 0 }, { "topic": "my_topic", "key": null, "value": "Pandaproxy", "partition": 1, "offset": 0 }, { "topic": "my_topic", "key": null, "value": "JSON Demo", "partition": 2, "offset": 0 } ]

Get consumer offsets

To get the offsets of consumers in the group, we need to send a GET to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint with the topics and partitions:

curl -s \ -X 'GET' \ 'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \ -H 'accept: application/vnd.kafka.v2+json' \ -H 'Content-Type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "my_topic", "partition": 0 }, { "topic": "my_topic", "partition": 1 }, { "topic": "my_topic", "partition": 2 } ] }' | jq .
res = requests.get( url=f"{consumer_base_uri}/offsets", data=json.dumps( dict(partitions=[ dict(topic="my_topic", partition=p) for p in [0, 1, 2] ])), headers={"Content-Type": "application/vnd.kafka.v2+json"}).json() pretty(res)
{ "offsets": [ { "topic": "my_topic", "partition": 0, "offset": -1, "metadata": "" }, { "topic": "my_topic", "partition": 1, "offset": -1, "metadata": "" }, { "topic": "my_topic", "partition": 2, "offset": -1, "metadata": "" } ] }

Commit offsets

Once messages have been handled by a consumer, the offsets can be committed so that the consumer group won't retrieve them again.

We need to POST offsets to the /consumers/{consumer_group}/instances/{consumer}/offsets endpoint:

curl -s -o /dev/null -w "%{http_code}" \ -X 'POST' \ 'http://localhost:8082/consumers/my_group/instances/my_consumer/offsets' \ -H 'accept: application/vnd.kafka.v2+json' \ -H 'Content-Type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "my_topic", "partition": 0, "offset": 0 }, { "topic": "my_topic", "partition": 1, "offset": 0 }, { "topic": "my_topic", "partition": 2, "offset": 0 } ] }'
res = url=f"{consumer_base_uri}/offsets", data=json.dumps( dict(partitions=[ dict(topic="my_topic", partition=p, offset=0) for p in [0, 1, 2] ])), headers={"Content-Type": "application/vnd.kafka.v2+json"})

Remove Consumer

To remove a consumer from a group, send DELETE to its base_uri:

curl -s -o /dev/null -w "%{http_code}" \ -X 'DELETE' \ 'http://localhost:8082/consumers/my_group/instances/my_consumer' \ -H 'Content-Type: application/vnd.kafka.v2+json'
res = requests.delete( url=f"{consumer_base_uri}", headers={"Content-Type": "application/vnd.kafka.v2+json"})

Now we can cleanup:

docker stop redpanda docker rm redpanda docker volume remove redpanda docker network remove redpanda

Pandaproxy Status

We'll be adding more endpoints and more encodings. For an up-to-date list of features and their status see the Pandaproxy features meta-issue on GitHub.

Pandaproxy is built on the same principles as Redpanda, but has not yet been optimised for performance. We are continuing to work on Pandaproxy, so make sure you join our Slack Community to get updates on the progress!

Join Our Newsletter

We’ll send you a helpful update every so often, but no spam.