Redis Pub/Sub

Built-in messaging feature of Redis. Provides lightweight publish-subscribe pattern. Optimal for real-time notifications, chat, and live updates.

Message BrokerPub/SubReal-timeAsynchronous MessagingHigh PerformanceEvent-drivenLightweightScalable

Server

Redis Pub/Sub

Overview

Redis Pub/Sub is a built-in real-time messaging feature in Redis that implements the Publish/Subscribe pattern, providing a simple yet high-performance messaging system. Since its introduction in 2009, it has been widely adopted for real-time communication, event-driven architectures, chat applications, and live notification systems. Leveraging Redis's inherent speed, it delivers messages with millisecond-level low latency and efficiently handles large numbers of publishers and subscribers while maintaining a lightweight footprint. Redis 7.0 introduced Sharded Pub/Sub functionality that significantly improves scalability in cluster environments, optimizing message distribution for large-scale distributed systems.

Details

Redis Pub/Sub 2025 edition has evolved as the definitive solution for real-time messaging through its simplicity and high performance. Beyond basic fire-and-forget message delivery, pattern matching capabilities enable flexible channel subscriptions, with wildcard support for dynamic channel management. Sharded Pub/Sub introduced in Redis 7.0 optimizes message distribution within clusters by assigning channels to slots, improving from traditional all-node broadcasting to efficient shard-based delivery. The design prioritizes real-time performance and lightweight operation over persistence and message guarantees, making it particularly powerful for speed-critical applications such as IoT data streaming, real-time game synchronization, financial transaction notifications, and any scenario where immediate delivery is paramount.

Key Features

  • Real-time Delivery: Millisecond-level low latency messaging
  • Pattern Matching: Flexible channel subscription using wildcards
  • Lightweight Design: Memory-efficient implementation with minimal overhead
  • High Scalability: Efficient handling of numerous publishers and subscribers
  • Sharded Pub/Sub: Optimized message distribution in cluster environments
  • Simple API: Intuitive and easy-to-learn command structure

Pros and Cons

Pros

  • Extremely fast message delivery performance and real-time capabilities
  • Memory-efficient messaging leveraging Redis's lightweight nature
  • Rapid development and easy maintenance through simple API
  • Rich ecosystem of client libraries supporting multiple languages
  • Dynamic and flexible channel management through pattern matching
  • Complete integration with existing Redis infrastructure reducing deployment costs

Cons

  • Limited message persistence and delivery guarantee (fire-and-forget design)
  • Risk of message loss when no subscribers are present
  • Lack of complex message routing capabilities
  • No transaction support or dead letter queue functionality
  • Not suitable for large message processing
  • Limited stream processing capabilities compared to systems like Kafka

Reference Pages

Code Examples

Basic Installation and Setup

# Redis server installation (Ubuntu/Debian)
sudo apt update
sudo apt install redis-server

# CentOS/RHEL/Fedora
sudo dnf install redis

# macOS (using Homebrew)
brew install redis

# Start Redis server
redis-server

# Connect to Redis CLI in another terminal
redis-cli

# Check Redis server status
redis-cli ping
# Should return PONG

# Docker usage
docker run --name redis-pubsub \
  -p 6379:6379 \
  -d redis:7-alpine

# Docker Compose configuration
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    container_name: redis-pubsub
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:
EOF

docker-compose up -d

Basic Pub/Sub Operations (Redis CLI)

# Channel subscription (subscriber)
# Execute in Terminal 1
redis-cli
SUBSCRIBE news sport technology

# Multiple channel subscription
SUBSCRIBE chat:room1 chat:room2 chat:room3

# Pattern subscription (using wildcards)
PSUBSCRIBE news:*
PSUBSCRIBE chat:room*
PSUBSCRIBE user:*:notifications

# Message publishing (publisher)
# Execute in Terminal 2
redis-cli
PUBLISH news "Breaking: New technology breakthrough announced"
PUBLISH sport "Football match result: Team A 3-1 Team B"
PUBLISH chat:room1 "Hello everyone in room 1!"

# Publishing to pattern-matching channels
PUBLISH news:tech "Latest in AI development"
PUBLISH news:sports "Olympic games update"
PUBLISH user:123:notifications "You have a new message"

# Unsubscribe operations
UNSUBSCRIBE news sport
PUNSUBSCRIBE news:*

