Schema registry provides tools for describing your events.

By Ben Pope on September 7, 2021
Schema Registry: The event is the API


Introduction

Highly scalable, loosely coupled architectures often use an asynchronous event-driven design. In such systems, the contract between the producer and the consumer is the event - the event is the API.

It's important to document the API, and it's important to be able to evolve the API. This is often done with a schema language such as Apache Avro, JSON Schema, or Protobuf.

We're pleased to announce the beta release of the schema registry subsystem of Redpanda that provides an interface for managing schema.

The schema registry is built into the Redpanda binary: schemas are stored on the same Raft-based storage engine as your data, and the RESTful interface is available on every broker. Schemas get the same high availability as your data, there's nothing new to deploy, and it's available today.

To take advantage of the schema registry in an existing Redpanda installation, make sure you update to the latest version. Otherwise, follow the instructions in the Linux, macOS, Kubernetes, or Docker quick start guides to spin up a new Redpanda instance.

If you want to leave the infrastructure issues to us, sign up for Redpanda Cloud for the simplest way to run Redpanda.

To get down to business, skip ahead to the example.

Overview

A loosely coupled architecture not only reduces dependencies in the code, it also reduces communication overhead between and within teams. By defining the API, or in this case the event, with a schema, disparate teams can start work on the subsystems that produce and consume those events with minimal communication overhead.

Operational complexity

At Redpanda, we like to make things simple. Redpanda is an Apache Kafka®-compatible event streaming platform that eliminates Zookeeper® and the JVM, autotunes itself for modern hardware, and ships in a single binary.

We've built the schema registry directly into Redpanda; there are no new binaries to install, no new services to deploy and maintain, and the default configuration just works.

Schemas are stored in a standard compacted topic, and we use optimistic concurrency control at the topic level so that mutating REST calls can be sent to any broker. There's no need to configure leadership or failover strategies because every broker is symmetric.

Schema

A schema can be used as human readable documentation for an API, to verify data conforms to that API, to generate serialisers for the data, and to evolve the API with predefined levels of compatibility, allowing new versions of services to be rolled out independently.

Some data encodings are somewhat self-describing, but that can make them verbose. Some encodings are extensible. JSON, for example, pairs every property value with a property name. The name isn't part of the information, but it allows new fields to be easily added by the producer and ignored by the consumer.

A schema is an external mechanism to describe the data and its encoding, allowing a reduction in the amount of data transmitted, while keeping the same information. It also allows defaults for new fields, which means that it's possible to decouple the rollout of producers and consumers.
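To make the size trade-off concrete, here is a minimal sketch that encodes the same reading twice: once as self-describing JSON, and once as a fixed binary layout that both sides agree on out of band. The `reading` values and the two-integer layout are assumptions made for illustration, and the fixed-width `struct` encoding is a stand-in, not the encoding that Avro or Protobuf actually use.

```python
import json
import struct

# One hypothetical sensor reading: a timestamp in milliseconds and a value.
reading = {"timestamp": 1631000000000, "value": 42}

# Self-describing: the property names travel with every message.
as_json = json.dumps(reading).encode("utf-8")

# Schema-described: producer and consumer agree out of band that the
# payload is two big-endian signed 64-bit integers, in this order.
as_binary = struct.pack(">qq", reading["timestamp"], reading["value"])

print(len(as_json), len(as_binary))

# The consumer recovers the same information using the shared layout.
timestamp, value = struct.unpack(">qq", as_binary)
print(timestamp, value)
```

The binary form carries no names at all, which is why the description of the layout has to live somewhere both parties can find it.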

Example

Start Redpanda

Let's jump right in and start Redpanda using Docker on Linux:

docker network create redpanda-sr
docker volume create redpanda-sr
docker run \
  --pull=always \
  --name=redpanda-sr \
  --net=redpanda-sr \
  -v "redpanda-sr:/var/lib/redpanda/data" \
  -p 8081:8081 \
  -p 8082:8082 \
  -p 9092:9092 \
  --detach \
  docker.vectorized.io/vectorized/redpanda start \
  --overprovisioned \
  --smp 1 \
  --memory 1G \
  --reserve-memory 0M \
  --node-id 0 \
  --check=false \
  --pandaproxy-addr 0.0.0.0:8082 \
  --advertise-pandaproxy-addr 127.0.0.1:8082 \
  --kafka-addr 0.0.0.0:9092 \
  --advertise-kafka-addr redpanda-sr:9092

Now we're ready to start using the schema registry!

Endpoints are documented with Swagger at http://localhost:8081/v1 or on SwaggerHub.

The curl examples use jq to prettify and process the JSON responses.

The Python examples use the popular requests module (pip install requests).

For the rest of the guide, we'll assume the following for an interactive Python session:

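import json

import requests

def pretty(obj):
  # Print a parsed JSON response with two-space indentation.
  print(json.dumps(obj, indent=2))

# Schema registry endpoint published by the docker run command above.
base_uri = "http://localhost:8081"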

Schemas

The only schema type currently supported is AVRO; we plan to support JSON and PROTOBUF.

You can query the schema registry for the supported types:

curl -s \
  "http://localhost:8081/schemas/types" \
  | jq .
res = requests.get(f'{base_uri}/schemas/types').json()
pretty(res)
[
  "AVRO"
]

Publish a schema

Schemas are registered against a subject, typically in the form {topic}-key or {topic}-value.
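As a small illustration of that naming convention, the helper below builds a subject name from a topic name. The function name `subject_for` is hypothetical and not part of any client library; the registry itself only sees the resulting string.

```python
def subject_for(topic: str, is_key: bool = False) -> str:
    """Build a subject name in the {topic}-key / {topic}-value form."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"


print(subject_for("sensor"))               # subject for the message value
print(subject_for("sensor", is_key=True))  # subject for the message key
```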

Let's register an example Avro schema, which represents a measurement from a sensor, for the value of the sensor topic:

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

We need to POST the AVRO schema to the /subjects/sensor-value/versions endpoint with a Content-Type of application/vnd.schemaregistry.v1+json:

curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"}' \
  | jq
sensor_schema = {
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
{
  "id": 1
}

The id is unique for the schema in the Redpanda cluster.
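The request body in the examples above is JSON inside JSON: the schema is serialised to a string, and that string becomes the value of the schema property. That is where the escaped quotes in the curl command come from. Here is a minimal sketch of the wrapping, runnable without a broker; `registration_body` and the shortened `example` schema are names made up for illustration.

```python
import json


def registration_body(schema: dict) -> str:
    """Wrap an Avro schema dict in the body expected by the POST above."""
    # Inner dumps: the schema becomes a JSON string.
    # Outer dumps: that string becomes the value of the "schema" property.
    return json.dumps({"schema": json.dumps(schema)})


# A shortened, hypothetical schema to keep the output readable.
example = {
    "type": "record",
    "name": "sensor_sample",
    "fields": [{"name": "value", "type": "long"}],
}

body = registration_body(example)
print(body)

# Decoding twice recovers the original schema.
print(json.loads(json.loads(body)["schema"]) == example)
```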

Retrieve the schema by its ID

We can retrieve the schema directly using its ID:

curl -s \
  "http://localhost:8081/schemas/ids/1" \
  | jq .
res = requests.get(f'{base_uri}/schemas/ids/1').json()
pretty(res)
{
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

List the subjects

Now that a schema is associated with a subject, let's list the subjects:

curl -s \
  "http://localhost:8081/subjects" \
  | jq .
res = requests.get(f'{base_uri}/subjects').json()
pretty(res)
[
  "sensor-value"
]

Cool! We knew that, but now anyone can discover them.

Retrieve the schema versions for the subject

Schemas associated with subjects are versioned. That's how your API can evolve.

Let's query the versions for the sensor-value subject:

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions').json()
pretty(res)
[
  1
]

Retrieve a schema for the subject

If we know the subject and the version we want, we can query directly:

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/1" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/1').json()
pretty(res)
{
  "subject": "sensor-value",
  "id": 1,
  "version": 1,
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

Instead of a specific version, we can ask for the latest:

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest').json()
pretty(res)
{
  "subject": "sensor-value",
  "id": 1,
  "version": 1,
  "schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"long\"}]}"
}

It's also possible to query for just the schema by appending /schema to the query path. That unwraps the escaped schema:

curl -s \
  "http://localhost:8081/subjects/sensor-value/versions/latest/schema" \
  | jq .
res = requests.get(f'{base_uri}/subjects/sensor-value/versions/latest/schema').json()
pretty(res)
{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "long"
    }
  ]
}

Compatibility

There are several types of compatibility:

  • BACKWARD - Consumers using the new version can read data written with the previous version.

  • FORWARD - Consumers using the previous version can read data written with the new version.

  • FULL - Both backward and forward compatibility are ensured.

  • NONE - No compatibility is required.

BACKWARD, FORWARD, and FULL each check against the most recent version only. To check against all registered versions for a subject, append _TRANSITIVE to the name.

The default global compatibility is BACKWARD.
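The full set of level names can be enumerated with a few lines of Python. This is a sketch based on the naming rule described above and on the spelling used in the request body (BACKWARD, without a trailing S); the helper names are hypothetical, and whether a given Redpanda version accepts every level is an assumption to verify against the Swagger documentation.

```python
# Levels that compare against the most recent version only.
BASE_LEVELS = ("BACKWARD", "FORWARD", "FULL")


def compatibility_levels() -> list:
    """List every level name, including the _TRANSITIVE variants."""
    levels = []
    for base in BASE_LEVELS:
        levels.append(base)
        levels.append(f"{base}_TRANSITIVE")
    levels.append("NONE")  # NONE has no transitive variant
    return levels


def is_valid_level(name: str) -> bool:
    """Check a level name before sending it to the registry."""
    return name in compatibility_levels()


print(compatibility_levels())
print(is_valid_level("BACKWARD"), is_valid_level("BACKWARDS"))
```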

Compatibility can be set explicitly for a subject:

curl -s \
  -X PUT \
  "http://localhost:8081/config/sensor-value" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}' \
  | jq .
res = requests.put(
    url=f'{base_uri}/config/sensor-value',
    data=json.dumps(
        {'compatibility': 'BACKWARD'}
      ),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
{
  "compatibility": "BACKWARD"
}

Evolving a schema

Posting a backward-incompatible change to a subject will fail.

For example, changing the type of the value field from long to int:

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "int"
    }
  ]
}
curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"int\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "int"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
{
  "error_code": 409,
  "message": "Schema being registered is incompatible with an earlier schema for subject \"{sensor-value}\""
}
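In a script, it can help to turn a rejection like this into an exception rather than printing it. The sketch below works on the parsed response dictionaries shown in this guide, so it runs without a broker; `schema_id_or_raise` is a hypothetical helper, and the message in `rejected` is shortened.

```python
def schema_id_or_raise(res: dict) -> int:
    """Return the schema id from a registration response, or raise."""
    if "error_code" in res:
        raise RuntimeError(f"{res['error_code']}: {res['message']}")
    return res["id"]


# Responses shaped like the ones in this guide.
accepted = {"id": 1}
rejected = {"error_code": 409, "message": "Schema being registered is incompatible"}

print(schema_id_or_raise(accepted))
try:
    schema_id_or_raise(rejected)
except RuntimeError as err:
    print(f"registration failed: {err}")
```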

A backward-compatible change would be changing the type from long to double:

{
  "type": "record",
  "name": "sensor_sample",
  "fields": [
    {
      "name": "timestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "identifier",
      "type": "string",
      "logicalType": "uuid"
    },
    {
      "name": "value",
      "type": "double"
    }
  ]
}
curl -s \
  -X POST \
  "http://localhost:8081/subjects/sensor-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sensor_sample\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"identifier\",\"type\":\"string\",\"logicalType\":\"uuid\"},{\"name\":\"value\",\"type\":\"double\"}]}"}' \
  | jq
sensor_schema["fields"][2]["type"] = "double"

res = requests.post(
    url=f'{base_uri}/subjects/sensor-value/versions',
    data=json.dumps({
      'schema': json.dumps(sensor_schema)
    }),
    headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'}).json()
pretty(res)
{
  "id": 2
}
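The difference between the two attempts comes down to Avro's type promotion rules: a reader can widen data written as long into a double, but it cannot narrow it into an int. Here is a minimal sketch of that check for primitive types only, with the promotion table taken from the Avro specification. The function name `reader_can_read` is hypothetical, and a real compatibility check also covers records, unions, defaults, and more.

```python
# Writer type -> reader types it may be promoted to (Avro primitive types).
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def reader_can_read(writer_type: str, reader_type: str) -> bool:
    """True if data written as writer_type can be read as reader_type."""
    if writer_type == reader_type:
        return True
    return reader_type in PROMOTIONS.get(writer_type, set())


# Under BACKWARD compatibility the new schema is the reader and the
# previously registered schema is the writer.
print(reader_can_read("long", "int"))     # the rejected change
print(reader_can_read("long", "double"))  # the accepted change
```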

Cleanup

Now we can clean up:

docker stop redpanda-sr
docker rm redpanda-sr
docker volume remove redpanda-sr
docker network remove redpanda-sr

Conclusion

We'll be adding more endpoints and more encodings. For an up-to-date list of features and their status see the schema registry features meta-issue on GitHub.

The schema registry is built on the same principles as Redpanda, but has not yet been optimized for performance. We are continuing to work on the schema registry, so make sure you join our Slack community to get updates on the progress!
