Building a real-time Customer 360 solution for Telco with Flink

The simplest way to build a Customer 360 view for supercharged customer satisfaction and personalization

October 1, 2024

A Customer 360 view is a comprehensive, unified profile of a customer that aggregates data from various touchpoints and interactions across a company's different channels and platforms. This approach can help a company obtain a holistic understanding of its customers' behavior, preferences, and needs.

This approach can be particularly handy in the telecommunications industry. For example, a Customer 360 view allows telecommunications agents to retrieve the history of previous calls with a customer, which helps them understand ongoing issues and that customer's current needs.

Furthermore, by having immediate access to a customer's service and subscription history, telecommunications companies can offer more personalized upsells or address any concerns related to billing or service changes. The Customer 360 view could provide a customer's current balances, such as the number of SMS messages left or any remaining data bundles. This enables telecommunications agents to proactively suggest top-ups, upgrades, or adjustments to the customer's plan based on their usage patterns.

In this tutorial, you'll learn how to use Apache Flink® and Redpanda — a proven Apache Kafka® alternative — to implement a Customer 360 view.

What is Redpanda?

Redpanda is a streaming data platform that can be used as the backbone for real-time applications and data pipelines. It presents a streamlined alternative to Kafka, especially since it was built without depending on external coordination services like ZooKeeper. Kafka traditionally required ZooKeeper for cluster coordination, although its own replacement mechanism, KRaft, has been considered production-ready since 2022.

Another major advantage of Redpanda is its API compatibility with Kafka, which means it can be used as a drop-in replacement in most systems without changing the existing application code. This streamlined architecture makes Redpanda particularly interesting for developers and organizations that aim to implement real-time data streaming without the overhead of managing a complex ecosystem.

Flink, for its part, is an open-source framework for real-time data processing and analytics. One strength of Flink is its ability to process both unbounded and bounded data streams, which makes the framework useful for applications that need real-time analytics. Flink can be integrated with different data sources and sinks, including Redpanda, to enable real-time processing and analysis of streaming data. As Flink can provide accurate, stateful computations across data streams, it's a good option for developing comprehensive Customer 360 views, where real-time data processing and analytics are necessary to provide personalized customer experiences.

Building a Customer 360 view for a telecommunications company

In the following sections, you'll delve into the practical aspects of implementing a Customer 360 view for a telecommunications company. This includes configuring the necessary infrastructure with Docker and Redpanda for data streaming, employing Flink for real-time data processing, and utilizing Python for data production and simulation.

Prerequisites

To follow along, you'll need:

  • Python 3.11 or higher. The code is written in Python, so you'll need to have Python installed on your system.
  • A Python virtual environment created and activated. All Python-related commands should be run in this environment.
  • A recent version of Docker installed on your machine (this tutorial uses Docker 24.0.6).
  • Flink (this tutorial uses version 1.18.1). Download the zipped binary Flink package, then extract the contents of the downloaded package into your project directory. In this tutorial, the project directory is called customer_view_application.
  • OpenJDK installed (this tutorial uses version 11.0.8); you'll use this to program the Flink application in Java.
  • Maven downloaded and installed (this tutorial uses version 3.6.3); you'll use this to create a Java project for the Apache Flink application.

You'll also need the Python packages faker for generating fake data and confluent_kafka for producing this data to a Redpanda topic. To install them, open a terminal in your project directory and run the following command with the Python virtual environment activated:

pip install faker confluent-kafka

1. Set up Redpanda

Running Redpanda inside a Docker container offers a straightforward and efficient method for setting up the service. To do so, open a terminal and execute the following command:

sudo docker run -d --name redpanda-1 -p 9092:9092 redpandadata/redpanda start \
  --kafka-addr PLAINTEXT://0.0.0.0:9092 --advertise-kafka-addr PLAINTEXT://localhost:9092

This command will start the Redpanda service and prepare it for immediate use. The configuration flags specify that Redpanda listens on all network interfaces for Kafka protocol connections (PLAINTEXT://0.0.0.0:9092) and advertises itself to clients using the localhost address (PLAINTEXT://localhost:9092). This setup is particularly useful for development environments since it allows services on the same machine (in this case, Apache Flink) to produce data to and consume data from Redpanda.
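Before moving on, it can help to confirm that the published port is reachable from the host. The following is a minimal sketch using only Python's standard library. The helper name is_port_open is hypothetical, and the check only confirms that a TCP connection can be opened on localhost:9092, not that the Kafka protocol handshake succeeds.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures
        return False


if __name__ == "__main__":
    # Assumes the container started above publishes port 9092 on localhost
    print("Redpanda reachable:", is_port_open("localhost", 9092))
```

If this prints False, checking the -p 9092:9092 mapping and the container status with docker ps is a reasonable first step.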

To create a topic within the running Docker container, follow the instructions in the Redpanda documentation; rpk, the Redpanda CLI included in the container image, provides the rpk topic create command for this.

Make sure to name your topic user_documents.

2. Produce data to the Redpanda topic

In this section, you'll use a Python script to generate and store simulated user data relevant to the telecommunications industry in the previously created Redpanda topic user_documents. This step will create a mock environment that simulates real-world operations within a telecom company. To generate user data and produce it to Redpanda, create a file called produce_data.py in your project directory and paste the following code into it:

from faker import Faker
from confluent_kafka import Producer
import json
import random
from datetime import datetime, timedelta

# Initialize Faker for data generation
fake = Faker()

# Function to generate a fake user document
def generate_user_document():
    return {
        "name": fake.name(),
        "phone_number": fake.phone_number(),
        "service_start_date": (datetime.now() - timedelta(days=random.randint(1, 365))).strftime('%Y-%m-%d'),
        "current_balance_SMS": random.randint(0, 500),
        "data_bundles_MB": random.randint(0, 10000)
    }

# Function to generate and send data to Redpanda
def produce_fake_data(producer, topic_name, num_messages=100):
    for _ in range(num_messages):
        user_document = generate_user_document()
        # Convert the user document to a JSON string
        user_document_json = json.dumps(user_document)
        # Queue the message for delivery to Redpanda
        producer.produce(topic_name, value=user_document_json)
        # Serve delivery callbacks and free space in the local queue
        producer.poll(0)
    # Block until all queued messages have been delivered
    producer.flush()

if __name__ == "__main__":
    # Configuration for connecting to your Redpanda cluster
    conf = {'bootstrap.servers': 'localhost:9092'}
    
    # Create a Producer instance
    producer = Producer(**conf)
    
    # Name of the Redpanda topic
    topic_name = 'user_documents'
    
    # Generate and send the data
    produce_fake_data(producer, topic_name, 100)  # Change 100 to desired number of messages

In this script, the generate_user_document() function creates a dictionary representing a user document. Each document includes a randomly generated name, phone number, service start date, current SMS balance, and data bundle in MB. This function uses the Faker library to produce realistic names and phone numbers. The produce_fake_data() function iterates a specified number of times, each time generating a new user document, converting it into a JSON string, and sending this string to the user_documents topic in the Redpanda cluster. The Producer instance from confluent_kafka is configured with the address of the Redpanda broker. Upon execution, the script produces 100 user documents (this number can be adjusted) to the user_documents topic.

To run the script, open a terminal in the project directory where produce_data.py is saved and run the following command in the Python virtual environment:

python produce_data.py

Depending on your system's configuration, you might need to use python3 instead of python to run the script. This is common in environments where both Python 2 and Python 3 are installed.

To verify that the data was produced to the specified topic, you can use the rpk topic consume command inside the running Docker container to consume messages from the user_documents topic. To do so, open a terminal and run the following command:

docker exec -it redpanda-1 rpk topic consume user_documents --brokers localhost:9092

In the terminal, you should see an output similar to this:

{
  "topic": "user_documents",
  "value": "{\"name\": \"Christopher George\", \"phone_number\": \"+1-613-968-4215x92654\", \"service_start_date\": \"2023-12-14\", \"current_balance_SMS\": 287, \"data_bundles_MB\": 1484}",
  "timestamp": 1710579035649,
  "partition": 0,
  "offset": 0
}

… output omitted …

{
  "topic": "user_documents",
  "value": "{\"name\": \"Eric Jordan\", \"phone_number\": \"(698)295-7259x961\", \"service_start_date\": \"2023-12-13\", \"current_balance_SMS\": 346, \"data_bundles_MB\": 9079}",
  "timestamp": 1710579035653,
  "partition": 0,
  "offset": 99
}

The output snippet shows JSON representations of user documents as they are consumed from the user_documents topic. Each document reflects the randomly generated user profile, including details such as name, phone number, service start date, SMS balance, and data bundle size.
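Note that rpk prints each record as a JSON envelope whose value field holds the user document as an escaped JSON string, so reading it programmatically takes two decoding steps. The following sketch illustrates this with the first sample record above; the helper name unwrap_user_document is hypothetical.

```python
import json

# One record as printed by rpk topic consume, copied from the sample output above
RPK_RECORD = r'''
{
  "topic": "user_documents",
  "value": "{\"name\": \"Christopher George\", \"phone_number\": \"+1-613-968-4215x92654\", \"service_start_date\": \"2023-12-14\", \"current_balance_SMS\": 287, \"data_bundles_MB\": 1484}",
  "timestamp": 1710579035649,
  "partition": 0,
  "offset": 0
}
'''


def unwrap_user_document(record_text: str) -> dict:
    """Decode the rpk envelope, then decode the JSON string stored in its value field."""
    envelope = json.loads(record_text)
    return json.loads(envelope["value"])


document = unwrap_user_document(RPK_RECORD)
print(document["name"], document["current_balance_SMS"])
```

The Flink job in the following sections receives only the inner document, because the envelope is added by rpk when it prints a record.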

When you are finished, press Ctrl+C. This sends an interrupt signal to the rpk consumer process, terminating it and returning you to your system's command prompt.

3. Set up a Flink project

For the next part, you need to use Maven to create a Java project for the Apache Flink application. To generate a new Flink project, open a terminal in your project directory and run the following command:

mvn archetype:generate                               \
  -DarchetypeGroupId=org.apache.flink               \
  -DarchetypeArtifactId=flink-quickstart-java       \
  -DarchetypeVersion=1.18.1                         \
  -DgroupId=com.example                             \
  -DartifactId=customer360                          \
  -Dversion=1.0-SNAPSHOT                            \
  -Dpackage=com.example.customer360                 \
  -DinteractiveMode=false

This Maven command generates a new Apache Flink project based on the flink-quickstart-java archetype. It automatically sets up a project structure and includes essential dependencies for developing a Flink application in Java. The details provided (groupId, artifactId, version, and package) configure the project's metadata and base package name. Specifically, it creates a package named com.example.customer360 within a customer360 project, so the project structure needs no further setup before you start developing.

Your project should now have the following structure:

customer_view_application
├── customer360
│   ├── pom.xml
│   └── src
│       └── main
│           ├── java
│           │   └── com/example/customer360
│           └── resources
└── flink-1.18.1

Open pom.xml and add the following dependencies within the <dependencies> node:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.18.1</version> <!-- Use the Flink version you are running; this tutorial uses 1.18.1 -->
    <scope>provided</scope>
</dependency>

The flink-streaming-java dependency provides the core APIs necessary for building streaming applications in Java with Flink. The flink-connector-kafka dependency adds support for Apache Kafka, allowing the application to read from and write to Kafka topics (or Redpanda, in this case). Lastly, the flink-table-api-java-bridge dependency enables the use of Flink's Table API alongside the DataStream API, offering a more declarative way to define data transformations.

4. Run a Flink application that processes data from a Redpanda topic

Navigate to the folder customer360/src/main/java/com/example/customer360 and create a Java file called RedpandaToFlink.java inside it. Paste the following code into the file:

package com.example.customer360;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Row;

import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.Types;

public class RedpandaToFlink{

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Initialize input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Configure Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // Adjust for your Redpanda setup
        properties.setProperty("group.id", "flink_consumer");

        // Create a Kafka source (consumer)
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "user_documents", // Redpanda topic name
                new SimpleStringSchema(), // Deserialization schema
                properties);

        // Assign the source to the environment
        DataStream<String> stream = env.addSource(consumer);

        // Simple processing logic: just print each message (in real applications, replace this with your processing logic)
        stream.print();

        // Execute the Flink job
        env.execute("Flink Redpanda Consumer Example");
    }
}

This Java code snippet defines a basic Apache Flink application that consumes data from the previously created Redpanda topic user_documents. To do this, the code uses the Flink DataStream API. It establishes a streaming execution environment, configures a Kafka consumer with the necessary properties (including the bootstrap servers and consumer group ID), and subscribes to the user_documents topic. Data fetched from this topic is deserialized into strings using SimpleStringSchema. The application then simply prints each received message to the console. Later in the tutorial, you'll replace the printing logic with more sophisticated data analysis operations.

Before running this application, you must first package it. To do so, navigate to the customer360 directory, open a terminal, and execute the following Maven command:

mvn clean package

The command compiles the Java code and its dependencies into a JAR file in the target directory inside the customer360 directory. This makes it easy to deploy to the Flink cluster.

You now need to start Apache Flink. Navigate to the customer_view_application directory, open a terminal, and execute the command below:

./flink-1.18.1/bin/start-cluster.sh 

This command will start the Apache Flink cluster, and you should see an output in the same terminal that resembles the following:

Starting cluster.
Starting standalonesession daemon on host AWX-Lin-EN.
Starting taskexecutor daemon on host AWX-Lin-EN.

To instruct Flink to start the Java application, navigate to customer_view_application, open a terminal, and run the following command:

./flink-1.18.1/bin/flink run -c com.example.customer360.RedpandaToFlink customer360/target/customer360-1.0-SNAPSHOT.jar

By executing this command, the application connects to a Redpanda topic and consumes the data from the topic. To verify that the application works and that Flink consumes the data from the Redpanda topic, navigate to http://localhost:8081 in your web browser to access the Flink dashboard. In the dashboard, navigate to the Task Managers section and open the Stdout tab to see the output of the application—in this case, simple printing of the data consumed from the Redpanda topic that was defined in the code:

[Image: The consumed messages displayed in the dashboard]

When running the Flink application that consumes data from the Redpanda topic, it's important to have the "Stdout" tab open in the Flink dashboard before producing new data. If the data production script (produce_data.py) is run before you open the "Stdout" tab, you might not see the output of consumed messages. This is because the Flink job might have already processed the messages by the time you check the output.

To ensure you see the live data consumption:

  • Start the Flink application and ensure it's running by checking the Flink dashboard.
  • Open the "Stdout" tab under the Task Managers section in the Flink dashboard to prepare for viewing the output.
  • Run the produce_data.py script again to produce fresh data while the "Stdout" tab is open. This will allow you to see the newly consumed data appear in real time.
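The first step in the list above can also be checked from a script, since the address that serves the dashboard exposes Flink's REST API as well. The sketch below assumes the /jobs/overview endpoint and its jobs, name, and state fields. The helper names are hypothetical, and the sample payload is an abbreviated, made-up response used for illustration only.

```python
import json
from urllib.request import urlopen


def fetch_jobs_overview(base_url: str = "http://localhost:8081") -> dict:
    """Fetch the job overview from the Flink REST API (requires a running cluster)."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as response:
        return json.load(response)


def running_job_names(overview: dict) -> list:
    """Return the names of all jobs whose state is RUNNING."""
    return [job["name"] for job in overview.get("jobs", []) if job.get("state") == "RUNNING"]


# Hypothetical, abbreviated response used for illustration only
sample_overview = {
    "jobs": [
        {"jid": "a1", "name": "Flink Redpanda Consumer Example", "state": "RUNNING"},
        {"jid": "b2", "name": "Flink Redpanda Consumer Example", "state": "CANCELED"},
    ]
}
print(running_job_names(sample_overview))
```

With the cluster running, running_job_names(fetch_jobs_overview()) would list the jobs that are currently active.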

5. Implement data analysis using Flink's Table API

In the next step, you'll use Apache Flink's Table API to perform more sophisticated processing and analysis of the streaming data. For this, you'll need to make some changes in the existing RedpandaToFlink.java file. In the file, rather than directly invoking the stream.print() method following the creation of the DataStream, include the following code snippet instead:

... code omitted ...

        // Assign the source to the environment
        DataStream<String> stream = env.addSource(consumer);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Row> parsedStream = stream
            .map(value -> {
                // Remove leading and trailing curly braces and split by comma to get each key-value pair
                String[] parts = value.substring(1, value.length() - 1).split(",");

                // Extract each field from the parts
                // Assuming your JSON structure does not contain nested JSON objects or arrays
                // and does not include commas within the strings
                String name = parts[0].split(":")[1].replace("\"", "").trim();
                String phoneNumber = parts[1].split(":")[1].replace("\"", "").trim();
                String serviceStartDate = parts[2].split(":")[1].replace("\"", "").trim();
                int currentBalanceSMS = Integer.parseInt(parts[3].split(":")[1].trim());
                int dataBundlesMB = Integer.parseInt(parts[4].split(":")[1].trim());

                // Create a Row of objects
                return Row.of(name, phoneNumber, serviceStartDate, currentBalanceSMS, dataBundlesMB);
            })
            .returns(Types.ROW_NAMED(
                new String[]{"name", "phoneNumber", "serviceStartDate", "currentBalanceSMS", "dataBundlesMB"},
                Types.STRING, Types.STRING, Types.STRING, Types.INT, Types.INT
            )); // Specify the types of the fields

        // Define a Schema that matches your Row data structure
        Schema schema = Schema.newBuilder()
            .column("name", DataTypes.STRING())
            .column("phoneNumber", DataTypes.STRING())
            .column("serviceStartDate", DataTypes.STRING())
            .column("currentBalanceSMS", DataTypes.INT())
            .column("dataBundlesMB", DataTypes.INT())
            .build();

        // Use the Schema when creating a view
        tableEnv.createTemporaryView("user_documents", parsedStream, schema);

        // Now you can perform SQL operations on your table
        Table result = tableEnv.sqlQuery(
            "SELECT name, MAX(currentBalanceSMS) AS maxSmsBalance " +
            "FROM user_documents " +
            "GROUP BY name " +
            "ORDER BY maxSmsBalance DESC " +
            "LIMIT 10"
        );

        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);
        retractStream.print();

        env.execute("Flink Redpanda Consumer");
    }
}

This updated version of the Java code expands on the previous simple message-printing logic by incorporating Flink's Table API for more sophisticated stream processing. The code now transforms the incoming string messages into a structured format. This transformation involves parsing each message into separate fields (name, phone number, service start date, current balance SMS, and data bundles in MB) and creating a Row object for each message. The code uses Flink's Table API to create a StreamTableEnvironment, which bridges the DataStream API and the Table API and allows for SQL-like operations on streaming data. The code defines a schema that matches the structured data in the stream to create a temporary view called user_documents that represents the incoming data as a table.
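The split-based parsing relies on the assumption stated in the code comments: no field value may contain a comma. The following Python sketch ports that logic to show where it breaks and how a real JSON parser avoids the problem. The function names and the second sample message are hypothetical. In the Java job, a JSON library would play the role that json.loads plays here.

```python
import json


def naive_parse(message: str) -> list:
    """Python port of the split-based parsing used in the Java snippet above."""
    parts = message[1:-1].split(",")
    name = parts[0].split(":")[1].replace('"', "").strip()
    phone_number = parts[1].split(":")[1].replace('"', "").strip()
    service_start_date = parts[2].split(":")[1].replace('"', "").strip()
    current_balance_sms = int(parts[3].split(":")[1].strip())
    data_bundles_mb = int(parts[4].split(":")[1].strip())
    return [name, phone_number, service_start_date, current_balance_sms, data_bundles_mb]


def json_parse(message: str) -> list:
    """Parse the same message with a real JSON parser."""
    doc = json.loads(message)
    return [doc["name"], doc["phone_number"], doc["service_start_date"],
            doc["current_balance_SMS"], doc["data_bundles_MB"]]


# Message taken from the sample consumer output earlier in this tutorial
regular = ('{"name": "Eric Jordan", "phone_number": "(698)295-7259x961", '
           '"service_start_date": "2023-12-13", "current_balance_SMS": 346, "data_bundles_MB": 9079}')
# Hypothetical message whose name contains a comma
tricky = ('{"name": "Jordan, Eric", "phone_number": "(698)295-7259x961", '
          '"service_start_date": "2023-12-13", "current_balance_SMS": 346, "data_bundles_MB": 9079}')

print(naive_parse(regular) == json_parse(regular))
try:
    naive_parse(tricky)
except (IndexError, ValueError) as error:
    print("naive parsing failed:", type(error).__name__)
print(json_parse(tricky)[0])
```

Both parsers agree on the regular message, but only the JSON parser handles the comma inside the name.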

Subsequently, the code executes an SQL query over this table, selecting the top ten users by their maximum SMS balance. The query results are printed out, similar to the execution in the last section, but now after a transformation and aggregation step.
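To make the semantics of the query concrete, here is a batch sketch in plain Python of what it computes over a fixed set of documents. Flink evaluates the same logic continuously as new records arrive. The function name top_sms_balances and the sample documents are made up for illustration.

```python
from typing import Dict, Iterable, List, Tuple


def top_sms_balances(documents: Iterable[dict], limit: int = 10) -> List[Tuple[str, int]]:
    """Batch equivalent of the GROUP BY name / MAX / ORDER BY DESC / LIMIT query."""
    max_by_name: Dict[str, int] = {}
    for doc in documents:
        name = doc["name"]
        balance = doc["current_balance_SMS"]
        # GROUP BY name with MAX(currentBalanceSMS)
        if name not in max_by_name or balance > max_by_name[name]:
            max_by_name[name] = balance
    # ORDER BY maxSmsBalance DESC, then LIMIT
    ranked = sorted(max_by_name.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


# Made-up sample documents, for illustration only
sample = [
    {"name": "Ana", "current_balance_SMS": 120},
    {"name": "Ben", "current_balance_SMS": 480},
    {"name": "Ana", "current_balance_SMS": 300},
    {"name": "Cy", "current_balance_SMS": 50},
]
print(top_sms_balances(sample, limit=2))  # [('Ben', 480), ('Ana', 300)]
```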

As you did in the last section, package the code by executing the following Maven command in the terminal in the customer360 directory:

mvn clean package

Before running a new or updated Flink application, it's advisable to first stop any existing jobs that might conflict with the new deployment. Press Ctrl+C in the terminal where the previous job was submitted; if the job still shows as running in the Flink dashboard, cancel it there as well. Following this, open a terminal in the customer_view_application directory and instruct Flink to start the application:

./flink-1.18.1/bin/flink run -c com.example.customer360.RedpandaToFlink customer360/target/customer360-1.0-SNAPSHOT.jar

Navigate to http://localhost:8081 in your web browser to access the Flink dashboard. In the dashboard, navigate to the Task Managers section and open the Stdout tab to see the new output of the application after the transformation and aggregation step:

[Image: Results of data analysis]

With this tab already open, run the python produce_data.py command again to generate and send new data.

The output shows the result, where each record is emitted as a tuple with two parts. The first is a Boolean flag, where true indicates an "insert" operation (or a new record being added to the result set) and false indicates a "delete" operation (or an existing record being removed from the result set). The second is a row object, indicated by +I for inserts and -D for deletes, followed by the actual data. For example, (true,+I[Name, Number]) indicates a new result has been added to the output. The +I signifies inserted data, with the name and number showing the actual data inserted. The process of insertions and deletions allows the stream to dynamically update the result set in response to changes in the underlying data or query conditions. This is particularly useful for continuous queries on dynamic tables where the result set changes over time.
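One way to read such output is to replay the changes in order and keep track of which rows are currently part of the result. The following is a minimal sketch of that idea in plain Python. The materialize helper and the sample changes are hypothetical, and the rows are simplified to (name, maxSmsBalance) tuples.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# (flag, (name, maxSmsBalance)): True adds a row, False retracts it
Change = Tuple[bool, Tuple[str, int]]


def materialize(changes: Iterable[Change]) -> List[Tuple[str, int]]:
    """Replay a retract stream and return the rows currently in the result set."""
    rows: Counter = Counter()
    for is_insert, row in changes:
        if is_insert:
            rows[row] += 1  # true: the row is added to the result
        else:
            rows[row] -= 1  # false: a previously emitted row is withdrawn
    current = [row for row, count in rows.items() if count > 0]
    return sorted(current, key=lambda r: r[1], reverse=True)


# Made-up changes: Ana's maximum rises from 120 to 300, so the old row is retracted
changes = [
    (True, ("Ana", 120)),
    (True, ("Ben", 480)),
    (False, ("Ana", 120)),
    (True, ("Ana", 300)),
]
print(materialize(changes))  # [('Ben', 480), ('Ana', 300)]
```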

Next steps

So far, you've used Flink's Table API to consume data from a single source (a Redpanda topic) and to get basic SQL query results from the streamed data. The next logical step in implementing a 360-degree customer view would be to aggregate more query results to compile a complete profile for each customer. This can involve combining data from different sources and channels to create a single, unified view of customer interactions, preferences, and history. These aggregated results could identify key customer segments, preferences, and potential opportunities for personalized marketing or service improvements. The aggregated customer views could be stored in a database or data warehouse that supports fast querying and data analysis.

Consider an example where you're aggregating various customer data points. Suppose you have a table called CustomerData that includes fields like customer_id, interaction_date, transaction_amount, service_used, and data_consumed. To create a comprehensive view of each customer's average transaction amount, average data usage, and number of services used, you could use the following SQL query:

SELECT 
    customer_id,
    AVG(transaction_amount) AS average_transaction,
    AVG(data_consumed) AS average_data_usage,
    COUNT(DISTINCT service_used) AS number_of_services_used
FROM 
    CustomerData
GROUP BY 
    customer_id

Imagine running the above query on your stream of customer data. The output might look like this:

customer_id   average_transaction   average_data_usage   number_of_services_used
12345         85.75                 2.5 GB               3
67890         150.20                4.1 GB               4
54321         65.00                 1.0 GB               1
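To experiment with this kind of aggregation before wiring it into a streaming job, you could run the query against a small in-memory table. The sketch below uses Python's built-in sqlite3 module as a batch stand-in for Flink SQL. The rows are made up, data_consumed is assumed to be in GB, and an ORDER BY clause is added only to keep the output order stable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE CustomerData ("
    "customer_id INTEGER, interaction_date TEXT, transaction_amount REAL, "
    "service_used TEXT, data_consumed REAL)"
)

# Made-up interactions, for illustration only (data_consumed assumed to be in GB)
rows = [
    (12345, "2024-09-01", 80.0, "voice", 2.0),
    (12345, "2024-09-02", 90.0, "data", 3.0),
    (54321, "2024-09-01", 65.0, "sms", 1.0),
]
conn.executemany("INSERT INTO CustomerData VALUES (?, ?, ?, ?, ?)", rows)

query = """
SELECT
    customer_id,
    AVG(transaction_amount) AS average_transaction,
    AVG(data_consumed) AS average_data_usage,
    COUNT(DISTINCT service_used) AS number_of_services_used
FROM CustomerData
GROUP BY customer_id
ORDER BY customer_id
"""
results = conn.execute(query).fetchall()
for row in results:
    print(row)
conn.close()
```

In a streaming setting the same query would produce a continuously updating result, as described for the retract stream earlier.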

The aggregated data, while insightful on its own, can go even further if you integrate it with BI tools like Tableau or Power BI for more dynamic visualizations and dashboards.

Conclusion

For more insights and a practical implementation of this solution, visit the GitHub repository, where you can find the complete code and setup instructions.

In this tutorial, you learned how to implement a real-time Customer 360 view using Flink and Redpanda. This approach allows for the ingestion, processing, and analysis of streaming data to enhance customer understanding, predict behavior, personalize experiences, and engage more effectively. As a result, a real-time Customer 360 view should ultimately lead to improved customer satisfaction and service personalization.

To explore Redpanda and how it can make your job easier, check the documentation and browse the Redpanda blog for use cases and tutorials. If you have questions or want to chat with the team, join the Redpanda Community on Slack.