# Channel information
PUBSUB CHANNELS           # List active channels
PUBSUB NUMSUB news sport  # Subscriber count for specific channels
PUBSUB NUMPAT             # Pattern subscription count

# Sharded Pub/Sub (Redis 7.0+)
SSUBSCRIBE orders:shard1  # Shard subscription
SPUBLISH orders:shard1 "Order #12345 processed"  # Shard publishing
SUNSUBSCRIBE orders:shard1  # Shard unsubscription

Python Implementation (redis-py)

# pip install redis

import redis
import json
import threading
import time
from datetime import datetime

class RedisPubSubManager:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )
        self.pubsub = self.redis_client.pubsub()
        self.subscribed_channels = set()
        self.message_handlers = {}
        
    def publish_message(self, channel, message, message_type='text'):
        """Publish message"""
        try:
            if isinstance(message, dict):
                message_data = {
                    'type': message_type,
                    'timestamp': datetime.now().isoformat(),
                    'data': message
                }
                message_str = json.dumps(message_data)
            else:
                message_str = str(message)
            
            # Publish message
            receivers = self.redis_client.publish(channel, message_str)
            print(f"Message published to '{channel}': {receivers} receivers")
            return receivers
            
        except Exception as e:
            print(f"Error publishing message: {e}")
            return 0
    
    def subscribe_channel(self, channel, message_handler=None):
        """Subscribe to channel"""
        try:
            self.pubsub.subscribe(channel)
            self.subscribed_channels.add(channel)
            
            if message_handler:
                self.message_handlers[channel] = message_handler
                
            print(f"Subscribed to channel: {channel}")
            
        except Exception as e:
            print(f"Error subscribing to channel {channel}: {e}")
    
    def subscribe_pattern(self, pattern, message_handler=None):
        """Subscribe to pattern"""
        try:
            self.pubsub.psubscribe(pattern)
            
            if message_handler:
                self.message_handlers[pattern] = message_handler
                
            print(f"Subscribed to pattern: {pattern}")
            
        except Exception as e:
            print(f"Error subscribing to pattern {pattern}: {e}")
    
    def default_message_handler(self, message):
        """Default message handler"""
        if message['type'] == 'message':
            channel = message['channel']
            data = message['data']
            
            # Try parsing JSON format
            try:
                parsed_data = json.loads(data)
                print(f"[{channel}] {parsed_data}")
            except json.JSONDecodeError:
                print(f"[{channel}] {data}")
                
        elif message['type'] == 'pmessage':
            pattern = message['pattern']
            channel = message['channel']
            data = message['data']
            print(f"[Pattern: {pattern}] [{channel}] {data}")
            
        elif message['type'] in ['subscribe', 'psubscribe']:
            print(f"Subscribed to {message['channel']}, total: {message['data']} subscriptions")
            
        elif message['type'] in ['unsubscribe', 'punsubscribe']:
            print(f"Unsubscribed from {message['channel']}, remaining: {message['data']} subscriptions")
    
    def start_listening(self, timeout=1.0):
        """Start message reception"""
        print("Starting message listener...")
        
        try:
            for message in self.pubsub.listen():
                if message is not None:
                    # Check for channel-specific handlers
                    channel = message.get('channel')
                    pattern = message.get('pattern')
                    
                    handler = None
                    if channel and channel in self.message_handlers:
                        handler = self.message_handlers[channel]
                    elif pattern and pattern in self.message_handlers:
                        handler = self.message_handlers[pattern]
                    
                    if handler:
                        try:
                            handler(message)
                        except Exception as e:
                            print(f"Error in message handler: {e}")
                    else:
                        self.default_message_handler(message)
                        
        except KeyboardInterrupt:
            print("\nStopping message listener...")
            self.close()
    
    def unsubscribe_all(self):
        """Unsubscribe from all channels"""
        try:
            self.pubsub.unsubscribe()
            self.pubsub.punsubscribe()
            self.subscribed_channels.clear()
            self.message_handlers.clear()
            print("Unsubscribed from all channels and patterns")
            
        except Exception as e:
            print(f"Error unsubscribing: {e}")
    
    def get_channel_info(self, *channels):
        """Get channel information"""
        try:
            # Active channels list
            active_channels = self.redis_client.pubsub_channels()
            print(f"Active channels: {active_channels}")
            
            # Subscriber count for specific channels
            if channels:
                subscriber_counts = self.redis_client.pubsub_numsub(*channels)
                for channel, count in subscriber_counts:
                    print(f"Channel '{channel}': {count} subscribers")
            
            # Pattern subscription count
            pattern_count = self.redis_client.pubsub_numpat()
            print(f"Pattern subscriptions: {pattern_count}")
            
        except Exception as e:
            print(f"Error getting channel info: {e}")
    
    def close(self):
        """Close connection"""
        try:
            self.pubsub.close()
            self.redis_client.close()
            print("Redis Pub/Sub connection closed")
            
        except Exception as e:
            print(f"Error closing connection: {e}")

