Real-time call detail record (CDR) ingestion with Redpanda and Spark

Learn how to swiftly ingest and process insights on customer feedback in real time

June 13, 2024

Call detail records (CDRs) are rich sources of information in the telecommunications sector. They contain various data points related to a call, such as caller ID, call date, duration, receiver ID, and other pertinent details. Examining CDRs can yield valuable insights for performance assessment, cost efficiency, gathering customer feedback, analyzing the popularity of products/services, and enhancing security measures.

In this tutorial, you'll learn how to ingest real-time CDR data with Redpanda and Apache Spark. You'll also learn how to perform query operations (like aggregation and grouping) on real-time streaming data.

What are CDRs and why do they matter in telecom?

A CDR is a data record generated by a telephone exchange or similar telecommunications equipment. It documents the details of a phone call or other telecommunications activity, such as a text message, that passes through that facility or device.

A CDR includes call-related details such as the call time, status, phone numbers involved in the call (both sending and receiving), and more. It serves as the digital equivalent of the traditional paper toll tickets that operators once used to manually record the duration of long-distance calls within a standard telephone exchange.

Analyzing CDRs can provide significant insights in different use cases:

  • Performance analysis: By analyzing the information in CDRs, telecom companies can identify patterns and trends, such as peak usage times, most dialed numbers, and call drop rates. This information can help them optimize their network performance and improve service quality.
  • Cost optimization: CDRs can provide insights into usage patterns, which can help telecom companies optimize their pricing structures. For example, if the insights reveal that most calls occur during certain hours, a telecom company could introduce discounted rates during off-peak hours to encourage usage during those times.
  • Customer feedback: Analysis of CDRs can reveal problems in service usage. For example, if a particular service or number has a high call drop rate, it might indicate a problem with that service or a network issue in the area where the number is located.
  • Popularity analysis: Telecom companies can identify the most popular services or products by analyzing the usage data in CDRs. This can help guide their marketing efforts or develop new offerings based on customer preferences.
  • Security: Unusual patterns in CDRs can indicate fraudulent activities. For example, a sudden increase in international calls from a particular number in a marked zone might indicate that the number is being used for suspicious activities. By monitoring CDRs for such patterns, telecom companies can detect and prevent fraudulent activities, thus enhancing security.

Ingesting real-time CDR data with Redpanda and Spark

This tutorial uses the example of a telecom company that wants to analyze customer feedback to improve its services. The company collects CDRs that include a feedback_score for each call: a number from 1 to 5 that indicates the customer's satisfaction with the call, where a higher score means a more satisfied customer.

The company wants to analyze these feedback scores to understand how customer satisfaction varies by day and by customer. Here are some sample CDR records in JSON format:

[
  {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 3},
  {"customer_id": "C002", "call_date": "2024-02-01", "call_duration": 10, "feedback_score": 4},
  {"customer_id": "C001", "call_date": "2024-02-02", "call_duration": 7, "feedback_score": 5},
  {"customer_id": "C003", "call_date": "2024-02-02", "call_duration": 9, "feedback_score": 3},
  {"customer_id": "C002", "call_date": "2024-02-03", "call_duration": 6, "feedback_score": 4},
  {"customer_id": "C001", "call_date": "2024-02-03", "call_duration": 8, "feedback_score": 5},
  {"customer_id": "C003", "call_date": "2024-02-03", "call_duration": 10, "feedback_score": 4},
  {"customer_id": "C001", "call_date": "2024-02-04", "call_duration": 6, "feedback_score": 3},
  {"customer_id": "C002", "call_date": "2024-02-04", "call_duration": 8, "feedback_score": 5},
  {"customer_id": "C003", "call_date": "2024-02-04", "call_duration": 7, "feedback_score": 4}
]

In this scenario, the telecom company can produce the CDRs in a streaming fashion to a Redpanda topic (cdr_topic in this example). It can then use Spark to consume these records from the Redpanda topic, perform a group by operation on customer_id and call_date, and calculate the average feedback_score for each group. This will provide a daily feedback score for each customer, which the company can analyze to understand how customer satisfaction varies by day and by customer.
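Before any streaming is involved, the grouping described above can be sketched in plain Python. This only illustrates the aggregation logic and is not the Spark job itself. The function name average_feedback is hypothetical, and the last record in the list is an invented addition so that at least one group contains more than one call.

```python
from collections import defaultdict

# First records from the sample above, plus one hypothetical extra call
# (marked below) so that one group contains two calls.
sample_cdrs = [
    {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 3},
    {"customer_id": "C002", "call_date": "2024-02-01", "call_duration": 10, "feedback_score": 4},
    {"customer_id": "C001", "call_date": "2024-02-02", "call_duration": 7, "feedback_score": 5},
    # Hypothetical record, not part of the sample data:
    {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 7, "feedback_score": 4},
]


def average_feedback(cdrs):
    """Return {(customer_id, call_date): average feedback_score}."""
    scores = defaultdict(list)
    for cdr in cdrs:
        scores[(cdr["customer_id"], cdr["call_date"])].append(cdr["feedback_score"])
    return {key: sum(values) / len(values) for key, values in scores.items()}


# Print one line per (customer, day) group.
for (customer, day), avg in sorted(average_feedback(sample_cdrs).items()):
    print(customer, day, avg)
```

In the streaming version, Spark maintains the same per-group state incrementally as new records arrive instead of recomputing it from a list.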

Architecture diagram


Prerequisites

To complete the tutorial, you'll need Docker with Docker Compose (to run Redpanda and Spark) and Python 3 with pip (to run the producer app).

For this tutorial, use the YAML content provided in the "Single Broker" tab on this official page to set up Redpanda. Create a project directory, place your docker-compose.yaml file there, and start Redpanda.

1. Check your Redpanda container status

Once you start the Redpanda container in your environment, you should see the following result if you execute the command docker ps in a terminal:

CONTAINER ID   IMAGE                                               COMMAND                   CREATED          STATUS          PORTS                                                                                                              NAMES
afed5810f662   docker.redpanda.com/redpandadata/console:v2.4.3     "/bin/sh -c 'echo \"$…"   14 seconds ago   Up 10 seconds   0.0.0.0:8080->8080/tcp                                                                                             redpanda-console
1d8c8dff5d60   docker.redpanda.com/redpandadata/redpanda:v23.3.5   "/entrypoint.sh redp…"    14 seconds ago   Up 11 seconds   8081-8082/tcp, 0.0.0.0:18081-18082->18081-18082/tcp, 9092/tcp, 0.0.0.0:19092->19092/tcp, 0.0.0.0:19644->9644/tcp   redpanda-0

By default, the provided docker-compose.yaml file manages two containers: one running the Redpanda broker and another running Redpanda Console, a UI application for viewing and interacting with the Redpanda cluster.
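If you'd rather check from a script than read the docker ps output, a small TCP probe can confirm that the published ports accept connections. This is a minimal sketch using only the standard library. The helper name is_port_open is hypothetical, and the ports are the host ports shown in the docker ps output above.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Host ports taken from the docker ps output above.
    for name, port in [("Kafka API", 19092), ("Redpanda Console", 8080)]:
        state = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port} is {state}")
```

An open port only shows that something is listening. It doesn't prove the broker is healthy, so the Console overview remains the better health check.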

2. Create a Redpanda topic

You'll need to create a topic named cdr_topic in your Redpanda cluster to receive the events from a demo producer application you'll create shortly.

To do that, open a browser and access the Redpanda Console application at http://localhost:8080/. You should see the interface pictured in the screenshot below:

Redpanda Console home page


This interface provides an overview of the Redpanda cluster, including its health status. To create a topic in the cluster, click the Topics option in the side menu bar and click Create Topic:

Creating a topic


Fill in the topic name on the resulting screen and click Create:

cdr_topic
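The topic can also be created from code instead of the Console. The sketch below assumes the kafka-python library (installed in the next step) and the broker's external listener at localhost:19092. The helper name create_topic is hypothetical, and the single partition and single replica are assumptions that suit a one-broker setup.

```python
def create_topic(name, bootstrap_servers="localhost:19092"):
    """Create a single-partition topic; return False if it already exists."""
    # Imported here so this module can be loaded before kafka-python is installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        # One partition and one replica are assumptions for a single broker.
        admin.create_topics(
            [NewTopic(name=name, num_partitions=1, replication_factor=1)]
        )
        return True
    except TopicAlreadyExistsError:
        return False
    finally:
        admin.close()
```

With the broker running, calling create_topic("cdr_topic") should return True the first time and False on later calls, after which the topic appears on the Topics page.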


3. Create the producer app

Now that your topic is ready, you can develop a Python app to generate the CDRs and publish the records to cdr_topic.

In your project directory, create a subdirectory called cdr_producer. Inside cdr_producer, create a requirements.txt file with the following contents:

kafka-python==2.0.2

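The kafka-python library allows your Python application to interact with the Redpanda cluster. Prepare the virtual environment required for this Python application by running these commands in a new terminal from the cdr_producer directory (the activation command shown is for Windows; on macOS or Linux, use source venv/bin/activate instead):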

python -m venv venv
venv\Scripts\activate
pip install -r requirements.txt

Keep this terminal open for later use.

In the same subdirectory, create a Python file called main.py with the following contents:

import json
import uuid

from kafka import KafkaProducer

# Define the Redpanda topic and Kafka producer
topic = "cdr_topic"
producer = KafkaProducer(bootstrap_servers='localhost:19092',
                         key_serializer=str.encode,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Input CDRs file - JSON format
source_file = "cdr_records.json"

# Open the file and read the CDRs
with open(source_file, 'r') as file:
    cdrs = json.load(file)
    for cdr in cdrs:
        # Generate message key (optional)
        message_key = str(uuid.uuid4())

        # Send the CDR to the Redpanda topic
        future = producer.send(topic, key=message_key, value=cdr)
        # Block until a single message is sent (or timeout in 15 seconds)
        result = future.get(timeout=15)

        print("Message sent, partition: ", result.partition, ", offset: ", result.offset)

This code acts as a simple producer application: it reads CDR data from a JSON file and sends each record to the Redpanda topic, where downstream consumers can process it. The key and value serializers convert each message to bytes before it's sent.
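To see what actually travels over the wire, you can run the two serializers on their own, without a broker. This sketch mirrors the serializers passed to KafkaProducer in main.py; the variable names are illustrative.

```python
import json
import uuid

# Same behavior as the serializers that main.py passes to KafkaProducer.
key_serializer = str.encode


def value_serializer(record):
    """Serialize a CDR dict to UTF-8 encoded JSON bytes."""
    return json.dumps(record).encode("utf-8")


cdr = {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 3}

key_bytes = key_serializer(str(uuid.uuid4()))
value_bytes = value_serializer(cdr)

print(type(key_bytes).__name__, len(key_bytes))
print(value_bytes)

# Decoding the value recovers the original record.
print(json.loads(value_bytes.decode("utf-8")) == cdr)
```

Because the value is plain JSON text, any consumer that knows the field names can parse it, which is what the Spark job does later with from_json.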

Next, in the same subdirectory, create a JSON file called cdr_records.json to hold the sample CDRs. Paste the following into it:

[
  {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 3},
  {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 7, "feedback_score": 4},
  {"customer_id": "C002", "call_date": "2024-02-01", "call_duration": 10, "feedback_score": 5},
  {"customer_id": "C003", "call_date": "2024-02-01", "call_duration": 15, "feedback_score": 4},
  {"customer_id": "C004", "call_date": "2024-02-01", "call_duration": 17, "feedback_score": 4},
  {"customer_id": "C005", "call_date": "2024-02-01", "call_duration": 20, "feedback_score": 5},
  {"customer_id": "C006", "call_date": "2024-02-01", "call_duration": 25, "feedback_score": 5},
  {"customer_id": "C007", "call_date": "2024-02-01", "call_duration": 9, "feedback_score": 4},
  {"customer_id": "C008", "call_date": "2024-02-01", "call_duration": 4, "feedback_score": 5},
  {"customer_id": "C009", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 2},
  {"customer_id": "C010", "call_date": "2024-02-01", "call_duration": 8, "feedback_score": 3}
]

Your producer application is now ready to publish the CDRs.
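Before publishing, it can help to check that every record has the expected fields and that feedback_score falls in the 1 to 5 range described earlier. The following is a minimal sketch of such a check. The names REQUIRED_FIELDS, validate_cdr, and validate_file are hypothetical and not part of the tutorial's repository.

```python
import json

# Field names and types used by the sample CDRs in this tutorial.
REQUIRED_FIELDS = {
    "customer_id": str,
    "call_date": str,
    "call_duration": int,
    "feedback_score": int,
}


def validate_cdr(cdr):
    """Return a list of problems found in one CDR (empty if it looks valid)."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in cdr:
            problems.append(f"missing field: {field}")
        elif not isinstance(cdr[field], expected_type):
            problems.append(f"{field} should be {expected_type.__name__}")
    score = cdr.get("feedback_score")
    if isinstance(score, int) and not 1 <= score <= 5:
        problems.append("feedback_score must be between 1 and 5")
    return problems


def validate_file(path):
    """Map record index to problems for every invalid record in a JSON file."""
    with open(path, "r", encoding="utf-8") as file:
        cdrs = json.load(file)
    return {i: p for i, cdr in enumerate(cdrs) if (p := validate_cdr(cdr))}
```

Calling validate_file("cdr_records.json") on the file above should return an empty dictionary, since every record is complete and in range.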

4. Set up the Spark cluster

You'll now set up the Spark cluster by opening the docker-compose.yaml file you created earlier. Edit the YAML file to add the following code snippet at the bottom of the file, then save the file:

  spark-master:
    image: bitnami/spark:3.4.2
    environment:
      - SPARK_MODE=master
    ports:
      - '8081:8080'
    networks:
      - redpanda_network
  spark-worker:
    image: bitnami/spark:3.4.2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    volumes:
      - ./pyspark-app/:/pyspark-app/
    depends_on:
      - spark-master
    networks:
      - redpanda_network

This Docker Compose snippet defines a multicontainer setup for a Spark cluster, consisting of the following:

  • The spark-master container runs the Spark Master, which manages worker allocation and job coordination. Its web UI listens on port 8080 inside the container, which is mapped to port 8081 on the host.
  • The spark-worker container runs a Spark Worker, which performs computations assigned by the Master. It connects to the Master at spark://spark-master:7077 and mounts the local pyspark-app directory at /pyspark-app so that your script and its checkpoint data are available inside the container.

Both containers share redpanda_network and run the same Spark image, ensuring compatibility. This setup lets you easily deploy and manage a Spark cluster on Docker for your PySpark applications.

Start the Spark cluster by executing the following command in a terminal opened in the directory that contains the docker-compose.yaml file:

docker compose up -d

Once the containers are up, you can open a browser and visit http://localhost:8081/ to access the Spark Master UI:

Spark Master UI


5. Create the PySpark app

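You'll now create a PySpark application to consume CDRs from cdr_topic and process them. To keep the example simple, the application computes the total call_duration for each customer_id and call_date; the same grouping works for other aggregates, such as the average feedback_score described earlier.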

Inside your main project directory, create a subdirectory named pyspark-app. Inside pyspark-app, create a requirements.txt file with the following contents:

pyspark==3.4.2

The pyspark library allows the PySpark application to interact with the Spark and Redpanda clusters. Prepare the virtual environment required for this Python app by running the following commands in a command line terminal:

python -m venv venv
venv\Scripts\activate
pip install -r requirements.txt

Keep this terminal open for later use.

In the same subdirectory, create a Python file called cdr_processor.py and paste the following code into it:

# Import the required classes and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_json, struct, sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema of the CDR records
cdr_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("call_date", StringType()),
    StructField("call_duration", IntegerType()),
    StructField("feedback_score", IntegerType())
])

# Create a SparkSession
spark = SparkSession.builder.appName("CDRProcessor").getOrCreate()

# Stream the CDR records from the input topic (cdr_topic)
# The records are read from the Kafka broker running at redpanda-0:9092
# Consumer settings need the "kafka." prefix to reach the Kafka client
# The read stream is stored in cdr_df
cdr_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "redpanda-0:9092") \
    .option("subscribe", "cdr_topic") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "cdr_processor_group") \
    .load()

# Parse the value column of the read stream as JSON and store it in cdr_df
cdr_df = cdr_df.select(from_json(col("value").cast("string"), cdr_schema).alias("cdr"))

# Perform the aggregation
result_df = cdr_df \
    .groupBy("cdr.customer_id", "cdr.call_date") \
    .agg(sum("cdr.call_duration").alias("total_call_duration"))

# Write the result to the output topic (cdr_output_topic)
query = result_df \
    .select(to_json(struct(col("*"))).alias("value")) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "redpanda-0:9092") \
    .option("topic", "cdr_output_topic") \
    .option("checkpointLocation", "/pyspark-app/checkpoint-dir") \
    .start()

# Wait for the query to terminate
query.awaitTermination()

Save this Python file.

This script reads the CDR records in real time from cdr_topic and aggregates call durations by customer and call date. Because the query uses the update output mode, each micro-batch writes only the groups whose totals changed to cdr_output_topic.
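The effect of a running aggregation that emits only changed groups can be imitated in plain Python. This is a conceptual sketch, not how Spark is implemented. The class name RunningTotals is hypothetical, the split into batches is arbitrary (Spark decides the real micro-batch boundaries), and the record in batch_2 is invented to show an existing group being updated.

```python
from collections import defaultdict


class RunningTotals:
    """Keeps total call_duration per (customer_id, call_date) across batches."""

    def __init__(self):
        self.totals = defaultdict(int)

    def process_batch(self, cdrs):
        """Add a batch of CDRs and return only the groups that changed."""
        changed = {}
        for cdr in cdrs:
            key = (cdr["customer_id"], cdr["call_date"])
            self.totals[key] += cdr["call_duration"]
            changed[key] = self.totals[key]
        return changed


# First three records from cdr_records.json.
batch_1 = [
    {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 5, "feedback_score": 3},
    {"customer_id": "C001", "call_date": "2024-02-01", "call_duration": 7, "feedback_score": 4},
    {"customer_id": "C002", "call_date": "2024-02-01", "call_duration": 10, "feedback_score": 5},
]
# Hypothetical later record for a group that already has a total.
batch_2 = [
    {"customer_id": "C002", "call_date": "2024-02-01", "call_duration": 6, "feedback_score": 4},
]

state = RunningTotals()
print(state.process_batch(batch_1))
print(state.process_batch(batch_2))
```

The second call returns only the C002 group with its new total, while the C001 total stays in state without being emitted again. That is why the output topic can contain several messages for the same customer and date over time.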

6. Submit the Spark job

Now that the CDR processing app is ready on the Spark end, open a terminal and execute the following command to initiate a terminal session with the worker node:

docker exec -it redpanda-quickstart-spark-worker-1 bash

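Submit the Spark job by executing the following command. It launches your cdr_processor.py script from the worker container, and the --packages option downloads the Kafka connector that Spark needs to read from and write to Redpanda. The connector version (3.4.2) matches the Spark version of the bitnami/spark image: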

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2 /pyspark-app/cdr_processor.py
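The value passed to --packages is a Maven coordinate whose last part should match the cluster's Spark version, and whose suffix after the underscore is the Scala version Spark was built with. The helper below is a small sketch for deriving it from the image tag. The function names are hypothetical, and the default of Scala 2.12 is an assumption based on the coordinate used in the command above.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate for Spark's Kafka source/sink connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def spark_version_from_image(image):
    """Extract the tag from an image reference such as 'bitnami/spark:3.4.2'."""
    return image.rsplit(":", 1)[1]


image = "bitnami/spark:3.4.2"  # image used in docker-compose.yaml
print(kafka_connector_package(spark_version_from_image(image)))
```

If you change the image tag in docker-compose.yaml, update the coordinate in the spark-submit command to the same version.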

7. Run the producer app

Now that your consumer app is up and running, you can focus on the producer app. Switch back to the terminal where you activated the producer app's virtual environment earlier. Execute the following command to start publishing the CDRs to cdr_topic:

python main.py

You should see an output similar to the one shown below:

Message sent, partition:  0 , offset:  0
Message sent, partition:  0 , offset:  1 
Message sent, partition:  0 , offset:  2 
Message sent, partition:  0 , offset:  3 
Message sent, partition:  0 , offset:  4 
Message sent, partition:  0 , offset:  5 
Message sent, partition:  0 , offset:  6 
Message sent, partition:  0 , offset:  7 
Message sent, partition:  0 , offset:  8 
Message sent, partition:  0 , offset:  9 
Message sent, partition:  0 , offset:  10

This output indicates that the CDRs in JSON format were published to the Redpanda topic.

8. View the results

Go back to Redpanda Console in your browser and navigate to the Topics screen. You should now see two topics:

Two topics


The second topic, cdr_output_topic, was created automatically when the Spark app wrote its first processed records. You can view the contents of each topic to understand how the records were processed.

Click the source topic (cdr_topic) to view the messages that were published from the producer app:

Messages in cdr_topic


You had eleven records in the project's cdr_records.json file, so the screen displays eleven CDRs.

Return to the Topics page and select the output topic (cdr_output_topic). You'll be taken to the following screen, where you can see the messages that were published from the PySpark application:

Messages in cdr_output_topic
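Each message value in the output topic is a JSON document, so it can be decoded with a few lines of Python. The sketch below assumes the value carries the three columns produced by the PySpark job (customer_id, call_date, and total_call_duration). The sample bytes are an illustration of that assumed layout rather than a captured message, and the function name decode_result is hypothetical.

```python
import json


def decode_result(raw_value):
    """Turn one message value from cdr_output_topic into a Python dict."""
    return json.loads(raw_value.decode("utf-8"))


# Example value shaped like the JSON the PySpark job writes (assumed layout).
raw = b'{"customer_id":"C001","call_date":"2024-02-01","total_call_duration":12}'
row = decode_result(raw)
print(f'{row["customer_id"]} on {row["call_date"]}: total duration {row["total_call_duration"]}')
```

To read the topic from a script rather than the Console, a function like this could be passed as the value_deserializer of a kafka-python KafkaConsumer connected to localhost:19092.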


You can see that the total call duration is 12 for the customer ID C001, which is the sum of the duration of two different calls by the same customer on the same day. You can also try replacing the contents of the cdr_records.json file with the following set of new sample data:

[
  {"customer_id": "C001", "call_date": "2024-02-02", "call_duration": 7, "feedback_score": 5},
  {"customer_id": "C001", "call_date": "2024-02-02", "call_duration": 8, "feedback_score": 4},
  {"customer_id": "C003", "call_date": "2024-02-02", "call_duration": 9, "feedback_score": 3}
]

After you save the file, start the producer app again to see the new results from Redpanda Console based on the steps detailed earlier.

Note: You don't need to resubmit the Spark job. It should still be running and listening for incoming messages on cdr_topic, unless you terminated the session.

Conclusion

In this tutorial, you learned how to ingest real-time CDR data with Redpanda and process it with Spark to derive insights on customer feedback in the telecom domain. You developed a Kafka producer app and published the CDRs to a topic. You then used a Spark streaming application to consume those messages and write the aggregated results to another topic.

You can find the complete code for this tutorial in this GitHub repository. To explore Redpanda, check the documentation and browse the Redpanda blog for more tutorials. If you have questions or want to chat with the team, join the Redpanda Community on Slack.
