New AI connectors and GPU runtime support for AI models
Build powerful, flexible AI pipelines with Redpanda Connect
Redpanda Connect now has a suite of GenAI processors to support building data pipelines using the latest and most advanced large language models (LLMs) available. Whether you're analyzing unstructured data or computing embeddings for vector search as part of a retrieval-augmented generation (RAG) application, Redpanda Connect is a powerful and flexible solution for building AI data pipelines.
We're building where our customers are today by integrating with the most popular AI providers, including OpenAI, AWS Bedrock, and Google Vertex AI. We're also building for where many customers want to be: in their own private environments, where they maintain full sovereignty over their data and models.
In this post, we introduce our new AI connectors and showcase a few demos to illustrate how they simplify building cutting-edge AI apps.
Build bigger and better with AI using Redpanda Connect
In keeping with our mission to give every organization full ownership over their data, we aren’t just integrating with major AI providers; we're also providing connectors that run against a local Ollama instance on private GPUs, leveraging the latest open-source LLM and embedding models.
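As a minimal sketch of what this looks like, here's an ollama_chat processor pointed at an Ollama server you already run on your own hardware. The server_address field and the localhost:11434 endpoint (Ollama's default port) are assumptions for illustration; if the field is omitted, Redpanda Connect can manage an Ollama process itself.

input:
  stdin: {}
pipeline:
  processors:
    # Use a self-hosted Ollama server instead of an external API provider
    - ollama_chat:
        server_address: http://127.0.0.1:11434  # assumed: Ollama's default port
        model: llama3.1
        prompt: "Summarize the following document: ${!content().string()}"
output:
  stdout: {}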
To give you a better understanding of how you can use these AI connectors, here are three scenarios made simpler with Redpanda Connect.
Connect to popular vector databases
In addition to AI processors, we also support Pinecone, Qdrant, and any pgvector-compatible Postgres database such as Neon, EnterpriseDB, and Google's AlloyDB. With Redpanda Connect's ability to ingest data from a plethora of sources, it's the ideal way to quickly stand up production-ready vector indexing pipelines that handle retries, at-least-once delivery, and out-of-the-box observability.
Here's an example Connect configuration that reads data from a Redpanda Cloud cluster and writes it to a local pgvector-enabled Postgres instance.
input:
  kafka:
    addresses: ["${REDPANDA_CLUSTER}"]
    topics: ["posts"]
    consumer_group: "rp-connect-consumer-group"
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: ${RP_USER}
      password: ${RP_PASSWORD}
pipeline:
  processors:
    - branch:
        processors:
          - ollama_embeddings:
              model: nomic-embed-text
              text: "search_document: ${!this.text}"
        result_map: "root.embeddings = this"
output:
  sql_insert:
    driver: "postgres"
    dsn: postgres://${PG_USER}:${PG_PASS}@${PG_HOST}/${PG_DB}
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS post_embeddings (
        post_id bigint,
        embeddings vector(768)
      );
      CREATE INDEX IF NOT EXISTS hnsw_index ON post_embeddings USING hnsw (embeddings vector_l2_ops);
    table: post_embeddings
    columns: ["post_id", "embeddings"]
    args_mapping: "[this.id, this.embeddings.vector()]"
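Once the pipeline has populated the table, retrieval is a plain SQL query. Here's a sketch of a nearest-neighbor lookup, where $1 is a placeholder for the 768-dimension query embedding your application computes (with the same nomic-embed-text model, typically prefixed "search_query: " per that model's convention) and binds at query time:

-- <-> is pgvector's L2 distance operator, which the HNSW index
-- created above (vector_l2_ops) accelerates.
SELECT post_id
FROM post_embeddings
ORDER BY embeddings <-> $1::vector
LIMIT 5;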
Writing to other vector databases is just as easy! Here's a quick video showing how to create an embeddings pipeline from Redpanda to Qdrant.
Strongly typed data pipelines
Redpanda Connect also integrates with the broader data streaming ecosystem by supporting Schema Registry, and for LLM runtimes that support structured outputs, it ensures that LLM responses adhere to your schema. This makes for robust data pipelines in which all your downstream consumers can handle the evolution of your schemas as your business evolves.
input:
  kafka:
    addresses: ["localhost:9092"]
    topics: ["emails:0"]
    consumer_group: ""
pipeline:
  processors:
    - schema_registry_decode:
        url: "http://localhost:8081"
    - branch:
        processors:
          - openai_chat_completion:
              api_key: "${OPENAI_API_KEY}"
              model: gpt-4o-mini
              prompt: |
                Your task is to extract senders and classify emails into one of the following categories:
                Primary: Emails from people you know and messages that don’t appear in other tabs.
                Social: Messages from social networks and media-sharing sites.
                Promotions: Deals, offers, and other promotional emails.
                Updates: Automated confirmations, notifications, statements, and reminders that may not need immediate attention.
                Forums: Messages from online groups, discussion boards, and mailing lists.
                Respond in JSON only in the format of {"categories": ["Promotions"], "sender": "bob@example.com"}
                Here is the email:
                ${!this.email}
              response_format: json_schema
              schema_registry:
                url: "http://localhost:8081"
                subject: "categorized_emails_enrichment"
        result_map: |
          root = root.merge(this)
    - schema_registry_encode:
        url: "http://localhost:8081"
        subject: categorized_emails-value
output:
  kafka:
    addresses: ["localhost:9092"]
    topic: "categorized_emails"
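For this pipeline to work, a schema describing the LLM's output must already be registered under the categorized_emails_enrichment subject. Here's a sketch of what that registered JSON schema might look like; the exact shape is an assumption based on the prompt's example output (note that OpenAI's structured outputs require every property to appear in required and additionalProperties to be false):

{
  "type": "object",
  "properties": {
    "categories": {
      "type": "array",
      "items": { "type": "string" }
    },
    "sender": { "type": "string" }
  },
  "required": ["categories", "sender"],
  "additionalProperties": false
}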
To see the magic happen, here’s a quick demo.
Redact PII with Sovereign AI
Here’s an example of redacting personally identifiable information (PII) from unstructured text stored in S3 and writing the results to the local filesystem, using ollama_chat to run the Llama 3.1 model locally. Experimenting with different prompts and models is ridiculously easy, thanks to Redpanda Connect's YAML-based approach.
input:
  aws_s3:
    bucket: "my-bucket"
    scanner:
      to_the_end: {}
pipeline:
  processors:
    - ollama_chat:
        model: llama3.1:70b
        prompt: |
          Your task is to mask out all PII within the input and return a JSON
          object of the output that looks like: {"masked": "..."}
          The input is as follows: ${!content().string()}
output:
  file:
    path: redacted_output.jsonl
    codec: lines
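To try this yourself, save the config and run it with the Redpanda CLI (redact_pii.yaml is a hypothetical filename; this assumes rpk is installed and AWS credentials are available in your environment):

rpk connect run redact_pii.yaml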
Here’s the full demo.
Try our AI connectors with Redpanda Connect
Using GenAI in data streaming has never been easier with Redpanda Cloud. In addition to AI model services, we’re launching support for running the GPU-powered ollama_chat and ollama_embeddings processors in Redpanda Cloud. This is currently available on Google Cloud Platform for our BYOC offering, with availability in other clouds coming soon.
In the meantime, try our new AI connectors in Redpanda Connect with Redpanda Cloud!