# Usage examples and sample applications
def user_notification_handler(message):
    """User notification specific handler"""
    try:
        data = json.loads(message['data'])
        user_id = data['data'].get('user_id')
        notification_type = data['data'].get('type')
        content = data['data'].get('content')
        
        print(f"🔔 USER {user_id} | {notification_type.upper()}: {content}")
        
    except Exception as e:
        print(f"Error processing user notification: {e}")

def chat_message_handler(message):
    """Chat message specific handler"""
    try:
        data = json.loads(message['data'])
        username = data['data'].get('username')
        content = data['data'].get('message')
        
        print(f"💬 {username}: {content}")
        
    except Exception as e:
        print(f"Error processing chat message: {e}")

# Publisher example
def publisher_example():
    manager = RedisPubSubManager()
    
    try:
        # Publish various message types
        # News publishing
        manager.publish_message('news:tech', {
            'title': 'AI breakthrough in 2024',
            'content': 'New AI model achieves human-level performance',
            'category': 'technology'
        }, 'news')
        
        # Chat message publishing
        manager.publish_message('chat:general', {
            'username': 'alice',
            'message': 'Hello everyone!',
            'room': 'general'
        }, 'chat')
        
        # User notification publishing
        manager.publish_message('user:notifications', {
            'user_id': 12345,
            'type': 'message',
            'content': 'You have a new private message'
        }, 'notification')
        
        # System notification publishing
        manager.publish_message('system:alerts', {
            'level': 'warning',
            'message': 'High CPU usage detected',
            'timestamp': datetime.now().isoformat()
        }, 'system_alert')
        
    finally:
        manager.close()

# Subscriber example
def subscriber_example():
    manager = RedisPubSubManager()
    
    try:
        # Individual channel subscriptions
        manager.subscribe_channel('chat:general', chat_message_handler)
        manager.subscribe_channel('user:notifications', user_notification_handler)
        
        # Pattern subscriptions
        manager.subscribe_pattern('news:*')
        manager.subscribe_pattern('system:*')
        
        # Check channel information
        manager.get_channel_info('chat:general', 'user:notifications')
        
        # Start message reception
        manager.start_listening()
        
    except KeyboardInterrupt:
        print("\nShutting down subscriber...")
    finally:
        manager.unsubscribe_all()
        manager.close()

if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1 and sys.argv[1] == 'publish':
        print("Running publisher example...")
        publisher_example()
    else:
        print("Running subscriber example (Ctrl+C to stop)...")
        subscriber_example()

Node.js Implementation (ioredis)

// npm install ioredis

const Redis = require('ioredis');

class RedisPubSubManager {
    constructor(options = {}) {
        this.publisherClient = new Redis({
            host: options.host || 'localhost',
            port: options.port || 6379,
            db: options.db || 0,
            password: options.password,
            retryDelayOnFailover: 100,
            enableReadyCheck: false,
            maxRetriesPerRequest: null,
        });
        
        this.subscriberClient = new Redis({
            host: options.host || 'localhost',
            port: options.port || 6379,
            db: options.db || 0,
            password: options.password,
            retryDelayOnFailover: 100,
            enableReadyCheck: false,
            maxRetriesPerRequest: null,
        });
        
        this.messageHandlers = new Map();
        this.setupErrorHandling();
    }
    
    setupErrorHandling() {
        this.publisherClient.on('error', (err) => {
            console.error('Publisher Redis Error:', err);
        });
        
        this.subscriberClient.on('error', (err) => {
            console.error('Subscriber Redis Error:', err);
        });
        
        this.publisherClient.on('connect', () => {
            console.log('Publisher connected to Redis');
        });
        
        this.subscriberClient.on('connect', () => {
            console.log('Subscriber connected to Redis');
        });
    }
    
