Publish and subscribe to events with Redis pub/sub

Completed

Both Azure Cache for Redis and Azure Managed Redis provide powerful messaging capabilities through the publish/subscribe (pub/sub) pattern, enabling real-time, asynchronous communication between different parts of your AI application. This messaging model allows publishers to send messages to named channels without knowing which subscribers, if any, are listening. Subscribers can listen to one or more channels and receive messages as they're published, creating a loosely coupled, event-driven AI architecture.

Imagine you're building an intelligent customer service system with multiple AI components: a conversation analyzer, sentiment detector, knowledge retrieval system, and response generator. Without pub/sub, each component would need to constantly poll for new messages or wait for direct API calls, creating bottlenecks and delays in AI processing. With Redis pub/sub (available in both Azure services), when a new customer message arrives, the message ingestion service publishes an event to the "conversations:new" channel. The sentiment analyzer, intent classifier, and context manager can all subscribe to this channel and immediately begin parallel processing, enabling real-time AI responses and seamless coordination between AI services without tight coupling.

How publish/subscribe works

The pub/sub pattern operates on a simple but powerful concept: channels. Publishers send messages to named channels, and subscribers listen to channels of interest. When a message is published to a channel, Redis immediately delivers it to all active subscribers of that channel.

Key characteristics

Redis pub/sub has several important characteristics that determine when and how to use it effectively:

  • At-most-once delivery: Messages are delivered to subscribers that are actively connected when the message is published. There are no delivery guarantees if a subscriber is temporarily disconnected.

  • No message persistence: Messages exist only in memory while being distributed to subscribers. Once delivered, they're not stored by Redis, making this pattern ideal for real-time notifications but unsuitable for scenarios requiring message durability.

  • Fire-and-forget: Publishers don't know if any subscribers received their messages. This decoupling is powerful for scalability but means you can't rely on delivery confirmation.

  • High throughput: Redis can handle high message volumes across thousands of channels, with performance scaling based on your Azure service tier and configuration. Making it excellent for high-volume, low-latency messaging scenarios.

Channel naming and patterns

Channels in Redis are simply strings, giving you flexibility in organizing your AI messaging architecture:

ai:conversations:123       # Conversation-specific AI events
ml:model:predictions       # ML model prediction results
ai:training:status         # Model training status updates
embeddings:cache:refresh   # Vector embedding cache updates

Redis also supports pattern matching for subscriptions, allowing AI components to listen to multiple related channels:

ai:conversations:*        # All conversation AI events
ml:models:*:status       # All model status updates
ai:*:predictions        # All AI prediction events

Common use cases for pub/sub

Redis pub/sub shines in AI applications where components need to react immediately to events without blocking or polling. The following scenarios demonstrate how AI systems can use pub/sub to achieve better performance, scalability, and responsiveness while maintaining loose coupling between services.

  • Real-time AI coordination: Pub/sub excels at coordinating multiple AI components simultaneously. When new data arrives for processing, model training completes, or AI predictions are ready, a single publish operation can notify all interested AI services instantly.

  • AI model and embedding cache invalidation: When AI models are updated or embeddings are refreshed, you can publish invalidation messages to ensure all AI services clear stale cached predictions and vectors simultaneously.

  • Event-driven AI services: AI microservices can communicate through pub/sub without direct dependencies, enabling loose coupling and better scalability for AI workloads.

  • AI system monitoring and model performance tracking: AI applications can publish model performance metrics, prediction accuracy, and resource utilization to monitoring channels that alerting and optimization systems subscribe to.

Limitations to consider

While pub/sub is powerful for real-time messaging, it has important limitations:

  • No message durability: Messages that arrive when no subscribers are listening are lost forever. If a subscriber disconnects temporarily due to network issues or restarts, it misses messages published during the offline period.

  • No delivery guarantees: Publishers don't know if messages were successfully delivered. If all subscribers are offline or there's a network partition, messages are discarded.

  • Memory-only operation: Since messages aren't persisted to disk, Redis pub/sub isn't suitable for scenarios requiring message durability, ordered processing, or replay capabilities.

  • No backpressure handling: If subscribers can't keep up with message volume, Redis continues delivering messages, potentially causing memory issues or message loss in the subscriber applications.

When to use pub/sub

Understanding when to choose Redis pub/sub versus other messaging solutions is crucial for building effective AI systems.

Use Redis pub/sub for AI solutions when you need:

  • Real-time coordination between AI services with sub-millisecond latency
  • Broadcasting model updates or predictions to multiple AI components simultaneously
  • Loose coupling between AI microservices and ML pipelines
  • High-throughput messaging for AI events and model predictions
  • Simple event distribution for AI workflows without complex routing
  • Parallel processing triggers for multiple AI models

Code examples for pub/sub

This section provides code examples showing how to implement pub/sub in your AI applications. All examples in this section assume a Redis client connection named redis_client is implemented.

Publishing events from your application

Use PUBLISH to send messages to channels. Publishers are typically part of your API handlers or services that detect events worth broadcasting.

# Publish a cache invalidation event
def notify_model_updated(model_name, version):
    message = f"{model_name}:{version}"
    redis_client.publish('ai:models:updated', message)
    
# Publish embedding refresh notification
def notify_embeddings_refreshed(collection_id):
    redis_client.publish('ai:embeddings:refresh', collection_id)

Subscribing to channels in worker services

Use SUBSCRIBE to listen to specific channels. Subscribers run in background services or worker processes that react to events.

# Subscribe to specific channels
pubsub = redis_client.pubsub()
pubsub.subscribe('ai:models:updated', 'ai:embeddings:refresh')

# Listen for messages
for message in pubsub.listen():
    if message['type'] == 'message':
        channel = message['channel']
        data = message['data']
        
        if channel == 'ai:models:updated':
            # Clear model cache
            clear_model_cache(data)
        elif channel == 'ai:embeddings:refresh':
            # Refresh embedding cache
            refresh_embeddings(data)

Using pattern subscriptions

Use PSUBSCRIBE to subscribe to multiple channels with pattern matching. This is useful when you want to listen to all channels in a category.

# Subscribe to all AI-related channels
pubsub = redis_client.pubsub()
pubsub.psubscribe('ai:*')

# Handle messages from any AI channel
for message in pubsub.listen():
    if message['type'] == 'pmessage':
        pattern = message['pattern']  # 'ai:*'
        channel = message['channel']  # Actual channel name
        data = message['data']
        
        print(f"Received on {channel}: {data}")
        handle_ai_event(channel, data)

Broadcasting to WebSocket clients

Combine pub/sub with WebSocket connections to push real-time AI updates to browser clients.

from fastapi import FastAPI, WebSocket
import redis.asyncio as redis
import asyncio

app = FastAPI()

# Background task to listen for Redis pub/sub messages
async def redis_listener(websocket: WebSocket):
    redis_client = redis.Redis(
        host='your-redis.redis.cache.windows.net',
        port=<PORT>,
        password='your-access-key',
        ssl=True,
        decode_responses=True
    )
    
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe('ai:predictions:ready')
        
        async for message in pubsub.listen():
            if message['type'] == 'message':
                # Forward Redis message to WebSocket client
                await websocket.send_json({
                    'type': 'prediction',
                    'data': message['data']
                })

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    await redis_listener(websocket)

Additional resources