Building efficient workflows: Asynchronous Request-Reply pattern

Learn how it works, popular use cases, and how to implement it in an async Python app

By
on
July 2, 2024

Many modern applications and services often depend on remote APIs to provide business logic and compose functionality. These API calls commonly occur over the HTTP protocol and follow request–response semantics. However, not all APIs can respond quickly enough to send a synchronous reply over the same connection, especially when the backend processing is long-running or exceeds the scope of an HTTP request.

Enter the asynchronous request-reply pattern.

The asynchronous request-reply pattern enables a client to send a request to a server or service and continue with other processing without waiting for the reply. The server processes the request at its own pace and responds when ready, which the client can handle at its convenience. This non-blocking processing is essential in modern applications, where waiting for synchronous replies can significantly hinder performance and the user experience.

In this post, you'll learn more about the asynchronous request-reply pattern and when to use it. You'll also learn how to build an async request-reply pattern using Redpanda —the simplest Apache Kafka® alternative.

Asynchronous Request-Reply pattern use cases

The asynchronous request-reply pattern has broad uses across various types of apps and services:

  • Microservices: In a microservices architecture, different services often need to communicate while maintaining independence and scalability. For instance, an e-commerce application might have separate services for user authentication, inventory management, and order processing. When a user places an order, the order service sends an asynchronous request to the inventory service to check stock. While waiting for the reply, the order service can continue processing other requests, thus improving both throughput and the user experience.
  • IoT signaling: Internet of Things (IoT) devices frequently communicate with central servers or other devices, often over unreliable networks with varying latency. The asynchronous request-reply pattern allows IoT devices to operate efficiently, sending data (like sensor readings) to a server and continuing their operations without waiting for a response. This pattern is especially useful for scenarios where IoT devices are battery-powered or need to perform critical functions continuously without interruption.
  • Storage services: Cloud-based storage services often employ this pattern to efficiently manage data operations. When a user requests to upload or retrieve a large file, the service responds with an acknowledgment and processes the request asynchronously. This approach allows the storage service to handle many requests concurrently, optimizing resource usage and reducing wait times for users.
  • Real-time data processing: Real-time data processing is essential in fields like finance and analytics. Here, the asynchronous request-reply pattern is used to handle streaming data. For example, a financial trading platform might use this pattern to process market data and execute trades. Data streams are continuously analyzed, and trade orders are placed without the need for synchronous, block-by-block data processing. Replies can be checked later to ensure orders are completed successfully. This results in high throughput and low latency, which are critical for maintaining competitiveness in real-time trading environments.

In each of these scenarios, the asynchronous request-reply pattern enhances the system's ability to handle multiple requests concurrently, maintain high throughput, and ensure responsiveness, all while decoupling various system components.

Asynchronous Request-Reply with Redpanda

These use cases have a common requirement to decouple the application making requests from the application that processes the request and creates a reply. This decoupling typically requires a message queue or streaming data platform to send data between applications.

Apache Kafka is a popular choice for teams that need a scalable way to send messages and stream data between applications. However, Redpanda is a powerful alternative that's significantly easier to deploy, operate, and maintain.

Redpanda is a streaming data platform that simplifies the implementation of the asynchronous request-reply pattern and offers a more streamlined and efficient approach to handling real-time data streams.

The easiest way to build a better understanding of the asynchronous request-reply pattern is to see it in action. Let's learn how to build a complete web application that uses the asynchronous request-reply pattern using FastAPI and Redpanda.

Redpanda Asynchronous Request-Reply in Python

Now it's time for a hands-on example of the asynchronous request-reply pattern. You'll create a small web application that uses a microservice to convert color images to grayscale.

Imagine you work for a real estate company. The company hires photographers to take photos of properties for sale, but the photos require processing before you can publish them on the web. In many cases, realtors who work for your company have noticed that publishing tasteful black-and-white photos of a property grabs home buyers' attention and results in the property selling more quickly.

Unfortunately, processing the images manually in Photoshop is time-consuming. You know you can automate the process using Python, but you also know that image processing is CPU-intensive and worry that a traditional web app will not be able to keep up with hundreds of realtors submitting images to be processed and synchronously waiting for a response.

You realize this is a perfect fit for the asynchronous request-reply pattern and design an application that uses Python, FastAPI, and Redpanda to offload image processing to a separate service and reply to the user asynchronously when their processed image is ready:

Architecture diagram

Architecture diagram

Prerequisites

You'll need a few prerequisites to complete the tutorial app:

  • Python 3.8 or greater.
  • A running instance of Redpanda. If you don't yet have one, there are a couple of easy options:

Creating Redpanda Topics

The first step in creating the app is setting up Redpanda topics. You'll need two topics for this app: image-request and image-reply. If you're new to Redpanda and/or Kafka, think of a topic as a queue that can receive data and pass it on to waiting consumers.

If you started Redpanda with Docker Compose, you can create new topics by loading the web console, clicking Topics in the menu, and using the Create Topic button:

Redpanda web UI Topics page

Otherwise, follow the instructions for creating a topic using the Redpanda CLI or Redpanda Cloud.

Building the application

If you'd like to see how everything fits together before you begin, you can find the complete code for this example on GitHub.

Start by opening a terminal, then create a directory to hold the app and navigate to the directory. Here, you'll create subdirectories to hold the app's web assets and images:

mkdir redpanda-async-request-reply &\ 
cd redpanda-async-request-reply &\ 
mkdir static &\ 
mkdir static/images

Then, create a virtual environment by running:

python -m venv ./env/bin

Next, activate it by running:

source ./env/bin/activate

After activating your virtual environment, use pip to install the packages required to run the web app and microservice:

pip install fastapi[all] websockets aiokafka pillow

This pip command installs:

Open the redpanda-async-request-reply folder in a text editor or IDE of your choice and create a file named server.py. Start by adding code to it that imports dependencies and defines a few constants:

import asyncio
import os 

from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, UploadFile, File
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

redpanda_server = "localhost:9092"  # Replace with your Redpanda server address
request_topic = "image-request"
reply_topic = "image-reply"

If you're using Redpanda Cloud or running your own instance of Redpanda somewhere other than your own computer, update the redpanda_server address accordingly.

Since Redpanda has a Kafka-compatible API, the aiokafka library lets you send messages to and receive messages from Redpanda topics. You'll need to create an AIOKafkaProducer for sending image processing requests to one Redpanda topic and an AIOKafkaConsumer for receiving replies about processed images.

aiokafka must run in the same event loop as the FastAPI app; otherwise, it will not work. This means the producer and consumer can't be created when the server.py script first runs. Instead, you must create a lifespan function to create both objects after FastAPI has created its event loop.

Create the lifespan function by adding the following code snippet to server.py:

@asynccontextmanager
async def startup(app):
    app.producer = AIOKafkaProducer(bootstrap_servers=[kafka_server])
    await app.producer.start()

    app.consumer = AIOKafkaConsumer(
        reply_topic,
        group_id="image-reply-group"
    )
    await app.consumer.start()

    yield

app = FastAPI(lifespan=startup)

With the app created, the only remaining step is adding route and WebSocket handlers to server.py:

# Mount the static directory to serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# Redirect root URL to the static index.html
@app.get("/")
async def read_index():
    return FileResponse('static/index.html')

# Endpoint to upload an image
@app.post("/upload-image/")
async def upload_image(file: UploadFile = File(...)):
    # Save the file
    current_dir = os.path.dirname(os.path.realpath(__file__))
    file_location = os.path.join(current_dir, f"static/images/{file.filename}")
    print(f"Saving file to {file_location}")
    with open(file_location, "wb") as file_object:
        file_object.write(await file.read())

    # Send filename to Redpanda
    await app.producer.send(request_topic, file.filename.encode('utf-8'))
    return {"filename": file.filename}

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def send_message_to_websocket(msg):
        await websocket.send_text(msg.value.decode('utf-8'))

    async def consume_from_topic(topic, callback):
        print(f"Consuming from {topic}")
        async for msg in app.consumer:
            print(f"Received message: {msg.value.decode('utf-8')}")
            await callback(msg)

    # Start consuming
    asyncio.create_task(consume_from_topic(reply_topic, send_message_to_websocket))

    # Keep the connection open
    while True:
        await asyncio.sleep(10)

In this code, the await app.producer.send(request_topic, file.filename.encode('utf-8')) call sends an image processing request, and the consume_from_topic(reply_topic, send_message_to_websocket) call consumes image processing replies.