    async publishMessage(channel, message, metadata = {}) {
        try {
            const messageData = {
                type: metadata.type || 'text',
                timestamp: new Date().toISOString(),
                source: metadata.source || 'node-app',
                data: message
            };
            
            const receiverCount = await this.publisherClient.publish(
                channel, 
                JSON.stringify(messageData)
            );
            
            console.log(`📤 Published to '${channel}': ${receiverCount} receivers`);
            return receiverCount;
            
        } catch (error) {
            console.error(`Error publishing message to ${channel}:`, error);
            return 0;
        }
    }
    
    subscribeChannel(channel, messageHandler) {
        try {
            this.subscriberClient.subscribe(channel);
            
            if (messageHandler) {
                this.messageHandlers.set(channel, messageHandler);
            }
            
            console.log(`📥 Subscribed to channel: ${channel}`);
            
        } catch (error) {
            console.error(`Error subscribing to channel ${channel}:`, error);
        }
    }
    
    subscribePattern(pattern, messageHandler) {
        try {
            this.subscriberClient.psubscribe(pattern);
            
            if (messageHandler) {
                this.messageHandlers.set(pattern, messageHandler);
            }
            
            console.log(`📥 Subscribed to pattern: ${pattern}`);
            
        } catch (error) {
            console.error(`Error subscribing to pattern ${pattern}:`, error);
        }
    }
    
    startListening() {
        // Regular channel messages
        this.subscriberClient.on('message', (channel, message) => {
            this.handleMessage('message', channel, null, message);
        });
        
        // Pattern match messages
        this.subscriberClient.on('pmessage', (pattern, channel, message) => {
            this.handleMessage('pmessage', channel, pattern, message);
        });
        
        // Subscription confirmations
        this.subscriberClient.on('subscribe', (channel, count) => {
            console.log(`✅ Subscribed to '${channel}', total subscriptions: ${count}`);
        });
        
        this.subscriberClient.on('psubscribe', (pattern, count) => {
            console.log(`✅ Pattern subscribed to '${pattern}', total subscriptions: ${count}`);
        });
        
        // Unsubscription confirmations
        this.subscriberClient.on('unsubscribe', (channel, count) => {
            console.log(`❌ Unsubscribed from '${channel}', remaining: ${count}`);
        });
        
        this.subscriberClient.on('punsubscribe', (pattern, count) => {
            console.log(`❌ Pattern unsubscribed from '${pattern}', remaining: ${count}`);
        });
        
        console.log('🎧 Message listener started...');
    }
    
    handleMessage(type, channel, pattern, message) {
        try {
            // Find appropriate handler
            let handler = null;
            
            if (type === 'message' && this.messageHandlers.has(channel)) {
                handler = this.messageHandlers.get(channel);
            } else if (type === 'pmessage' && this.messageHandlers.has(pattern)) {
                handler = this.messageHandlers.get(pattern);
            }
            
            // Parse message data
            let messageData;
            try {
                messageData = JSON.parse(message);
            } catch (e) {
                messageData = { data: message, timestamp: new Date().toISOString() };
            }
            
            if (handler) {
                handler({
                    type,
                    channel,
                    pattern,
                    message: messageData,
                    rawMessage: message
                });
            } else {
                this.defaultMessageHandler({ type, channel, pattern, message: messageData });
            }
            
        } catch (error) {
            console.error('Error handling message:', error);
        }
    }
    
    defaultMessageHandler({ type, channel, pattern, message }) {
        const timestamp = new Date().toLocaleTimeString();
        
        if (type === 'message') {
            console.log(`[${timestamp}] 📨 ${channel}: ${JSON.stringify(message)}`);
        } else if (type === 'pmessage') {
            console.log(`[${timestamp}] 🔍 ${pattern} -> ${channel}: ${JSON.stringify(message)}`);
        }
    }
    
    async getChannelInfo(...channels) {
        try {
            // Active channels list
            const activeChannels = await this.publisherClient.pubsub('CHANNELS');
            console.log('Active channels:', activeChannels);
            
            // Subscriber count for specific channels
            if (channels.length > 0) {
                const subscriberCounts = await this.publisherClient.pubsub('NUMSUB', ...channels);
                for (let i = 0; i < subscriberCounts.length; i += 2) {
                    console.log(`Channel '${subscriberCounts[i]}': ${subscriberCounts[i + 1]} subscribers`);
                }
            }
            
            // Pattern subscription count
            const patternCount = await this.publisherClient.pubsub('NUMPAT');
            console.log(`Pattern subscriptions: ${patternCount}`);
            
        } catch (error) {
            console.error('Error getting channel info:', error);
        }
    }
    
