Learn how to build a data pipeline for real-time fraud detection in a financial use case

By Artem Oppermann on January 2, 2024
Detecting fraud in real time using Redpanda and Pinecone

In data engineering, vector search engines can help provide efficient data retrieval and processing. Unlike traditional text-based search engines, vector search operates on numerical vector representations, enabling similarity search in high-dimensional spaces and effectively handling multidimensional data.

Vector search engines are used in various fields, from natural language processing (NLP) to vehicle sensors. For instance, NLP uses them for semantic search and translation tasks by capturing nuances in text data. In image and video searches, vector search engines compare visual content by converting it into numerical representations and finding similarities. Vector search engines are also used in fraud detection to identify suspicious transactions. Even recommendation engines use vector search to compare user profiles and item profiles and provide personalized suggestions.

Pinecone is a specialized tool that provides seamless scalability and precise vector searches. Integrating Pinecone with modern streaming data platforms like Redpanda can supercharge data processing pipelines with advanced search capabilities.

This post demonstrates how to integrate Redpanda and Pinecone in a fraud detection use case that you can follow step by step. It also highlights the advantages of this powerful duo for data engineers accustomed to working with traditional streaming data platforms, like Apache Kafka®. Let’s get started.

What is Pinecone?

Pinecone is a vector database and search-as-a-service platform that supports similarity search and personalization tasks (such as recommendation systems or targeted advertising) at scale. The platform enables users to index high-dimensional vectors and search through them efficiently.

Pinecone provides a managed service that reduces the operational overhead usually associated with deploying, scaling, and maintaining vector search infrastructure. This makes Pinecone a compelling choice for data engineers and developers looking to implement vector search capabilities without having to delve into the basics of the underlying algorithms or hardware optimizations.

Pinecone can index visual content as vectors derived from convolutional neural networks (CNNs), which enables similarity search among these vectors. This is especially useful in tasks like reverse image search and video categorization based on visual content. In NLP, Pinecone supports semantic search, sentiment analysis, and translation services. When textual data is indexed as vectors (for example, embeddings produced by models like GPT or BERT), Pinecone can compare and retrieve text based on semantic similarity rather than exact keyword matches.

In fraud detection, machine learning models (or, as in this tutorial, dimensionality reduction techniques like PCA) convert transactional data into high-dimensional vectors, so that each transaction is represented as a point in a multidimensional space. Pinecone stores and searches these vectors and can help identify potentially fraudulent transactions based on their dissimilarity, or distance, from normal transaction vectors. By comparing new transaction vectors with existing ones in real time, anomalous patterns indicative of fraud can be identified and flagged quickly.
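To make the dissimilarity idea concrete, here is a minimal local sketch in plain Python. It scores a candidate by how far it is from its closest known-normal vector under cosine similarity. Pinecone performs the similarity search server-side. The function names `anomaly_score` and `is_suspicious`, the toy 3-dimensional vectors, and the 0.5 threshold are all hypothetical and are only meant to illustrate the idea.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means the same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def anomaly_score(candidate: Sequence[float], normal_vectors: list) -> float:
    """1 minus the similarity to the closest known-normal vector; higher means more unusual."""
    best = max(cosine_similarity(candidate, v) for v in normal_vectors)
    return 1.0 - best


def is_suspicious(candidate: Sequence[float], normal_vectors: list, threshold: float = 0.5) -> bool:
    # Hypothetical threshold; a real system would tune it on labeled data.
    return anomaly_score(candidate, normal_vectors) > threshold


# Toy 3-dimensional vectors standing in for the 28-dimensional PCA features
normal = [[1.0, 0.0, 0.1], [0.9, 0.1, 0.0]]
print(is_suspicious([1.0, 0.05, 0.05], normal))  # False: close to a normal vector
print(is_suspicious([-1.0, 0.0, 0.0], normal))   # True: points the opposite way
```

The tutorial below uses the complementary approach. Instead of measuring distance from normal transactions, it looks for transactions that are *similar* to a known fraudulent one.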

In recommendation engines, Pinecone provides personalized suggestions by comparing user and item profiles and finding similarities and preferences. Similarly, in vehicle sensor applications, Pinecone processes high-dimensional sensor data to detect anomalies and predict maintenance needs, supporting proactive maintenance strategies and operational efficiency.

Integrating Redpanda and Pinecone

Redpanda is a powerful streaming data platform that’s essentially a drop-in replacement for Kafka, just without the complexity or high costs. To show you how it works, in this tutorial, you'll use Redpanda and Pinecone to implement a solution that can detect fraudulent transactions in a financial ecosystem in real time.

First, you'll produce the data from the Credit Card Fraud Detection data set to a specific Redpanda topic, ensuring that the raw transactional data is streamed efficiently and is ready for further processing. Once the data resides in Redpanda, you'll then consume it and upload it into Pinecone's vector database. Lastly, you’ll perform rapid similarity comparisons of transactional data to identify fraudulent activity.

Here’s a visual representation of what you'll be building:

Architecture diagram of the solution

Prerequisites

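To complete the tutorial, you'll need the following:

- Docker, with a Redpanda container named redpanda-0 running and reachable at localhost:19092
- Python 3 and a virtual environment for the tutorial's dependencies
- A Pinecone account, along with its API key and environment name
- curl and jq for the shell scripts in steps 5 and 6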

1. Download the data set

The Credit Card Fraud Detection data set is a collection of credit card transactions with features that have been anonymized and reduced using principal component analysis (PCA), making each transaction a high-dimensional vector. It includes 284,807 transactions, only 0.172% of which are fraudulent. This data set is particularly suited for use with vector databases, like Pinecone, which are designed to handle and analyze high-dimensional data efficiently.

Please download the data set from Kaggle and save it as creditcard.csv in your home directory.

Note: This tutorial works with a smaller subset of the original data set that includes only 620 transactions. This reduced data set speeds up indexing in the Pinecone vector database, since indexing the entire data set can be quite time-consuming. In the following sections, this data set is named creditcard_reduced.csv.
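The post does not describe how the 620-row subset was selected. The following is a hedged sketch of one way to build such a file with the standard `csv` module. It keeps the first rows of each class in their original order. The function name `write_reduced_dataset` and the 500/120 split are hypothetical, and a subset built this way will not reproduce the exact vector IDs shown later in this post.

```python
import csv


def write_reduced_dataset(src: str, dst: str, n_normal: int = 500, n_fraud: int = 120) -> dict:
    """Copy the header plus the first n_normal legitimate (Class 0) rows and the first
    n_fraud fraudulent (Class 1) rows of src into dst, keeping their original order."""
    limits = {"0": n_normal, "1": n_fraud}
    kept = {"0": 0, "1": 0}
    with open(src, newline="", encoding="utf-8") as fin, \
            open(dst, "w", newline="", encoding="utf-8") as fout:
        reader = csv.reader(fin)
        writer = csv.writer(fout)
        writer.writerow(next(reader))  # copy the header row
        for row in reader:
            label = row[-1]  # the last column is Class
            if label in limits and kept[label] < limits[label]:
                writer.writerow(row)
                kept[label] += 1
                if kept == limits:
                    break  # both quotas filled; no need to read the rest of the file
    return kept


# Hypothetical split (500 + 120 = 620 rows); adjust to taste:
#   counts = write_reduced_dataset("creditcard.csv", "creditcard_reduced.csv")
```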

2. Create an index in Pinecone

Create an index named fraud-detection in Pinecone with 28 dimensions, which corresponds to the number of PCA features (V1 to V28) in the data set. As the similarity metric for comparing individual transaction vectors, you can select cosine, dot product, or Euclidean distance. In this tutorial, you'll use cosine similarity.
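To see how the three metrics differ, the following plain-Python sketch computes each of them for two vectors that point in the same direction but differ in length. Cosine similarity ignores vector length. Dot product and Euclidean distance both depend on it. This is an illustration of the math only. It does not reproduce how Pinecone computes or reports scores internally.

```python
import math


def dot_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    # Dot product divided by the product of the vector lengths
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))


def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


a = [1.0, 2.0, 2.0]
b = [2.0, 4.0, 4.0]  # same direction as a, twice the length
print(dot_product(a, b))         # 18.0
print(cosine_similarity(a, b))   # 1.0
print(euclidean_distance(a, b))  # 3.0
```

Because cosine similarity only measures direction, it compares the *pattern* of the PCA features rather than their overall magnitude.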

3. Produce the data to a Redpanda topic

Next, you need to create a Redpanda topic called fraud-detection. You can do this with the rpk command line tool, which provides an easy way to interact with your Redpanda cluster. Ensure that Redpanda is configured to listen on the correct address and port (for example, localhost:19092, which the scripts in this tutorial use).

To create the topic, execute the following command in the terminal:

docker exec -it redpanda-0 rpk topic create fraud-detection

In this case, redpanda-0 is the name of the running Docker container.

After the topic has been created, you can produce the data set to that topic. But first, you must install the confluent-kafka Python client (this tutorial uses version 2.2.0), which provides the Producer and Consumer classes used in your code. Install it in the Python virtual environment that you created beforehand. To do so, open a terminal and execute the following command in your virtual environment:

pip install confluent-kafka

The consumer script in step 4 also uses the Pinecone Python client. Its code relies on the pinecone.init API from the 2.x client, which was removed in version 3, so install a 2.x release:

pip install "pinecone-client<3"

Next, create a Python file called produce.py in your home directory and paste in the following code:

```python
import csv
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Called once per message to report delivery success or failure."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def produce_data(file_path, topic_name):
    # Create a producer instance
    conf = {
        'bootstrap.servers': 'localhost:19092',  # Replace with your Redpanda server address
        'queue.buffering.max.messages': 1000000,
        'queue.buffering.max.kbytes': 500000
    }
    producer = Producer(conf)

    # Open the CSV file
    with open(file_path, newline='', encoding='utf-8') as csvfile:
        csv_reader = csv.reader(csvfile)
        next(csv_reader)  # Skip header row
        for row in csv_reader:
            # Keep only V1..V28: drop Time (first column) and Amount and Class (last two columns)
            message = ','.join(row[1:-2])
            producer.produce(topic_name, value=message, callback=delivery_report)
            # Serve delivery report callbacks for previously produced messages
            producer.poll(0)

    # Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered
    producer.flush()


if __name__ == '__main__':
    file_path = 'creditcard_reduced.csv'  # Replace with the path to your CSV file
    topic_name = 'fraud-detection'
    produce_data(file_path, topic_name)
```

The code is designed to read data from a CSV file and produce it to a specified topic in Redpanda. At its core, the script uses the confluent_kafka library to interact with Redpanda. It defines a delivery_report function that provides feedback on whether a message was successfully delivered or if there was an error. The main function, produce_data, establishes a connection to the Redpanda server using the specified configurations.

Once the producer is created, the script opens the designated CSV file, reads each row while skipping the header, and sends the data as a message to the Redpanda topic, excluding the first column and the last two columns of each transaction. The first column, Time, records the seconds elapsed since the first transaction in the data set and is just metadata.

The last two columns, Amount (the transaction value) and Class (the fraud label), are not required for the vector comparison, which leaves the 28 PCA features V1 to V28 in each message. When executed, the script reads data from creditcard_reduced.csv and sends it to the fraud-detection topic in Redpanda.
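The column slice can be checked against the header layout of the Kaggle file, which has 31 columns: Time, V1 to V28, Amount, and Class. This short sketch shows that `row[1:-2]` yields exactly the 28 features the index expects. A `row[1:-1]` slice would still include Amount and produce 29 values, which would not fit a 28-dimensional index. The helper name `feature_columns` is hypothetical.

```python
# Column layout of the Kaggle creditcard.csv file: 31 columns in total
header = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount", "Class"]


def feature_columns(row):
    """Keep V1..V28: drop Time (first column) and Amount and Class (last two)."""
    return row[1:-2]


features = feature_columns(header)
print(len(header), len(features))  # 31 28
print(features[0], features[-1])   # V1 V28
print(header[1:-1][-1])            # Amount: a [1:-1] slice would leak it into the vector
```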

To run the script, open a terminal in your home directory where the file produce.py is saved, and activate your virtual environment where you installed all the dependencies. Then, execute the following command:

python produce.py

To make sure that the data has been successfully produced to the Redpanda topic, execute the following command in the terminal to view the first data point that was produced to the topic:

docker exec -it redpanda-0 rpk topic consume fraud-detection -n 1 --brokers localhost:19092

If the data was produced successfully, you should see something like this:

…output omitted… { "topic": "fraud-detection", "value": "-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215", "timestamp": 1698163302205, "partition": 0, "offset": 0 } …output omitted…

4. Integrate Redpanda with Pinecone

In this section, you'll create a script to consume the credit card data from the fraud-detection Redpanda topic and insert the data into Pinecone. For this, create a file called consume_pinecone.py in your home directory and paste the following code into the file:

```python
from confluent_kafka import Consumer, KafkaError
import pinecone

# Initialize Pinecone (pinecone-client 2.x API)
pinecone.init(api_key="<YOUR PINECONE API KEY>", environment="<YOUR PINECONE ENVIRONMENT>")

# Connect to the index once and reuse it for every upsert
index = pinecone.Index("fraud-detection")


def create_consumer():
    conf = {
        'bootstrap.servers': 'localhost:19092',
        'group.id': 'fraud-detection-consumer-group',
        'auto.offset.reset': 'earliest',  # Start from the beginning if the group has no committed offsets
    }
    return Consumer(conf)


def consume_and_index_data(consumer, topic_name):
    try:
        consumer.subscribe([topic_name])
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"{msg.topic()}:{msg.partition()} reached end at offset {msg.offset()}")
                else:
                    print(f"Error: {msg.error()}")
                continue
            print(f"Received message on {msg.topic()} partition {msg.partition()} offset {msg.offset()}")
            # Each message is a comma-separated string of 28 floats
            vector = [float(feature) for feature in msg.value().decode('utf-8').split(',')]
            # Use the message offset as the vector ID (unique within a single-partition topic)
            vector_id = str(msg.offset())
            # Insert or update the vector in the Pinecone index
            index.upsert(vectors=[(vector_id, vector)])
    except KeyboardInterrupt:
        pass
    finally:
        # Close down consumer to commit final offsets
        consumer.close()


if __name__ == '__main__':
    topic_name = 'fraud-detection'
    consumer = create_consumer()
    consume_and_index_data(consumer, topic_name)
```

The code outlines the process of consuming data from the Redpanda topic and subsequently indexing it into Pinecone. Initially, the script imports required libraries, including Consumer for consuming messages and pinecone for interacting with the Pinecone vector database. The script starts by initializing the Pinecone environment with an API key and environment details.

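The create_consumer function configures and returns a consumer with the specified settings: the Redpanda broker address, the consumer group ID, and the offset reset policy. Setting auto.offset.reset to earliest makes a consumer group without committed offsets start reading from the beginning of the topic, so it picks up the messages you produced earlier. This consumer subscribes to a Redpanda topic and continuously polls for new messages.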

In the consume_and_index_data function, the consumer subscribes to a given topic and enters a loop where it polls for new messages. Each message is processed to extract its content, which is assumed to be a CSV string. The message's content is then transformed into a vector of floats. This vector is upserted into a Pinecone index for the fraud-detection use case. Upserting means that if the vector ID already exists in the index, it will be updated; otherwise, it will be inserted as a new entry.
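The decode step and the upsert semantics can be illustrated without a running cluster. In the sketch below, `decode_vector` adds a dimension check that the consumer script does not have. `InMemoryIndex` is a hypothetical dictionary-backed stand-in that only models "insert or overwrite by ID". It is not the Pinecone client.

```python
EXPECTED_DIMENSION = 28  # must match the dimension of the Pinecone index


def decode_vector(raw: bytes) -> list:
    """Turn a message value like b'-1.35,0.07,...' into a list of floats."""
    values = [float(v) for v in raw.decode("utf-8").split(",")]
    if len(values) != EXPECTED_DIMENSION:
        raise ValueError(f"expected {EXPECTED_DIMENSION} values, got {len(values)}")
    return values


class InMemoryIndex:
    """Hypothetical stand-in for a vector index; it only models upsert semantics."""

    def __init__(self):
        self.vectors = {}

    def upsert(self, items):
        for vector_id, values in items:
            # A new ID is inserted; an existing ID has its values replaced
            self.vectors[vector_id] = values


index = InMemoryIndex()
index.upsert([("0", decode_vector(",".join(["0.5"] * 28).encode("utf-8")))])
index.upsert([("0", [1.0] * 28)])  # same ID again: overwritten, not duplicated
print(len(index.vectors), index.vectors["0"][0])  # 1 1.0
```

The consumer uses message offsets as IDs. If you produce the same CSV a second time, the new messages get new offsets and are added as new vectors. They do not overwrite the existing ones.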

Finally, if this script is run as the main program, it sets the topic name to fraud-detection, creates a consumer, and begins consuming data from the Redpanda topic and indexing it into Pinecone.

To run the script, open a terminal in your home directory where the script is saved and activate your virtual environment for this terminal. Then, execute the following command:

python consume_pinecone.py

When executed, the script initializes a consumer, subscribes it to the fraud-detection topic in Redpanda, and processes incoming messages to index their corresponding vectors into Pinecone. The script keeps polling for new messages until you stop it with Ctrl+C.

5. Ensure the vector data is indexed and searchable

To verify that the data has been correctly uploaded and indexed in Pinecone, you'll now create and execute a script that runs a few API calls to check the status and statistics of the index.

Create a new file named pinecone_script.sh in your home directory with the following code:

```bash
#!/bin/bash
# Set up environment variables
export PINECONE_ENVIRONMENT='<YOUR PINECONE ENVIRONMENT>'
export PINECONE_API_KEY='<YOUR PINECONE API KEY>'
INDEX_NAME='fraud-detection'

# Get the project ID (returned in the project_name field of the whoami response)
PINECONE_PROJECT_ID=$(curl -s "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/actions/whoami" \
  -H "Api-Key: $PINECONE_API_KEY" | jq -r '.project_name')

# Make a POST request to the Pinecone API for the index statistics
curl -s -X POST "https://$INDEX_NAME-$PINECONE_PROJECT_ID.svc.$PINECONE_ENVIRONMENT.pinecone.io/describe_index_stats" \
  -H "Api-Key: $PINECONE_API_KEY"
```

The script fetches index statistics from the Pinecone API. First, it sets up environment variables for the Pinecone environment and API key, which are used to authenticate and construct the API calls throughout the script. It then performs a curl request to Pinecone's whoami API endpoint to determine the project ID associated with the given API key. The jq utility parses the JSON response and extracts the project ID from its project_name field. The ID is stored in a variable for later use.

Subsequently, the script constructs a URL to Pinecone's describe_index_stats API endpoint from the index name, this project ID, and the previously set environment variable. Another curl request is made to this URL to fetch statistics about the index. The API key is included in the request header for authentication.
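The data-plane URL in these scripts begins with the index name. The request therefore only reaches the index whose name appears as that prefix, and it must match the name the consumer upserts into. The following small sketch reproduces the URL pattern used by the curl scripts in this post. The function name `legacy_index_host` and the placeholder project and environment values are hypothetical.

```python
def legacy_index_host(index_name: str, project_id: str, environment: str) -> str:
    """Data-plane URL pattern used by the curl scripts in this tutorial."""
    return f"https://{index_name}-{project_id}.svc.{environment}.pinecone.io"


# Placeholder values; substitute your own project ID and environment
host = legacy_index_host("fraud-detection", "my-project", "my-environment")
print(host + "/describe_index_stats")
# https://fraud-detection-my-project.svc.my-environment.pinecone.io/describe_index_stats
```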

To run pinecone_script.sh, open a terminal in your home directory and execute the following command:

chmod +x pinecone_script.sh

This makes the script executable. To execute the script, use the following command in the same terminal:

./pinecone_script.sh

When executing the script, you should get the following output:

{ "namespaces": { "": { "vectorCount": 620 } }, "dimension": 28, "indexFullness": 0.0, "totalVectorCount": 620 }

This response from Pinecone indicates that you successfully indexed a total of 620 vectors with 28 dimensions. The vector count matches the 620 transactions in creditcard_reduced.csv, so the data is complete and you can continue with the next step of identifying fraudulent transactions.
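Instead of reading the response by eye, you could check it programmatically. The following hedged sketch parses a describe_index_stats response with the standard `json` module and compares the `dimension` and `totalVectorCount` fields against expected values. The function name `check_index_stats` is hypothetical. The sample string is the response shown above.

```python
import json


def check_index_stats(response_text: str, expected_count: int, expected_dimension: int) -> list:
    """Return a list of problems found in a describe_index_stats response (empty if none)."""
    stats = json.loads(response_text)
    problems = []
    if stats.get("dimension") != expected_dimension:
        problems.append(f"dimension is {stats.get('dimension')}, expected {expected_dimension}")
    if stats.get("totalVectorCount") != expected_count:
        problems.append(f"totalVectorCount is {stats.get('totalVectorCount')}, expected {expected_count}")
    return problems


sample = '{ "namespaces": { "": { "vectorCount": 620 } }, "dimension": 28, "indexFullness": 0.0, "totalVectorCount": 620 }'
print(check_index_stats(sample, expected_count=620, expected_dimension=28))  # []
```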

6. Identify fraudulent transactions

To identify fraudulent transactions, you'll compare individual transaction vectors directly. Begin by selecting a transaction vector that's known to be fraudulent. To do this, consult creditcard_reduced.csv. In this data set, a Class value of 1 indicates a fraudulent transaction. As a point of reference, you might consider a fraudulent transaction like the one below (its 28 PCA features):

…output omitted… [0.00843036489558254,4.13783683497998,-6.24069657194744,6.6757321631344,0.768307024571449,-3.35305954788994,-1.63173467271809,0.15461244822474,-2.79589246446281,-6.18789062970647,5.66439470857116,-9.85448482287037,-0.306166658250084,-10.6911962118171,-0.638498192673322,-2.04197379107768,-1.12905587703585,0.116452521226364,-1.93466573889727,0.488378221134715,0.36451420978479,-0.608057133838703,-0.539527941820093,0.128939982991813,1.48848121006868,0.50796267782385,0.735821636119662,0.513573740679437] …output omitted…

Next, you'll create a script that queries the Pinecone index for the three vectors most similar to the 28-dimensional fraudulent transaction vector you've chosen. Similarity is measured with the cosine similarity metric that you selected when creating the index.
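Conceptually, a topK query ranks stored vectors by the index's metric and returns the best k. The following brute-force sketch does this exactly and locally. Every stored vector is scored against the query with cosine similarity, and `heapq.nlargest` keeps the top k. It illustrates the semantics only and is not how Pinecone implements search internally. The function name `top_k` and the toy 2-dimensional vectors are hypothetical.

```python
import heapq
import math


def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def top_k(query, vectors, k=3):
    """Exact search: score every stored vector, return the k best as (id, score)."""
    scored = ((cosine_similarity(query, values), vector_id) for vector_id, values in vectors.items())
    return [(vector_id, score) for score, vector_id in heapq.nlargest(k, scored)]


# Toy 2-dimensional vectors keyed by ID, like the offset-based IDs in the index
stored = {"0": [1.0, 0.0], "1": [0.9, 0.1], "2": [0.0, 1.0], "3": [-1.0, 0.0]}
for vector_id, score in top_k([1.0, 0.0], stored, k=2):
    print(vector_id, round(score, 4))
# 0 1.0
# 1 0.9939
```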

Create a new file named fraud_detection_script.sh in your home directory and add the following code to the script:

```bash
#!/bin/bash
# Set up environment variables
export PINECONE_ENVIRONMENT='<YOUR PINECONE ENVIRONMENT>'
export PINECONE_API_KEY='<YOUR PINECONE API KEY>'
INDEX_NAME='fraud-detection'

# Get the project ID
PINECONE_PROJECT_ID=$(curl -s "https://controller.$PINECONE_ENVIRONMENT.pinecone.io/actions/whoami" \
  -H "Api-Key: $PINECONE_API_KEY" | jq -r '.project_name')

# Make a POST request to the Pinecone API for the 3 most similar vectors.
# Paste all 28 values of the reference vector into "vector"; entries are omitted here for brevity.
curl -s -X POST "https://$INDEX_NAME-$PINECONE_PROJECT_ID.svc.$PINECONE_ENVIRONMENT.pinecone.io/query" \
  -H "Api-Key: $PINECONE_API_KEY" \
  -H 'Content-Type: application/json' \
  -d '{
    "vector": [0.00843036489558254, 4.13783683497998, ... vector entries omitted…, 0.735821636119662, 0.513573740679437],
    "topK": 3,
    "includeValues": true
  }'
```

The script queries the Pinecone index for vectors similar to the specified 28-dimensional fraudulent transaction vector by sending a POST request to Pinecone's query endpoint. The request body contains the reference vector, asks for the top three most similar vectors ("topK": 3), and requests that the actual values of these vectors be included in the response ("includeValues": true).
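Pasting 28 floats into the request by hand is error-prone. As a hedged alternative, the following sketch builds the same JSON body (`vector`, `topK`, `includeValues`) from one data row of creditcard_reduced.csv. It uses the same V1 to V28 slice as the producer. The function name `query_body_from_csv` and the row index you pass in are hypothetical. Pick a row whose Class column is 1.

```python
import csv
import json


def query_body_from_csv(path: str, row_index: int, top_k: int = 3) -> str:
    """Build a /query JSON body from one data row (0-based, header excluded)."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for i, row in enumerate(reader):
            if i == row_index:
                vector = [float(v) for v in row[1:-2]]  # V1..V28 only
                return json.dumps({"vector": vector, "topK": top_k, "includeValues": True})
    raise IndexError(f"row {row_index} not found in {path}")


# Example (ROW_INDEX is hypothetical):
#   body = query_body_from_csv("creditcard_reduced.csv", ROW_INDEX)
# Save body to query.json and pass it to curl with -d @query.json instead of the inline JSON.
```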

To run fraud_detection_script.sh, open a terminal in your home directory and execute the following command:

chmod +x fraud_detection_script.sh

This makes the script executable. To execute the script, use the following command in the same terminal:

./fraud_detection_script.sh

When executing the script, you should get the following output:

…output omitted… { "matches":[ { "id": "518", "score": 0.9914, "values": [0.026779227, 4.13246393, -6.5606, 6.34855652, 1.32966566, -2.51347876, -1.68910217, 0.303252786, -3.13940907, -6.04546785, 6.75462532, -8.94817829, 0.702725, -10.7338543, -1.37951982, -1.63896012, -1.74635017, 0.776744127, -1.32735658, 0.587743223, 0.370508641, -0.576752484, -0.669605374, -0.759907544, 1.60505557, 0.540675402, 0.737040401, 0.496699095] }, { "id": "520", "score": 0.9837, "values": [0.329594344, 3.71288919, -5.77593517, 6.07826567, 1.66735899, -2.4201684, -0.812891245, 0.133080125, -2.21431136, -5.13445425, 4.56072, -8.87374878, -0.797483623, -9.17716599, -0.257024765, -0.871688485, 1.31301367, 0.77391386, -2.37059951, 0.269772768, 0.156617165, -0.652450442, -0.551572204, -0.716521621, 1.41571665, 0.555264711, 0.530507386, 0.40447405] }, { "id": "522", "score": 0.9794, "values": [0.316459, 3.80907583, -5.61515903, 6.0474453, 1.55402601, -2.65135312, -0.746579289, 0.055586312, -2.67867851, -4.95949268, 6.43905354, -7.52011728, 0.386351675, -9.25230694, -1.36518836, -0.502362192, 0.78442657, 1.49430466, -1.80801213, 0.388307422, 0.208828375, -0.511746645, -0.58381325, -0.219845027, 1.47475255, 0.491191924, 0.518868268, 0.402528077] } ], "namespace": "" } …output omitted…

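The output displays the top three vectors from the index that most closely match your reference vector, which represents a fraudulent transaction. Each score is the cosine similarity to the reference vector, where 1 means the two vectors point in exactly the same direction. Given their similarity to a known fraudulent vector, it's reasonable to suspect that the transactions these vectors represent might also be fraudulent.

Each vector's ID is the offset of the message it came from. ID 518 therefore corresponds to the data row at zero-based index 518 in creditcard_reduced.csv (header excluded), provided the topic has a single partition and the file was produced to it only once. When you cross-reference these vectors with their labels in the original data set, you'll find they're indeed marked with the label 1, confirming the suspicion that they represent fraudulent transactions. And there you have it!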
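The cross-referencing step can be scripted. The following sketch maps the returned IDs back to the Class column of the CSV. It assumes that message offset N corresponds to data row N (zero-based, header excluded), which holds only for a single-partition topic that received the file exactly once. The function name `labels_for_ids` is hypothetical.

```python
import csv


def labels_for_ids(path: str, vector_ids) -> dict:
    """Map vector IDs (message offsets) to the Class label of the matching data row.

    Assumes offset N corresponds to data row N (0-based, header excluded), which holds
    only if the topic has one partition and the CSV was produced to it exactly once.
    """
    wanted = {int(v) for v in vector_ids}
    labels = {}
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for i, row in enumerate(reader):
            if i in wanted:
                labels[str(i)] = row[-1]  # last column is Class
    return labels


# Example with the IDs returned by the query above:
#   labels_for_ids("creditcard_reduced.csv", ["518", "520", "522"])
```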

Conclusion

This post highlighted how Redpanda's efficient data streaming capabilities can be combined with Pinecone's specialized vector search capabilities to create a powerful pipeline—particularly for real-time fraud detection.

The credit card fraud detection data set example highlighted a practical use case demonstrating the potential of this integration in a real-world scenario. For those eager to dive deeper and experiment further, all the code and steps outlined in this guide are in this GitHub repository.

To keep exploring Redpanda, check the documentation and browse the Redpanda blog for more tutorials. If you have questions or feedback, tell us in the Redpanda Community on Slack.
