Real-time website activity tracking with Apache Kafka®
Real-time tracking gives developers and DevOps engineers continuously updated data on visitor activities, such as page views, clicks, form submissions, and user locations. This data is essential for analyzing user behavior, optimizing website performance, and reacting quickly to trends or issues. Real-time tracking is a versatile technique with countless use cases, including real-time user behavior analysis, fraud detection and prevention, and personalized content delivery.
If you have an e-commerce site, real-time tracking can help you determine popular products not just by sales but also by visit frequency. It can also uncover insights about customer behavior and product performance.
In this article, you’ll learn how to implement real-time website activity tracking using Apache Kafka and Apicurio.
Implementing real-time user behavior tracking
This tutorial focuses on monitoring user behavior on a shopping website with minimal impact on performance. Let’s say you’re the owner of an e-commerce website. Your goal is to gain insights into user behavior by monitoring which products attract the most attention and which ones are most often added to shopping carts. To get the required data, you’ll implement a tracking script that captures page views and product interactions.
You’ll set up a system where each time a user opens a page on your site, the URL and request parameters are sent to the backend. There, you’ll extract the relevant data and write it into an Apache Kafka topic, which ensures that the tracking process has minimal impact on your site’s performance. You’ll then use a schema registry to minimize the data transfer overhead compared to sending complete JSON data.
Once the data is collected, you’ll aggregate it and determine the most popular products. This way, every time a user opens a page, the system has the most popular products ready to be rendered.

Prerequisites
Before you begin this tutorial, make sure you have the following tools installed and configured on your machine:
- Docker to run all required tools out of the box
- Git to be able to clone repositories
- An AWS account (a free tier is available for personal use)
- An S3 bucket (you can create one in the AWS console)
You can access the starting point of the project on GitHub. By using the Docker setup provided in the repo, all the required tools, including Kafka and Apicurio, are automatically configured for you.
Getting started with the base application
In this tutorial, you’ll work with a prebuilt e-commerce application made with Spring Boot. This webshop includes essential features such as product listing, search filtering, detailed product views, and shopping cart management.
Clone the repository to get the starting point and follow along with the tutorial:
git clone --branch starting-point --single-branch https://github.com/redpanda-data-blog/website-activity-tracking-kafka.git
Use the included Docker Compose configuration to start the setup:
docker compose up -d
This will download and start all necessary containers in the background, allowing you to begin working with the application immediately. Once started, you can access the webshop at http://localhost:8080/list.
Creating Kafka topics
Before streaming data, you need to create two Kafka topics, which serve as the primary structure for organizing message streams in Kafka:
- The product-interactions topic will store user interactions with products, such as page views and cart additions.
- The product-scores topic will store the aggregated popularity scores for products.
To create these topics, you first need to connect to the Kafka container:
docker exec -it kafka bash
Use the following code to create the product-interactions
topic:
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic product-interactions
If successful, you should see this output:
Created topic product-interactions.
This code will create the product-scores
topic:
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic product-scores
Here’s the expected output:
Created topic product-scores.
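If you’d rather create the topics from code than from the CLI, Kafka’s AdminClient can do the same thing. The following is a minimal sketch (not part of the tutorial project), assuming the broker is reachable from your machine at localhost:9092:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and a replication factor of 1, matching the CLI commands above
            admin.createTopics(List.of(
                    new NewTopic("product-interactions", 1, (short) 1),
                    new NewTopic("product-scores", 1, (short) 1)
            )).all().get(); // block until both topics exist
        }
    }
}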
Implementing the Kafka producer
To start streaming data to your Kafka topic, you’ll need to implement a Kafka producer that’s responsible for sending events to Kafka.
The implementation consists of four main components (as Java classes):
- KafkaProducerConfig
- KafkaProducerService
- TrackingEvent
- UserBehaviorController
1. KafkaProducerConfig
Create a file named KafkaProducerConfig.java
in the service/configuration
directory with the following content:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
The KafkaProducerConfig
class is responsible for setting up the basic configuration for your Kafka producer. It defines two essential components:
- ProducerFactory, which handles the creation of producer instances
- KafkaTemplate, which provides high-level operations for sending messages
You configure your producer in ProducerFactory
. In this configuration class, the provided options are:
- BOOTSTRAP_SERVERS_CONFIG, which defines the location of the Kafka bootstrap server
- KEY_SERIALIZER_CLASS_CONFIG, which defines the class that will be in charge of serializing the key
- VALUE_SERIALIZER_CLASS_CONFIG, which defines the class that will be in charge of serializing the value
2. KafkaProducerService
KafkaProducerService
is your main producer class that handles the actual sending of messages. Create a file named KafkaProducerService.java
in the service
directory with the following content:
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "product-interactions";
@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendEvent(String key, String message) {
kafkaTemplate.send(TOPIC, key, message);
System.out.println("Sent event: Key = " + key + ", Message = " + message);
}
}
The class provides a simple way to send messages to Kafka using the KafkaTemplate
. The sendEvent
method sends messages to a Kafka topic asynchronously. It doesn’t wait for the message to be fully processed before continuing, which helps keep the application running smoothly without delays.
Currently, the service sends simple string messages. Later in the tutorial, you’ll change it to handle serialized objects.
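If you want visibility into delivery failures without blocking the request thread, you can attach a callback to the future returned by send(). Here’s a minimal sketch of what sendEvent could look like, assuming Spring for Apache Kafka 3.x, where send() returns a CompletableFuture (older versions return a ListenableFuture instead):
public void sendEvent(String key, String message) {
    kafkaTemplate.send(TOPIC, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    // Delivery failed; log it without interrupting the request flow
                    System.err.println("Failed to send event with key " + key + ": " + ex.getMessage());
                } else {
                    // RecordMetadata tells you where the message landed
                    System.out.println("Sent event: Key = " + key
                            + ", Partition = " + result.getRecordMetadata().partition()
                            + ", Offset = " + result.getRecordMetadata().offset());
                }
            });
}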
3. TrackingEvent
The TrackingEvent
class represents the event that will be sent from your webshop to the backend. It will also serve as the primary class to be serialized and transmitted to Kafka as a JSON object. Create a file named TrackingEvent.java
in the model
directory with the following content:
package com.example.shop.model;
public class TrackingEvent {
private String productId;
private Integer quantity;
private Integer weight;
// Getters and setters
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public Integer getQuantity() {
return quantity;
}
public void setQuantity(Integer quantity) {
this.quantity = quantity;
}
public Integer getWeight() {
return weight;
}
public void setWeight(Integer weight) {
this.weight = weight;
}
}
4. UserBehaviorController
To capture events generated by your webshop and forward them to Kafka, you’ll need to create a controller. Create a file named UserBehaviorController.java
in the controller
directory and add the following content:
@RestController
@RequestMapping("/api")
public class UserBehaviorController {
private final KafkaProducerService kafkaProducerService;
public UserBehaviorController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@Operation(summary = "Track user behavior on the website",
description = "Receives pre-parsed tracking data and sends it to Kafka.")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Event successfully tracked"),
@ApiResponse(responseCode = "400", description = "Invalid request payload")
})
@PostMapping("/track")
public ResponseEntity<Void> trackUserBehavior(@RequestBody TrackingEvent trackingEvent) {
if (trackingEvent.getProductId() == null || trackingEvent.getQuantity() == null || trackingEvent.getWeight() == null) {
return ResponseEntity.badRequest().build();
}
for (int i = 0; i < trackingEvent.getQuantity(); i++) {
kafkaProducerService.sendEvent("product-interaction", trackingEvent.getProductId() + ";" + trackingEvent.getWeight());
}
return ResponseEntity.ok().build();
}
}
This controller provides a single REST endpoint that receives tracking events. The /track endpoint accepts product interactions, and input validation ensures that the required data (product ID, quantity, and weight) is present. The kafkaProducerService then sends the events to Kafka.
Testing the event flow
With the application now capable of producing events, it’s time to test its functionality. Start by initializing your setup with the command docker compose up -d
, then navigate to http://localhost:8080/swagger-ui/index.html#/user-behavior-controller/trackUserBehavior. This interface allows you to send an event to the application.
For example, you can send the following event payload:
{
"productId": "4",
"quantity": 3,
"weight": 5
}
Next, connect to the Kafka container and execute the following command to monitor the messages being sent to the product-interactions
topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic product-interactions --from-beginning
This command will display all messages in the topic. You should see an output similar to the following:
…output omitted…
4;5
4;5
4;5
…output omitted…
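In addition to this manual check, you can cover the endpoint with a small automated web-slice test. This is only a sketch, assuming spring-boot-starter-test is on the classpath and the test sits under src/test/java in the controller’s package; the test class itself is hypothetical and not part of the tutorial project:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

// Imports for UserBehaviorController and KafkaProducerService omitted; adjust to your package layout
@WebMvcTest(UserBehaviorController.class)
class UserBehaviorControllerTest {

    @Autowired
    private MockMvc mockMvc;

    // Mocked so no real Kafka broker is needed for the test
    @MockBean
    private KafkaProducerService kafkaProducerService;

    @Test
    void trackReturnsOkForValidPayload() throws Exception {
        mockMvc.perform(post("/api/track")
                        .contentType(MediaType.APPLICATION_JSON)
                        .content("{\"productId\":\"4\",\"quantity\":3,\"weight\":5}"))
                .andExpect(status().isOk());
    }

    @Test
    void trackRejectsPayloadWithoutProductId() throws Exception {
        mockMvc.perform(post("/api/track")
                        .contentType(MediaType.APPLICATION_JSON)
                        .content("{\"quantity\":3,\"weight\":5}"))
                .andExpect(status().isBadRequest());
    }
}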
Implementing the tracking script
Now that both the producer and the related RESTful service are ready, you can set up the frontend tracking functionality. The challenge is to collect and send data without impacting the website’s performance, which you can solve with a dedicated tracking script.
Create a file named trackingScript.js
in the resources/static/assets
directory with the following content:
// Tracking script to run on page load
(function () {
function extractTrackingData(page) {
let payload = { weight: null, productId: null, quantity: null };
if (page.startsWith("/detail/")) {
// Extract productId from the product detail page URL
payload.productId = page.split("/detail/")[1];
payload.weight = 1;
payload.quantity = 1;
} else if (page.startsWith("/shopping-cart")) {
// Extract query parameters from the shopping cart page URL
const urlParams = new URLSearchParams(page.split("?")[1]);
payload.productId = urlParams.get("addProduct");
payload.quantity = urlParams.has("quantity") ? parseInt(urlParams.get("quantity"), 10) : 1;
payload.weight = 2;
}
return payload;
}
function sendTrackingData(payload) {
if (!payload) return;
fetch("/api/track", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload)
})
.then(response => {
if (!response.ok) {
console.error("Failed to track user behavior:", response.statusText);
}
})
.catch(error => console.error("Error sending tracking data:", error));
}
// Extract current page and send tracking data
const currentPage = window.location.pathname + window.location.search;
const trackingData = extractTrackingData(currentPage);
if (trackingData.productId) {
sendTrackingData(trackingData);
}
})();
This script captures information from the current web page, such as its URL path and query string, to extract relevant tracking data. The extractTrackingData function processes the page details, identifying the product ID and quantity and assigning a weight based on the page type (1 for a product detail view, 2 for a shopping cart addition). The extracted data is then sent to the server through the sendTrackingData function, which makes a POST request to the /api/track endpoint.
While the script’s functionality is straightforward, it’s sufficient to track user interactions with products across the website.
To enable the tracking script, you must include it in the <head>
section of your HTML templates. Locate the files product-detail.html
, product-list.html
, and shopping-cart.html
in the resources/templates
directory. Add the following line to the end of the <head>
section in each file:
<script src="/assets/trackingScript.js"></script>
Connecting Apicurio Registry
With your events flowing into Kafka, the next step is to optimize data handling by storing your message schemas in a central registry. This reduces overhead because messages only carry the actual data rather than verbose, self-describing payloads. A schema registry also enables schema evolution, which allows you to modify schemas over time while maintaining compatibility with data produced under earlier versions.
For this implementation, you’ll use the Apicurio schema registry to manage your schemas. The registry will run on http://localhost:8081/
when started through Docker Compose.
Configuration changes
To integrate your example app with Apicurio Registry, you need to update the KafkaProducerConfig class in service/configuration/KafkaProducerConfig.java. Configure JsonSchemaKafkaSerializer from io.apicurio.registry.serde.jsonschema as the value serializer by setting the VALUE_SERIALIZER_CLASS_CONFIG property to JsonSchemaKafkaSerializer.class:
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class);
You also need to specify the schema registry endpoint so your producer knows where to get the schemas from. Add the following line to the producerFactory
function, right before the return
statement:
configProps.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080");
Adapting the Kafka template and producer service
You also need to modify the producerFactory()
function to return a ProducerFactory
that can handle TrackingEvent
objects. In KafkaProducerConfig
, update the function to return a ProducerFactory<String, TrackingEvent>
object:
…code omitted…
public ProducerFactory<String, TrackingEvent> producerFactory() {
…code omitted…
Similarly, change the kafkaTemplate()
function so it returns a KafkaTemplate<String, TrackingEvent>
object.
Together with the configuration updates applied, your KafkaProducerConfig
class should now look as follows:
package com.example.shop.service.configuration;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import com.example.shop.model.TrackingEvent;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, TrackingEvent> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class); // Using Apicurio's serializer
configProps.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080"); // Apicurio Schema Registry URL
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, TrackingEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
You need to change the KafkaProducerService class, located in the service/KafkaProducerService.java file, to work with the new template. Update the type of the kafkaTemplate field, the constructor argument, and the sendEvent message parameter to use TrackingEvent. Your class should now look like this:
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, TrackingEvent> kafkaTemplate;
private static final String TOPIC = "product-interactions";
@Autowired
public KafkaProducerService(KafkaTemplate<String, TrackingEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendEvent(String key, TrackingEvent message) {
kafkaTemplate.send(TOPIC, key, message);
}
}
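Because sendEvent now accepts a TrackingEvent rather than a String, update the call in UserBehaviorController as well. Since the Kafka Streams aggregation you’ll build later multiplies quantity by weight for each event, a single send per request is enough; a minimal sketch of the updated endpoint method:
// Swagger annotations from the original method omitted for brevity
@PostMapping("/track")
public ResponseEntity<Void> trackUserBehavior(@RequestBody TrackingEvent trackingEvent) {
    if (trackingEvent.getProductId() == null || trackingEvent.getQuantity() == null || trackingEvent.getWeight() == null) {
        return ResponseEntity.badRequest().build();
    }
    // Send the whole event once; quantity and weight travel inside the serialized object
    kafkaProducerService.sendEvent("product-interaction", trackingEvent);
    return ResponseEntity.ok().build();
}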
Making TrackingEvent serializable
To ensure compatibility with Kafka messages, make the TrackingEvent
model serializable using the jackson-databind
library. Add @JsonCreator
and @JsonProperty
annotations to the class constructor. After these changes, the class should look like this:
package com.example.shop.model;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class TrackingEvent {
private String productId;
private Integer quantity;
private Integer weight;
@JsonCreator
public TrackingEvent(
@JsonProperty("productId") String productId,
@JsonProperty("quantity") int quantity,
@JsonProperty("weight") int weight
) {
this.productId = productId;
this.quantity = quantity;
this.weight = weight;
}
// Getters and setters
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public Integer getQuantity() {
return quantity;
}
public void setQuantity(Integer quantity) {
this.quantity = quantity;
}
public Integer getWeight() {
return weight;
}
public void setWeight(Integer weight) {
this.weight = weight;
}
}
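As a quick sanity check, you can serialize a TrackingEvent with Jackson and inspect the JSON that will have to validate against the schema you register in the next step. A throwaway sketch:
import com.example.shop.model.TrackingEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TrackingEventJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        TrackingEvent event = new TrackingEvent("4", 3, 5);
        // Should print a JSON object with the productId, quantity, and weight fields
        System.out.println(mapper.writeValueAsString(event));
    }
}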
Registering schemas via the Apicurio UI
The easiest way to register schemas is through the Apicurio UI, which you can access at http://localhost:8888/explore. You should see the dashboard:

For the product-interactions topic, you need to create a new artifact with the ID product-interactions-value:

Note: The schema ID must follow a specific naming convention. Since you are creating a schema for the value of the product-interactions topic, the ID should be product-interactions-value.
This artifact will use the following JSON schema:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ProductInteractions",
"type": "object",
"properties": {
"productId": {
"type": "string"
},
"quantity": {
"type": "integer"
},
"weight": {
"type": "integer"
}
},
"required": ["productId", "quantity", "weight"]
}
After defining the schema, check that there are no errors and that all properties and required fields are accurate:

Similarly, for the product-scores topic, create another schema with the ID product-scores-value:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ProductScores",
"type": "object",
"properties": {
"score": {
"type": "integer"
}
},
"required": ["score"]
}
You’ll only need the score; the productId
will be the key.
With these configurations in place, your application is now integrated with Apicurio Registry, which allows you to manage your schemas and use its evolution capabilities.
Implementing Kafka Streams
To display the most popular products, you’ll process and aggregate tracking data in real time using a Kafka Streams application within your Java application. This involves setting up two classes: one for the configuration and another for the service handling the Kafka Streams logic.
KafkaStreamsConfig
This class defines the configuration for the Kafka Streams application. Create a file named KafkaStreamsConfig.java
in the service/configuration
directory with the following code:
@Configuration
public class KafkaStreamsConfig {
@Bean
public Properties kafkaStreamsProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-scores-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSchemaSerde.class.getName());
props.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080");
props.put(SerdeConfig.FIND_LATEST_ARTIFACT, true);
return props;
}
}
The unique identifier for your Streams application will be product-scores-aggregator
, and you’ll once again use Apicurio for schema management.
KafkaStreamsService
Create a file named KafkaStreamsService.java
in the service
directory with the following content, which will set up the Kafka Streams pipeline:
@Service
public class KafkaStreamsService {
private final Properties streamsConfig;
public KafkaStreamsService(@Qualifier("kafkaStreamsProperties") Properties streamsConfig) {
this.streamsConfig = streamsConfig;
initializeStream();
}
private void initializeStream() {
StreamsBuilder builder = new StreamsBuilder();
// Configure Serde for JSON using Apicurio's Serde
JsonSchemaSerde<TrackingEvent> trackingEventSerde = new JsonSchemaSerde<>();
trackingEventSerde.configure(serdeConfig(), false); // false for value serde
// Step 1: Read from "product-interactions" topic
KStream<String, TrackingEvent> trackingEventStream = builder.stream(
"product-interactions",
Consumed.with(Serdes.String(), trackingEventSerde)
);
// Step 2: Process and aggregate data
trackingEventStream
.groupBy((key, event) -> event.getProductId())
.aggregate(
() -> 0, // Initializer
(key, event, aggregate) -> aggregate + event.getQuantity() * event.getWeight(),
Materialized.with(Serdes.String(), Serdes.Integer())
)
.toStream()
.mapValues(aggregate -> {
return new ProductScore(aggregate);
})
.to("product-scores", Produced.with(Serdes.String(), createProductScoreSerde())); // Explicitly define ProductScore type
// Build and start the stream
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private JsonSchemaSerde<ProductScore> createProductScoreSerde() {
JsonSchemaSerde<ProductScore> productScoreSerde = new JsonSchemaSerde<>();
productScoreSerde.configure(serdeConfig(), false); // configure with the schema registry URL
return productScoreSerde;
}
private Map<String, Object> serdeConfig() {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080/");
return config;
}
}
The configuration from KafkaStreamsConfig is autowired, and initializeStream() sets up the stream topology. Events from the product-interactions topic are read as a KStream, then grouped by productId. The data is then aggregated using an initializer (0) and an aggregation function that adds quantity * weight for each event. For example, a product detail view (weight 1, quantity 1) adds 1 to a product’s score, while an event for three units added to the cart (weight 2, quantity 3) adds 6. The aggregated results are converted back into a KStream and mapped to ProductScore objects. The processed scores are then written to the product-scores topic.
However, the code still won’t work because Kafka Streams creates intermediate topics for processing, which require schemas to be registered in the schema registry. To resolve this, create a new artifact in the Apicurio UI with the artifact ID product-scores-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-value
and the following schema content:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"productId": { "type": "string" },
"quantity": { "type": "integer" },
"weight": { "type": "integer" }
},
"required": ["productId", "quantity", "weight"]
}
While the artifact ID might seem unusual, it follows the same naming convention: it’s the name of the repartition topic that Kafka Streams creates internally, suffixed with -value.
Once the application is started, it will continuously process events from the product-interactions
topic, compute aggregated scores, and write them to the product-scores
topic in real time.
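The topology above wraps each aggregate in a ProductScore object. If that model doesn’t exist in your project yet, a minimal sketch in the model directory, consistent with the product-scores-value schema (a single integer score) and annotated like TrackingEvent, could look like this:
package com.example.shop.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ProductScore {
    private Integer score;

    @JsonCreator
    public ProductScore(@JsonProperty("score") int score) {
        this.score = score;
    }

    public Integer getScore() {
        return score;
    }

    public void setScore(Integer score) {
        this.score = score;
    }
}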
Creating the consumer and displaying the top products
The final step in your real-time tracking system is to implement the consumer that will read from the product-scores
topic and display the top five products to users. The consumer implementation includes three main components: the consumer configuration, the service layer, and the ProductScore
model.
Create a file named KafkaConsumerConfig.java
in the service/configuration
directory with the following content to create the consumer configuration:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public DefaultKafkaConsumerFactory<String, ProductScore> consumerFactory() {
Random rand = new Random();
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class); // Using Apicurio's deserializer
configProps.put("value.class.name", ProductScore.class.getName());
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "product-scores-consumer-group-" + rand.nextInt(1000)); // Random suffix so each application start joins a fresh group and re-reads the topic
configProps.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080"); // Apicurio Schema Registry URL
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from the beginning
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ProductScore> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ProductScore> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
This configuration is similar to the producer’s, but it includes a new option, AUTO_OFFSET_RESET_CONFIG:
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
This setting tells the consumer where it starts reading messages when it first connects to a topic. By setting it to earliest
, you ensure that the consumer processes all events from the beginning of the topic’s history, preventing any loss of important information.
Next, create KafkaConsumerService
in the service
directory:
@Service
public class KafkaConsumerService {
private final Map<String, Integer> productScores = new HashMap<>();
@KafkaListener(topics = "product-scores", containerFactory = "kafkaListenerContainerFactory")
public void consumeEvent(ConsumerRecord<String, ProductScore> record) {
String productId = record.key();
ProductScore productScore = record.value();
if(productScores.get(productId) == null || productScores.get(productId) < productScore.getScore())
productScores.put(productId, productScore.getScore());
}
public Map<String, Integer> getAllProductScores() {
return new HashMap<>(productScores);
}
// Method to get the top N products
public List<Map.Entry<String, Integer>> getTopProducts(int topN, String productIdToExclude) {
return productScores.entrySet()
.stream()
.filter(entry -> productIdToExclude == null || !entry.getKey().equals(productIdToExclude))
.sorted((e1, e2) -> Integer.compare(e2.getValue(), e1.getValue()))
.limit(topN)
.collect(Collectors.toList());
}
public Integer getProductScore(String productId) {
return productScores.get(productId);
}
}
The class includes a consumeEvent function that does exactly what its name suggests. The function is triggered whenever a new event is published to the product-scores
topic. When an event is consumed, the service extracts the productId
and score, storing them in an internal HashMap
for quick access and updates.
The service also provides a getTopProducts
function that:
- Orders products in the HashMap based on their scores
- Returns the top N products
- Supports filtering out specific products (useful when displaying recommendations on product detail pages)
Integration with the frontend
To update ShopController (in controller/ShopController.java) to fetch the top five products and pass them to the frontend via a popularProducts attribute, you first need to add a private variable called kafkaConsumerService and assign it in the controller:
private final KafkaConsumerService kafkaConsumerService;
public ShopController(KafkaConsumerService kafkaConsumerService) {
this.kafkaConsumerService = kafkaConsumerService;
}
Don’t worry: the KafkaConsumerService instance is automatically injected into the controller through Spring’s dependency injection.
Add the following line before the return
statement in the showProductList
and updateShoppingCart
methods:
model.addAttribute("popularProducts", Products.getProductsByIds(kafkaConsumerService.getTopProducts(5, null)));
In the showProductDetail method, pass the current product’s ID so that it’s filtered out of the recommendations shown on its own detail page:
model.addAttribute("popularProducts", Products.getProductsByIds(kafkaConsumerService.getTopProducts(5, productId.toString())));
This ensures that the popularProducts
attribute is populated with the top products retrieved from the kafkaConsumerService
and passed to the frontend for rendering.
Once this is implemented and the web application is running, users will see a “top products” section on every page of the site.
This implementation creates a complete feedback loop in your system, from tracking user interactions, processing them through Kafka Streams, to finally displaying personalized recommendations based on aggregate user behavior.
Forwarding a stream to a data lake
Eventually, you’ll need to store your data in external storage for long-term analysis and to free up space. Kafka Connect, which comes bundled with Kafka, can help you with that. In this section, you’ll learn how to forward your stream to an Amazon S3 bucket.
Creating the Kafka S3 sink connector
The Kafka S3 sink connector is required to forward data to S3. You can find it in the project’s GitHub repository or download it from Confluent Hub. Place the connector in the plugins directory of Kafka Connect (./plugins if you’re using the provided Docker Compose setup).
Configuring the S3 sink connector
Create a file named s3-sink-connector.json
with the following configuration:
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "product-interactions",
"s3.bucket.name": "<The name of your S3 bucket>",
"s3.region": "eu-north-1",
"s3.part.size": "5242880",
"flush.size": "1000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://apicurio:8080/apis/ccompat/v7",
"value.converter.schemas.enable": "true",
"s3.credentials.provider.class": "io.confluent.connect.s3.auth.AwsAccessKeyCredentialsProvider",
"aws.access.key.id": "<YourAccessKeyID>",
"aws.secret.access.key": "<YourSecretAccessKey>",
"topics.dir": "kafka-data"
}
}
The configuration contains numerous options, but the most important ones are:
- s3.bucket.name, which tells the connector the bucket name
- flush.size, which tells Kafka Connect how many records the sink connector processes before the data is committed to its destination
- value.converter, which tells Kafka Connect how to convert the value (in this case, it will deal with JSON)
- value.converter.schema.registry.url, which sets the registry URL
- value.converter.schemas.enable, which enables or disables the schemas
- aws.access.key.id, which contains your AWS key
- aws.secret.access.key, which contains your AWS secret
Deploy the configuration with the Kafka Connect REST API:
curl -X POST -H "Content-Type: application/json" \
--data @s3-sink-connector.json \
http://localhost:8083/connectors
You can now verify that the connector is running:
curl http://localhost:8083/connectors
Configuring the S3 source connector
You can use the data saved in the S3 bucket to analyze past user behavior, such as comparing products visited last Christmas with current user preferences. Tools like Apache Spark can batch process this data, or you can continue using your existing tools to process it with a custom Streams application.
First, you have to load the data back into Kafka. To do so, you need to set up the source connector. Save the following config to a file called s3-source-connector.json
:
{
"name": "s3-source-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SourceConnector",
"tasks.max": "1",
"s3.bucket.name": "kafka-connect-data",
"s3.region": "eu-north-1",
"s3.key.format": "json",
"s3.value.format": "json",
"s3.credentials.provider.class": "io.confluent.connect.s3.auth.AwsAccessKeyCredentialsProvider",
"aws.access.key.id": "<YourAccessKeyID>",
"aws.secret.access.key": "<YourSecretAccessKey>",
"topic": "past-product-interactions",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080/apis/ccompat/v7",
"value.converter.schemas.enable": "true"
}
}
Deploy the configuration via the REST endpoint:
curl -X POST -H "Content-Type: application/json" --data @s3-source-connector.json http://localhost:8083/connectors
After you restart Kafka Connect, your new topic should be populated.
Examining the Kafka Streams application
Once you have historical data saved in S3 and loaded back into Kafka through the source connector, you can write a Kafka Streams application that consumes the new topic and processes the data into usable information.
In your cloned project, open service/KafkaStreamAnalysisService.java
. This file contains a Java class called KafkaStreamAnalysisService
, which implements the Kafka Streams API and compares product popularity over time periods:
@Service
public class KafkaStreamAnalysisService {
private final Properties streamsConfig;
public KafkaStreamAnalysisService(@Qualifier("kafkaStreamAnalysisProperties") Properties streamsConfig) {
this.streamsConfig = streamsConfig;
initializeStream();
}
private void initializeStream() {
StreamsBuilder builder = new StreamsBuilder();
// Define Serde for POJO
JsonSerde<TrackingEvent> pastProductInteractionSerde = new JsonSerde<>(TrackingEvent.class);
pastProductInteractionSerde.configure(serdeConfig(), false);
// Read past-product-interactions
KStream<String, TrackingEvent> interactionStream = builder.stream(
"past-product-interactions",
Consumed.with(Serdes.String(), pastProductInteractionSerde)
);
// Windowed aggregation: compare product popularity over time periods
KTable<Windowed<String>, Long> interactionCounts = interactionStream
.groupBy((key, value) -> value.getProductId(), Grouped.with(Serdes.String(), pastProductInteractionSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(30))) // Tumbling window of 1 month
.count(Materialized.with(Serdes.String(), Serdes.Long()));
// Print results
interactionCounts.toStream().foreach((key, count) -> {
System.out.printf("Product: %s, Count: %d, Start: %s, End: %s%n",
key.key(),
count,
key.window().startTime(),
key.window().endTime()
);
});
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private Map<String, Object> serdeConfig() {
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.REGISTRY_URL, "http://apicurio-registry:8080/");
return config;
}
}
The Kafka Streams application reads events from the past-product-interactions topic, which is populated by Kafka Connect. It uses a windowed aggregation to count product hits per 30-day tumbling window.
Your Kafka Streams application is already running in the background, integrated with your webshop
application. The Kafka Connect setup is also ready, so you can check the logs to view the processed data output by running the following command:
docker logs -f webshop
The output of the application should be as follows:
…output omitted…
Product: 2, Count: 12, Start: 2025-01-12T00:00:00Z, End: 2025-02-11T00:00:00Z
Product: 12, Count: 16, Start: 2025-01-12T00:00:00Z, End: 2025-02-11T00:00:00Z
Product: 15, Count: 60, Start: 2025-01-12T00:00:00Z, End: 2025-02-11T00:00:00Z
Product: 11, Count: 33, Start: 2025-01-12T00:00:00Z, End: 2025-02-11T00:00:00Z
Product: 1, Count: 24, Start: 2025-01-12T00:00:00Z, End: 2025-02-11T00:00:00Z
…output omitted…
Note: This tutorial demonstrates how to integrate a simple stream processing solution. However, for real-world scenarios involving large volumes of historical data, tools like Apache Spark are recommended due to their superior performance and optimization for such tasks.
Data retention and cleanup
With historical data stored in S3, you don’t need to keep all events in Kafka indefinitely. You can use time-based retention to automatically delete records older than a specified period. For example, you could delete any records older than seven days (604,800,000 ms):
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name product-interactions --add-config retention.ms=604800000
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name product-scores --add-config retention.ms=604800000
You could also use size-based retention to delete records once a topic exceeds a defined size limit.
You can further improve your data storage efficiency with log compaction, which maintains only the most recent version of each message key while removing outdated records. This is especially useful for your product-scores
topic, where you only need the latest score for each product. You can configure the topic to enable log compaction with the following command:
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name product-scores --add-config cleanup.policy=compact
This configuration makes sure that only the most current event per product ID is retained.
Conclusion
In this tutorial, you learned how to implement real-time website activity tracking with Apache Kafka by building a system that monitors user behavior without degrading site performance, which is crucial in production environments. You used a schema registry to keep messages compact, Kafka Streams to aggregate interactions into product popularity scores, and Kafka Connect to archive events to S3 for later analysis. The result is a flexible, maintainable setup that gives you access to the statistics you need.
You can find the full code for this tutorial on GitHub.
If you’re looking for a more streamlined implementation, Redpanda offers an excellent alternative. As a Kafka-compatible, high-performance streaming platform, Redpanda simplifies event streaming significantly. With Redpanda Serverless, you can build real-time activity tracking systems with minimal operational overhead while maintaining the scalability and efficiency needed for processing website events in real time. This makes it an ideal choice for teams looking to implement robust tracking solutions without managing complex infrastructure.