    async unsubscribeAll() {
        try {
            await this.subscriberClient.unsubscribe();
            await this.subscriberClient.punsubscribe();
            this.messageHandlers.clear();
            console.log('Unsubscribed from all channels and patterns');
        } catch (error) {
            console.error('Error unsubscribing:', error);
        }
    }
    
    async close() {
        try {
            await this.unsubscribeAll();
            await this.publisherClient.quit();
            await this.subscriberClient.quit();
            console.log('Redis Pub/Sub connections closed');
        } catch (error) {
            console.error('Error closing connections:', error);
        }
    }
}

// Specific message handler examples
function createChatHandler(roomName) {
    return ({ channel, message }) => {
        const { data } = message;
        console.log(`💬 [${roomName}] ${data.username}: ${data.message}`);
    };
}

function createNotificationHandler() {
    return ({ channel, message }) => {
        const { data } = message;
        const emoji = data.level === 'error' ? '🚨' : 
                     data.level === 'warning' ? '⚠️' : 'ℹ️';
        console.log(`${emoji} ${data.level.toUpperCase()}: ${data.message}`);
    };
}

function createUserActivityHandler() {
    return ({ channel, message }) => {
        const { data } = message;
        console.log(`👤 User ${data.userId} performed action: ${data.action}`);
    };
}

// Publisher example
async function publisherExample() {
    const manager = new RedisPubSubManager();
    
    try {
        // Publish various message types
        await manager.publishMessage('chat:general', {
            username: 'alice',
            message: 'Hello from Node.js!',
            timestamp: new Date().toISOString()
        }, { type: 'chat', source: 'node-publisher' });
        
        await manager.publishMessage('notifications:system', {
            level: 'info',
            message: 'System maintenance completed',
            component: 'web-server'
        }, { type: 'notification' });
        
        await manager.publishMessage('user:activity', {
            userId: 12345,
            action: 'login',
            ip: '192.168.1.100',
            userAgent: 'Node.js Client'
        }, { type: 'user_activity' });
        
        // Pattern matching test
        await manager.publishMessage('events:user:login', {
            event: 'user_login',
            details: { userId: 67890, location: 'Tokyo' }
        });
        
        await manager.publishMessage('events:order:created', {
            event: 'order_created',
            details: { orderId: 'ORD-001', amount: 150.00 }
        });
        
    } finally {
        await manager.close();
    }
}

// Subscriber example
async function subscriberExample() {
    const manager = new RedisPubSubManager();
    
    try {
        // Subscribe to channels with custom handlers
        manager.subscribeChannel('chat:general', createChatHandler('General'));
        manager.subscribeChannel('notifications:system', createNotificationHandler());
        manager.subscribeChannel('user:activity', createUserActivityHandler());
        
        // Pattern subscriptions
        manager.subscribePattern('events:*');
        manager.subscribePattern('alerts:*');
        
        // Start listening for messages
        manager.startListening();
        
        // Check channel information
        setTimeout(() => {
            manager.getChannelInfo('chat:general', 'notifications:system', 'user:activity');
        }, 1000);
        
        // Graceful shutdown
        process.on('SIGINT', async () => {
            console.log('\n🛑 Shutting down subscriber...');
            await manager.close();
            process.exit(0);
        });
        
        // Keep running
        console.log('Press Ctrl+C to stop the subscriber...');
        
    } catch (error) {
        console.error('Error in subscriber:', error);
        await manager.close();
    }
}

// Execution example
if (require.main === module) {
    const mode = process.argv[2];
    
    if (mode === 'publish') {
        console.log('🚀 Running publisher example...');
        publisherExample().catch(console.error);
    } else {
        console.log('🎧 Running subscriber example...');
        subscriberExample().catch(console.error);
    }
}

module.exports = RedisPubSubManager;

Pattern Matching and Advanced Features

# Advanced pattern matching examples
redis-cli

# Hierarchical channel structure
SUBSCRIBE app:user:login
SUBSCRIBE app:user:logout
SUBSCRIBE app:order:created
SUBSCRIBE app:order:completed