Also note the use of a WebSocket, which is useful when using the asynchronous request-reply pattern because it provides an easy way to send a reply back to the user as soon as it is ready. Without a WebSocket or other persistent connection, the user might need to periodically poll for results to see if a reply is waiting for an earlier request.

server.py now contains all the Python code needed for the FastAPI backend. The only thing left to do is add a simple web interface for interacting with the app by adding two more files.

Create a file in the static subdirectory named index.html and add this code to it. Then, add a file named app.js in the static subdirectory and add this JavaScript code that uploads images to the API and receives replies from the WebSocket.

Covering the web UI in detail is beyond the scope of this tutorial, but the above are plain HTML and JavaScript files that provide a friendly way to observe the asynchronous request-reply pattern in action.

Finally, create a file named image-service.py for the code that will mimic a microservice that converts images to grayscale. Then, start by adding code to the new file to import dependencies and define constants, just as you did in the server.py file:

import asyncio
import os 

from PIL import Image, ImageOps
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

redpanda_server = "localhost:9092"  # Replace with your Redpanda server address
request_topic = "image-request"
reply_topic = "image-reply"

current_dir = os.path.dirname(os.path.realpath(__file__))
images_path = os.path.join(current_dir, "static/images")

Next, add a function that continuously reads from the request topic, followed by a function that writes data to a Redpanda topic:

async def read_requests():
    consumer = AIOKafkaConsumer(
        request_topic,
        bootstrap_servers=redpanda_server,
        group_id="image-process-group"
    )
    await consumer.start()
    try:
        async for msg in consumer:
            filename = msg.value.decode('utf-8')
            await process_image(filename)
    finally:
        await consumer.stop()

async def send_to_topic(topic, message):
    producer = AIOKafkaProducer(bootstrap_servers=redpanda_server)
    await producer.start()
    try:
        await producer.send_and_wait(topic, message.encode('utf-8'))
    finally:
        await producer.stop()

These should look familiar because they're very similar to the code in server.py. The difference here is that instead of writing to the request topic and consuming the reply topic, you are consuming the request topic, processing the request, and then writing to the reply topic.

Add the following function to process each image file name received, followed by a couple of lines of code:

async def process_image(filename):
    try:
        with Image.open(os.path.join(images_path, filename)) as img:
            grayscale = ImageOps.grayscale(img)
            new_filename = f"desaturated_{filename}"
            grayscale.save(os.path.join(images_path, new_filename))
            print(f"Processed: {new_filename}")
            # Send new filename to Redpanda
            await send_to_topic(reply_topic, new_filename)
    except Exception as e:
        print(f"Error processing {filename}: {e}")

if __name__ == "__main__":
    print("Starting image service...")
    asyncio.run(read_requests())

This microservice works with the code in the FastAPI server app to process images to grayscale asynchronously and reply to the user whenever the task finishes. This process can be almost instant for small images but could take up to a second or more for very large images.

With all the code in place, it's time to run both apps. Open a terminal in the redpanda-async-request-reply directory and run:

uvicorn server:app

The uvicorn server gets installed automatically alongside FastAPI, and when you run this command, it starts your web service on port 8000.

Open another terminal in the redpanda-async-request-reply directory and run:

python image-service.py

When both are up and running, open http://localhost:8000 in a web browser. You'll see a file selection box and a "Process Image" button:

Demo app startup screen

Select an image, then click the button to process it. After a small delay, a copy of the image should appear to the right in grayscale instead of color. Try it with several more images, and you should see that they get added as a grid on the right side of the page:

Processed image grid

With that, you now have a complete working application that demonstrates the asynchronous request-response pattern!

Conclusion

The asynchronous request-response pattern is a powerful way of decoupling applications and services. You learned why the pattern is useful and how to implement it for yourself in a modern async Python application using Redpanda. Remember, you can find all of this app's code on GitHub.

To keep exploring Redpanda, check the documentation and browse the Redpanda blog for more tutorials. If you have questions, join the Redpanda Community on Slack.

Graphic for downloading streaming data report
Save Your Spot

Related articles

VIEW ALL POSTS
Build an inventory monitoring system with Flink and MongoDB
Rexford A. Nyarko
&
&
&
October 29, 2024
Text Link
Building a real-time Customer 360 solution for Telco with Flink
Artem Oppermann
&
&
&
October 1, 2024
Text Link
Build a blazing fast real-time dashboard with serverless technologies
Nico Acosta
&
&
&
August 29, 2024
Text Link