New AI connectors and GPU runtime support for AI models

Build powerful, flexible AI pipelines with Redpanda Connect

September 12, 2024

Redpanda Connect now has a suite of GenAI processors for building data pipelines with the latest large language models (LLMs). Whether you're analyzing unstructured data or computing embeddings for vector search in a retrieval-augmented generation (RAG) application, Redpanda Connect is a powerful and flexible way to build AI data pipelines.

We're building where our customers are today by integrating with the most popular AI providers including OpenAI, AWS Bedrock, and Google Vertex AI, but also building for where many customers want to be: in their private environment where they can maintain full sovereignty over their data and models.

In this post, we introduce our new AI connectors and showcase a few demos to illustrate how they simplify building cutting-edge AI apps.

Build bigger and better with AI using Redpanda Connect

In keeping with our mission to give every organization full ownership over their data, we aren’t just integrating with major AI providers. We're also providing connectors that run a local Ollama instance on private GPUs, using the latest open-source LLMs and embedding models.

Provider | Processors
AWS Bedrock | aws_bedrock_chat
Google Cloud Vertex AI | gcp_vertex_ai_chat
OpenAI / Azure OpenAI | openai_chat_completion, openai_embeddings, openai_image_generation, openai_speech, openai_transcription, openai_translation
Ollama | ollama_chat, ollama_embeddings

Availability is listed per deployment type: Serverless, Cloud, and Self-managed.

To give you a better understanding of how you can use these AI connectors, here are three scenarios made simpler with Redpanda Connect.

Connect to popular vector databases

In addition to AI processors, we also support Pinecone, Qdrant, and any pgvector-compatible Postgres database such as Neon, EnterpriseDB, and Google's AlloyDB. Because Redpanda Connect can ingest data from a plethora of sources, it's the ideal way to quickly stand up production-ready vector indexing pipelines with retries, at-least-once delivery, and out-of-the-box observability.

Here's an example Connect configuration that reads data from Redpanda Cloud, computes embeddings with a local Ollama model, and writes them to a local pgvector-enabled Postgres instance.

input:
  kafka:
    addresses: ["${REDPANDA_CLUSTER}"]
    topics: ["posts"]
    consumer_group: "rp-connect-consumer-group"
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: ${RP_USER}
      password: ${RP_PASSWORD}
pipeline:
  processors:
  - branch:
      processors:
      - ollama_embeddings:
          model: nomic-embed-text
          text: "search_document: ${!this.text}"
      result_map: "root.embeddings = this"
output:
  sql_insert:
    driver: "postgres"
    dsn: postgres://${PG_USER}:${PG_PASS}@${PG_HOST}/${PG_DB}
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS post_embeddings (
        post_id bigint,
        embeddings vector(768)
      );
      CREATE INDEX IF NOT EXISTS hnsw_index ON post_embeddings USING hnsw (embeddings vector_l2_ops);
    table: post_embeddings
    columns: ["post_id", "embeddings"]
    args_mapping: "[this.id, this.embeddings.vector()]"

Writing to other vector databases is just as easy! Here's a quick video showing how to create an embeddings pipeline from Redpanda to Qdrant.
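As a rough sketch of what the Qdrant variant could look like in configuration, the output section of the pgvector pipeline above can be swapped for a qdrant output. The gRPC address and collection name below are assumptions for illustration, the collection is assumed to already exist with 768 dimensions to match nomic-embed-text, and the field names should be checked against the qdrant output reference for your Connect version.

```yaml
output:
  qdrant:
    # Assumed local Qdrant gRPC endpoint; replace with your own host.
    grpc_host: "localhost:6334"
    # Hypothetical collection name, created ahead of time with 768-dimension vectors.
    collection_name: "post_embeddings"
    # Reuse the post's numeric ID as the point ID.
    id: "root = this.id"
    # The embeddings written by the branch processor's result_map.
    vector_mapping: "root = this.embeddings"
```

The input and pipeline sections stay the same, since the embeddings are already attached to each message before it reaches the output.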

Strongly typed data pipelines

Redpanda Connect also integrates with the broader data streaming ecosystem by supporting Schema Registry. For LLM runtimes that support structured outputs, it can also ensure that LLM responses adhere to your schema. The result is robust data pipelines whose downstream consumers can keep up with your schemas as your business evolves.

input:
  kafka:
    addresses: ["localhost:9092"]
    topics: ["emails:0"]
    consumer_group: ""
pipeline:
  processors:
  - schema_registry_decode:
      url: "http://localhost:8081"
  - branch:
      processors:
      - openai_chat_completion:
          api_key: "${OPENAI_API_KEY}"
          model: gpt-4o-mini
          prompt: |
            Your task is to extract senders and classify emails into one of the following categories:

            Primary: Emails from people you know and messages that don’t appear in other tabs.
            Social: Messages from social networks and media-sharing sites.
            Promotions: Deals, offers, and other promotional emails.
            Updates: Automated confirmations, notifications, statements, and reminders that may not need immediate attention.
            Forums: Messages from online groups, discussion boards, and mailing lists.

            Respond in JSON only in the format of {"categories": ["Promotions"], "sender": "bob@example.com"}

            Here is the email:
            ${!this.email}
          response_format: json_schema
          schema_registry:
            url: "http://localhost:8081"
            subject: "categorized_emails_enrichment"
      result_map: |
        root = root.merge(this)
  - schema_registry_encode:
      url: "http://localhost:8081"
      subject: categorized_emails-value
output:
  kafka:
    addresses: ["localhost:9092"]
    topic: "categorized_emails"
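If you would rather not keep the response schema in Schema Registry, the same shape can in principle be declared inline. The following is a sketch only: it assumes the json_schema field of openai_chat_completion (with name and schema subfields) is available in your Connect version, and both the schema name and the schema itself are hypothetical, written to match the {"categories": [...], "sender": "..."} format requested in the prompt above.

```yaml
pipeline:
  processors:
  - branch:
      processors:
      - openai_chat_completion:
          api_key: "${OPENAI_API_KEY}"
          model: gpt-4o-mini
          prompt: |
            Classify this email and extract its sender:
            ${!this.email}
          response_format: json_schema
          json_schema:
            name: categorized_email  # hypothetical schema name
            # Hypothetical JSON Schema mirroring the format in the prompt above.
            schema: |
              {
                "type": "object",
                "properties": {
                  "categories": {
                    "type": "array",
                    "items": {
                      "type": "string",
                      "enum": ["Primary", "Social", "Promotions", "Updates", "Forums"]
                    }
                  },
                  "sender": {"type": "string"}
                },
                "required": ["categories", "sender"],
                "additionalProperties": false
              }
      # Merge the structured reply back into the original message.
      result_map: |
        root = root.merge(this)
```

Restricting categories to an enum means a consumer never has to guard against a label the prompt did not define.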

To see the magic happen, here’s a quick demo.

Redact PII with Sovereign AI

Here’s an example that redacts personally identifiable information (PII) from unstructured text stored in S3 and writes the result to the local filesystem, using ollama_chat to run the Llama 3.1 model locally. Experimenting with different prompts and models is ridiculously easy, thanks to Redpanda Connect's YAML-based approach.

input:
  aws_s3:
    bucket: "my-bucket"
    scanner:
      to_the_end: {}
pipeline:
  processors:
  - ollama_chat:
      model: llama3.1:70b
      prompt: |
        Your task is to mask out all PII within the input and return a JSON
        object of the output that looks like: {"masked": "..."}
        The input is as follows: ${!content().string()}
output:
  file:
    path: redacted_output.jsonl
    codec: lines
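Because the model's reply is free text unless told otherwise, it can help to request JSON explicitly and check the reply before it is written out. Here is a minimal sketch of a replacement pipeline section, assuming ollama_chat accepts response_format: json in your Connect version; dropping malformed replies with deleted() is one illustrative policy, not the only reasonable one.

```yaml
pipeline:
  processors:
  - ollama_chat:
      model: llama3.1:70b
      response_format: json  # assumed option: ask the model for a JSON reply
      prompt: |
        Your task is to mask out all PII within the input and return a JSON
        object of the output that looks like: {"masked": "..."}
        The input is as follows: ${!content().string()}
  - mapping: |
      # Parse the model's reply; fall back to null if it is not valid JSON.
      let reply = content().string().parse_json().catch(null)
      # Keep only the masked text, and drop replies that lack it.
      root = if $reply.masked.type() == "string" {
        {"masked": $reply.masked}
      } else {
        deleted()
      }
```

With this in place, every line in redacted_output.jsonl is a JSON object with a single masked field, which keeps the file easy to consume downstream.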

Here’s the full demo.

Try our AI connectors with Redpanda Connect

Using GenAI in data streaming has never been easier with Redpanda Cloud. In addition to AI model services, we’re launching support for running GPU-powered ollama_chat and ollama_embeddings processors in Redpanda Cloud. This is currently available in Google Cloud Platform for our BYOC offering, with availability in other clouds coming soon.

In the meantime, try our new AI connectors on Redpanda Connect with Redpanda Cloud!