# Pattern subscriptions for bulk processing
PSUBSCRIBE app:user:*      # All user-related events
PSUBSCRIBE app:order:*     # All order-related events
PSUBSCRIBE app:*:error     # All error events from any module

# Regional channels
PSUBSCRIBE region:asia:*
PSUBSCRIBE region:europe:*
PSUBSCRIBE region:america:*

# Level-based notifications
PSUBSCRIBE alerts:critical:*
PSUBSCRIBE alerts:warning:*
PSUBSCRIBE alerts:info:*

# Time-based channels
PSUBSCRIBE events:2024:*
PSUBSCRIBE logs:hourly:*
PSUBSCRIBE stats:daily:*

# Sharded Pub/Sub (Redis 7.0+)
# High-performance usage in cluster environments
SSUBSCRIBE orders:asia
SSUBSCRIBE orders:europe  
SSUBSCRIBE orders:america

SPUBLISH orders:asia "Order #A123 from Tokyo"
SPUBLISH orders:europe "Order #E456 from London"

# Channel information and monitoring
PUBSUB CHANNELS                    # Active channels
PUBSUB CHANNELS app:user:*         # Pattern-matching channels
PUBSUB NUMSUB app:user:login       # Subscriber count for specific channel
PUBSUB NUMPAT                      # Total pattern subscriptions

# Sharded Pub/Sub information
PUBSUB SHARDCHANNELS              # Shard channel list
PUBSUB SHARDNUMSUB orders:asia    # Shard channel subscriber count

Practical Use Case Implementations

# Real-time chat application
import asyncio
import redis.asyncio as redis
import json
from datetime import datetime

class RealTimeChatApp:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self.active_users = set()
        
    async def join_room(self, user_id, room_id):
        """User joins chat room"""
        channel = f"chat:room:{room_id}"
        
        # Room join notification
        join_message = {
            'type': 'user_joined',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} joined the room"
        }
        
        await self.redis.publish(channel, json.dumps(join_message))
        await self.pubsub.subscribe(channel)
        self.active_users.add(user_id)
        
        print(f"User {user_id} joined room {room_id}")
    
    async def send_message(self, user_id, room_id, message):
        """Send chat message"""
        channel = f"chat:room:{room_id}"
        
        chat_message = {
            'type': 'chat_message',
            'user_id': user_id,
            'room_id': room_id,
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        
        receivers = await self.redis.publish(channel, json.dumps(chat_message))
        print(f"Message sent to {receivers} users in room {room_id}")
        
    async def leave_room(self, user_id, room_id):
        """User leaves chat room"""
        channel = f"chat:room:{room_id}"
        
        leave_message = {
            'type': 'user_left',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} left the room"
        }
        
        await self.redis.publish(channel, json.dumps(leave_message))
        await self.pubsub.unsubscribe(channel)
        self.active_users.discard(user_id)
        
        print(f"User {user_id} left room {room_id}")
    
    async def listen_messages(self):
        """Message reception processing"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await self.handle_chat_message(data)
                except json.JSONDecodeError:
                    print(f"Invalid message format: {message['data']}")
    
    async def handle_chat_message(self, data):
        """Chat message processing"""
        message_type = data.get('type')
        
        if message_type == 'chat_message':
            print(f"💬 [{data['room_id']}] {data['user_id']}: {data['message']}")
        elif message_type == 'user_joined':
            print(f"👋 {data['message']}")
        elif message_type == 'user_left':
            print(f"👋 {data['message']}")
    
    async def close(self):
        await self.pubsub.close()
        await self.redis.close()

# Event-driven system
class EventDrivenSystem:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.event_handlers = {}
        
    def register_event_handler(self, event_type, handler):
        """Register event handler"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)
        
    async def emit_event(self, event_type, event_data):
        """Emit event"""
        event = {
            'type': event_type,
            'data': event_data,
            'timestamp': datetime.now().isoformat(),
            'event_id': f"{event_type}_{int(datetime.now().timestamp() * 1000)}"
        }
        
        channel = f"events:{event_type}"
        receivers = await self.redis.publish(channel, json.dumps(event))
        print(f"Event '{event_type}' emitted to {receivers} listeners")
        
    async def subscribe_events(self, *event_types):
        """Subscribe to events"""
        pubsub = self.redis.pubsub()
        
        for event_type in event_types:
            channel = f"events:{event_type}"
            await pubsub.subscribe(channel)
            print(f"Subscribed to events: {event_type}")
        
        async for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    await self.handle_event(event)
                except json.JSONDecodeError:
                    print(f"Invalid event format: {message['data']}")
    
    async def handle_event(self, event):
        """Event processing"""
        event_type = event.get('type')
        
        if event_type in self.event_handlers:
            for handler in self.event_handlers[event_type]:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"Error in event handler for {event_type}: {e}")
        else:
            print(f"No handler for event type: {event_type}")

# Usage examples
async def chat_demo():
    """Chat application demo"""
    app = RealTimeChatApp()
    
    try:
        # Users join room
        await app.join_room("alice", "general")
        await app.join_room("bob", "general")
        
        # Send messages
        await app.send_message("alice", "general", "Hello everyone!")
        await app.send_message("bob", "general", "Hi Alice!")
        
        # Start message reception (runs in background in actual app)
        # await app.listen_messages()
        
    finally:
        await app.close()

async def event_demo():
    """Event-driven system demo"""
    system = EventDrivenSystem()
    
    # Register event handlers
    async def user_registered_handler(event):
        data = event['data']
        print(f"📧 Sending welcome email to {data['email']}")
    
    async def order_created_handler(event):
        data = event['data']
        print(f"📦 Processing order {data['order_id']} for ${data['amount']}")
    
    system.register_event_handler('user_registered', user_registered_handler)
    system.register_event_handler('order_created', order_created_handler)
    
    # Emit events
    await system.emit_event('user_registered', {
        'user_id': 12345,
        'email': '[email protected]',
        'name': 'John Doe'
    })
    
    await system.emit_event('order_created', {
        'order_id': 'ORD-001',
        'user_id': 12345,
        'amount': 99.99,
        'items': ['Product A', 'Product B']
    })

if __name__ == "__main__":
    # asyncio.run(chat_demo())
    asyncio.run(event_demo())

Monitoring and Performance Optimization

# Redis Pub/Sub monitoring script
cat > monitor-pubsub.sh << 'EOF'
#!/bin/bash

echo "=== Redis Pub/Sub Monitoring Report $(date) ==="

# Redis connection check
echo "=== Redis Connection Status ==="
redis-cli ping > /dev/null 2>&1 && echo "Redis: CONNECTED" || echo "Redis: DISCONNECTED"

# Active channel count
echo "=== Active Channels ==="
ACTIVE_CHANNELS=$(redis-cli PUBSUB CHANNELS | wc -l)
echo "Total active channels: $ACTIVE_CHANNELS"

# Pattern subscription count
echo "=== Pattern Subscriptions ==="
PATTERN_SUBS=$(redis-cli PUBSUB NUMPAT)
echo "Pattern subscriptions: $PATTERN_SUBS"

# Subscriber count for major channels
echo "=== Channel Subscribers ==="
redis-cli PUBSUB NUMSUB chat:general news:tech system:alerts user:notifications | \
while read channel count; do
    if [ -n "$channel" ] && [ -n "$count" ]; then
        echo "Channel '$channel': $count subscribers"
    fi
done

# Redis information
echo "=== Redis Server Info ==="
redis-cli INFO clients | grep connected_clients
redis-cli INFO memory | grep used_memory_human
redis-cli INFO stats | grep total_commands_processed

# Pub/Sub statistics
echo "=== Pub/Sub Statistics ==="
redis-cli INFO stats | grep pubsub

echo "=== Monitoring Complete ==="
EOF

chmod +x monitor-pubsub.sh

# Performance testing
# High-volume message publishing test
redis-cli --eval - 0 << 'EOF'
for i=1,10000 do
    redis.call('PUBLISH', 'performance_test', 'Message ' .. i)
end
return 'Published 10000 messages'
EOF

# Subscriber performance test
redis-cli --latency-history -i 1 PUBLISH test_channel "performance test"

# Memory usage monitoring
redis-cli --bigkeys

# Client connection monitoring
redis-cli CLIENT LIST

Redis Pub/Sub, with its simplicity and high performance, is widely adopted for real-time messaging, event-driven architectures, chat applications, and live notification systems. It particularly excels in applications requiring millisecond-level low latency and lightweight, high-speed message delivery.