Redis Pub/Sub
Built-in messaging feature of Redis that provides a lightweight publish-subscribe pattern. Well suited to real-time notifications, chat, and live updates.
Overview
Redis Pub/Sub is a built-in messaging feature of Redis that implements the publish/subscribe pattern: publishers send messages to named channels, and Redis forwards each message to every client currently subscribed to that channel. Available since Redis 2.0 (2010), it is widely used for real-time communication, event-driven architectures, chat applications, and live notification systems. Because messages are relayed in memory and never stored, delivery latency is typically in the millisecond range or lower, and the feature stays lightweight even with large numbers of publishers and subscribers. Redis 7.0 introduced Sharded Pub/Sub, which improves scalability in cluster environments by confining each message to the shard that owns its channel.
Details
Redis Pub/Sub is a fire-and-forget system: a published message is delivered once to the clients subscribed at that moment and is then discarded. In addition to exact channel names, clients can subscribe with glob-style patterns (PSUBSCRIBE), so a single subscription such as news:* also covers channels that are created dynamically. In a Redis Cluster, a regular PUBLISH is propagated to every node; Sharded Pub/Sub, introduced in Redis 7.0, instead assigns each shard channel to a hash slot and delivers messages only within the shard that owns that slot. The design prioritizes real-time performance and lightweight operation over persistence and delivery guarantees, which makes it a good fit for speed-critical applications such as IoT data streaming, real-time game synchronization, financial transaction notifications, and other scenarios where immediate delivery matters more than guaranteed delivery.
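To make the slot assignment more concrete, the following is a minimal sketch of how a shard channel name could be mapped to a cluster hash slot. It assumes shard channels use the same rule as keys in Redis Cluster (CRC16 of the name modulo 16384, hashing only the hash tag inside {...} when one is present). The function names hash_tag and channel_slot are illustrative and not part of any Redis client API.

```python
import binascii

HASH_SLOTS = 16384  # number of hash slots in a Redis Cluster


def hash_tag(name: str) -> str:
    """Return the part of the name that is hashed.

    If the name contains a non-empty {...} section, only the text between
    the first '{' and the first '}' after it is used.
    """
    start = name.find("{")
    if start != -1:
        end = name.find("}", start + 1)
        if end != -1 and end > start + 1:
            return name[start + 1:end]
    return name


def channel_slot(channel: str) -> int:
    """Map a channel name to a slot: CRC16 (XMODEM variant) mod 16384."""
    data = hash_tag(channel).encode("utf-8")
    # binascii.crc_hqx with an initial value of 0 computes CRC-CCITT (XMODEM)
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


if __name__ == "__main__":
    for name in ("orders:asia", "orders:europe",
                 "{orders}:asia", "{orders}:europe"):
        print(f"{name!r} -> slot {channel_slot(name)}")
```

Channel names that share a hash tag, such as {orders}:asia and {orders}:europe, map to the same slot and are therefore served by the same shard, while untagged names are spread across slots independently.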
Key Features
- Real-time Delivery: Millisecond-level low latency messaging
- Pattern Matching: Flexible channel subscription using wildcards
- Lightweight Design: Memory-efficient implementation with minimal overhead
- High Scalability: Efficient handling of numerous publishers and subscribers
- Sharded Pub/Sub: Optimized message distribution in cluster environments
- Simple API: Intuitive and easy-to-learn command structure
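As a rough illustration of how wildcard subscriptions select channels, the sketch below approximates Redis glob-style matching with Python's fnmatch module. This is an approximation only: the two syntaxes agree on * and ?, but differ in details such as character-class negation and escaping. The helper name matching_patterns is hypothetical.

```python
from fnmatch import fnmatchcase


def matching_patterns(channel, patterns):
    """Return the subscription patterns that would match a channel name."""
    return [p for p in patterns if fnmatchcase(channel, p)]


if __name__ == "__main__":
    patterns = ["news:*", "chat:room*", "user:*:notifications"]
    for channel in ("news:tech", "chat:room1", "chat:lobby",
                    "user:123:notifications", "user:1:2:notifications"):
        print(channel, "->", matching_patterns(channel, patterns))
```

Note that * is not limited to a single colon-separated segment: user:*:notifications also matches user:1:2:notifications, which is worth keeping in mind when designing hierarchical channel names.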
Pros and Cons
Pros
- Extremely fast message delivery performance and real-time capabilities
- Memory-efficient messaging leveraging Redis's lightweight nature
- Rapid development and easy maintenance through simple API
- Rich ecosystem of client libraries supporting multiple languages
- Dynamic and flexible channel management through pattern matching
- Complete integration with existing Redis infrastructure reducing deployment costs
Cons
- No message persistence and at-most-once delivery (fire-and-forget design)
- Messages published while no subscriber is connected are lost
- Lack of complex message routing capabilities
- No transaction support or dead letter queue functionality
- Not suitable for large message processing
- Limited stream processing capabilities compared to systems like Kafka
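The fire-and-forget behavior behind the first two drawbacks can be sketched with a small in-memory stand-in. The InMemoryBroker class below is purely illustrative and is not how Redis is implemented; it only mimics the observable semantics: publish returns the number of receivers, and a message published with no subscribers is dropped.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy broker that mimics fire-and-forget delivery (illustrative only)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        # Deliver to whoever is subscribed right now; nothing is stored.
        callbacks = self._subscribers.get(channel, [])
        for callback in callbacks:
            callback(message)
        return len(callbacks)


if __name__ == "__main__":
    broker = InMemoryBroker()
    inbox = []

    # Published before anyone subscribes: zero receivers, message is gone.
    print(broker.publish("news", "early message"))

    broker.subscribe("news", inbox.append)
    print(broker.publish("news", "late message"))
    print(inbox)  # only the message published after subscribing
```

In a real deployment, the receiver count returned by PUBLISH can be checked in the same way to detect that nobody was listening, but it cannot recover the message. Workloads that need replay or acknowledgements usually need a different mechanism.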
Code Examples
Basic Installation and Setup
# Redis server installation (Ubuntu/Debian)
sudo apt update
sudo apt install redis-server
# CentOS/RHEL/Fedora
sudo dnf install redis
# macOS (using Homebrew)
brew install redis
# Start Redis server manually (package installs often start it as a service already)
redis-server
# Connect to Redis CLI in another terminal
redis-cli
# Check Redis server status
redis-cli ping
# Should return PONG
# Docker usage
docker run --name redis-pubsub \
-p 6379:6379 \
-d redis:7-alpine
# Docker Compose configuration
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    container_name: redis-pubsub
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    restart: unless-stopped
volumes:
  redis-data:
EOF
docker-compose up -d
Basic Pub/Sub Operations (Redis CLI)
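# Channel subscription (subscriber)
# Execute in Terminal 1
# redis-cli blocks while waiting for messages after SUBSCRIBE or PSUBSCRIBE,
# so try each subscription below in its own redis-cli session
redis-cli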
SUBSCRIBE news sport technology
# Multiple channel subscription
SUBSCRIBE chat:room1 chat:room2 chat:room3
# Pattern subscription (using wildcards)
PSUBSCRIBE news:*
PSUBSCRIBE chat:room*
PSUBSCRIBE user:*:notifications
# Message publishing (publisher)
# Execute in Terminal 2
redis-cli
PUBLISH news "Breaking: New technology breakthrough announced"
PUBLISH sport "Football match result: Team A 3-1 Team B"
PUBLISH chat:room1 "Hello everyone in room 1!"
# Publishing to pattern-matching channels
PUBLISH news:tech "Latest in AI development"
PUBLISH news:sports "Olympic games update"
PUBLISH user:123:notifications "You have a new message"
# Unsubscribe operations
UNSUBSCRIBE news sport
PUNSUBSCRIBE news:*
# Channel information (run from a connection that is not subscribed)
# The descriptions are on their own lines because redis-cli would pass
# a trailing "# ..." to the server as extra arguments
# List active channels
PUBSUB CHANNELS
# Subscriber count for specific channels
PUBSUB NUMSUB news sport
# Pattern subscription count
PUBSUB NUMPAT
# Sharded Pub/Sub (Redis 7.0+)
# Shard subscription
SSUBSCRIBE orders:shard1
# Shard publishing
SPUBLISH orders:shard1 "Order #12345 processed"
# Shard unsubscription
SUNSUBSCRIBE orders:shard1
Python Implementation (redis-py)
# pip install redis
import json
from datetime import datetime

import redis


class RedisPubSubManager:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )
        self.pubsub = self.redis_client.pubsub()
        self.subscribed_channels = set()
        self.message_handlers = {}

    def publish_message(self, channel, message, message_type='text'):
        """Publish message"""
        try:
            if isinstance(message, dict):
                # Wrap dict payloads in an envelope with type and timestamp
                message_data = {
                    'type': message_type,
                    'timestamp': datetime.now().isoformat(),
                    'data': message
                }
                message_str = json.dumps(message_data)
            else:
                message_str = str(message)
            # Publish message; the return value is the number of receivers
            receivers = self.redis_client.publish(channel, message_str)
            print(f"Message published to '{channel}': {receivers} receivers")
            return receivers
        except Exception as e:
            print(f"Error publishing message: {e}")
            return 0

    def subscribe_channel(self, channel, message_handler=None):
        """Subscribe to channel"""
        try:
            self.pubsub.subscribe(channel)
            self.subscribed_channels.add(channel)
            if message_handler:
                self.message_handlers[channel] = message_handler
            print(f"Subscribed to channel: {channel}")
        except Exception as e:
            print(f"Error subscribing to channel {channel}: {e}")

    def subscribe_pattern(self, pattern, message_handler=None):
        """Subscribe to pattern"""
        try:
            self.pubsub.psubscribe(pattern)
            if message_handler:
                self.message_handlers[pattern] = message_handler
            print(f"Subscribed to pattern: {pattern}")
        except Exception as e:
            print(f"Error subscribing to pattern {pattern}: {e}")

    def default_message_handler(self, message):
        """Default message handler"""
        if message['type'] == 'message':
            channel = message['channel']
            data = message['data']
            # Try parsing JSON format
            try:
                parsed_data = json.loads(data)
                print(f"[{channel}] {parsed_data}")
            except json.JSONDecodeError:
                print(f"[{channel}] {data}")
        elif message['type'] == 'pmessage':
            pattern = message['pattern']
            channel = message['channel']
            data = message['data']
            print(f"[Pattern: {pattern}] [{channel}] {data}")
        elif message['type'] in ['subscribe', 'psubscribe']:
            print(f"Subscribed to {message['channel']}, total: {message['data']} subscriptions")
        elif message['type'] in ['unsubscribe', 'punsubscribe']:
            print(f"Unsubscribed from {message['channel']}, remaining: {message['data']} subscriptions")

    def start_listening(self):
        """Start message reception (blocks until interrupted)"""
        print("Starting message listener...")
        try:
            for message in self.pubsub.listen():
                if message is None:
                    continue
                # Custom handlers only receive published messages;
                # subscribe/unsubscribe confirmations carry an integer count
                # in 'data' and are routed to the default handler
                handler = None
                if message['type'] == 'message':
                    handler = self.message_handlers.get(message.get('channel'))
                elif message['type'] == 'pmessage':
                    handler = self.message_handlers.get(message.get('pattern'))
                if handler:
                    try:
                        handler(message)
                    except Exception as e:
                        print(f"Error in message handler: {e}")
                else:
                    self.default_message_handler(message)
        except KeyboardInterrupt:
            # Cleanup (unsubscribe/close) is left to the caller
            print("\nStopping message listener...")

    def unsubscribe_all(self):
        """Unsubscribe from all channels"""
        try:
            self.pubsub.unsubscribe()
            self.pubsub.punsubscribe()
            self.subscribed_channels.clear()
            self.message_handlers.clear()
            print("Unsubscribed from all channels and patterns")
        except Exception as e:
            print(f"Error unsubscribing: {e}")

    def get_channel_info(self, *channels):
        """Get channel information"""
        try:
            # Active channels list
            active_channels = self.redis_client.pubsub_channels()
            print(f"Active channels: {active_channels}")
            # Subscriber count for specific channels
            if channels:
                subscriber_counts = self.redis_client.pubsub_numsub(*channels)
                for channel, count in subscriber_counts:
                    print(f"Channel '{channel}': {count} subscribers")
            # Pattern subscription count
            pattern_count = self.redis_client.pubsub_numpat()
            print(f"Pattern subscriptions: {pattern_count}")
        except Exception as e:
            print(f"Error getting channel info: {e}")

    def close(self):
        """Close connection"""
        try:
            self.pubsub.close()
            self.redis_client.close()
            print("Redis Pub/Sub connection closed")
        except Exception as e:
            print(f"Error closing connection: {e}")
# Usage examples and sample applications
def user_notification_handler(message):
    """User notification specific handler"""
    try:
        data = json.loads(message['data'])
        user_id = data['data'].get('user_id')
        notification_type = data['data'].get('type')
        content = data['data'].get('content')
        print(f"🔔 USER {user_id} | {notification_type.upper()}: {content}")
    except Exception as e:
        print(f"Error processing user notification: {e}")


def chat_message_handler(message):
    """Chat message specific handler"""
    try:
        data = json.loads(message['data'])
        username = data['data'].get('username')
        content = data['data'].get('message')
        print(f"💬 {username}: {content}")
    except Exception as e:
        print(f"Error processing chat message: {e}")


# Publisher example
def publisher_example():
    manager = RedisPubSubManager()
    try:
        # Publish various message types
        # News publishing
        manager.publish_message('news:tech', {
            'title': 'AI breakthrough in 2024',
            'content': 'New AI model achieves human-level performance',
            'category': 'technology'
        }, 'news')
        # Chat message publishing
        manager.publish_message('chat:general', {
            'username': 'alice',
            'message': 'Hello everyone!',
            'room': 'general'
        }, 'chat')
        # User notification publishing
        manager.publish_message('user:notifications', {
            'user_id': 12345,
            'type': 'message',
            'content': 'You have a new private message'
        }, 'notification')
        # System notification publishing
        manager.publish_message('system:alerts', {
            'level': 'warning',
            'message': 'High CPU usage detected',
            'timestamp': datetime.now().isoformat()
        }, 'system_alert')
    finally:
        manager.close()


# Subscriber example
def subscriber_example():
    manager = RedisPubSubManager()
    try:
        # Individual channel subscriptions
        manager.subscribe_channel('chat:general', chat_message_handler)
        manager.subscribe_channel('user:notifications', user_notification_handler)
        # Pattern subscriptions
        manager.subscribe_pattern('news:*')
        manager.subscribe_pattern('system:*')
        # Check channel information
        manager.get_channel_info('chat:general', 'user:notifications')
        # Start message reception
        manager.start_listening()
    except KeyboardInterrupt:
        print("\nShutting down subscriber...")
    finally:
        manager.unsubscribe_all()
        manager.close()


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == 'publish':
        print("Running publisher example...")
        publisher_example()
    else:
        print("Running subscriber example (Ctrl+C to stop)...")
        subscriber_example()
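The manager above wraps dict payloads in a JSON envelope (type, timestamp, data) but sends other values as plain strings, so subscribers have to cope with both shapes. The following is a minimal, Redis-free sketch of one way to keep the encoding and decoding symmetric. The helper names encode_envelope and decode_envelope are assumptions for illustration and do not appear in redis-py.

```python
import json
from datetime import datetime, timezone


def encode_envelope(payload, message_type="text"):
    """Serialize any JSON-compatible payload into a uniform envelope."""
    return json.dumps({
        "type": message_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": payload,
    })


def decode_envelope(raw):
    """Parse a received message body, tolerating non-envelope input.

    Plain strings, bare JSON values, and non-string values (such as the
    integer carried by subscribe confirmations) are wrapped so that
    callers can always read result["data"].
    """
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return {"type": "text", "timestamp": None, "data": raw}
    if isinstance(parsed, dict) and "data" in parsed:
        return parsed
    return {"type": "text", "timestamp": None, "data": parsed}


if __name__ == "__main__":
    wire = encode_envelope({"username": "alice", "message": "Hello!"}, "chat")
    print(decode_envelope(wire)["data"])
    print(decode_envelope("plain text")["data"])
```

With helpers like these, a handler can read the data field without first checking whether the publisher sent JSON or plain text.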
Node.js Implementation (ioredis)
// npm install ioredis
const Redis = require('ioredis');

class RedisPubSubManager {
  constructor(options = {}) {
    // A connection in subscriber mode cannot issue regular commands,
    // so publishing and subscribing use separate connections
    const connectionOptions = {
      host: options.host || 'localhost',
      port: options.port || 6379,
      db: options.db || 0,
      password: options.password,
      enableReadyCheck: false,
      maxRetriesPerRequest: null,
    };
    this.publisherClient = new Redis({ ...connectionOptions });
    this.subscriberClient = new Redis({ ...connectionOptions });
    this.messageHandlers = new Map();
    this.setupErrorHandling();
  }

  setupErrorHandling() {
    this.publisherClient.on('error', (err) => {
      console.error('Publisher Redis Error:', err);
    });
    this.subscriberClient.on('error', (err) => {
      console.error('Subscriber Redis Error:', err);
    });
    this.publisherClient.on('connect', () => {
      console.log('Publisher connected to Redis');
    });
    this.subscriberClient.on('connect', () => {
      console.log('Subscriber connected to Redis');
    });
  }

  async publishMessage(channel, message, metadata = {}) {
    try {
      const messageData = {
        type: metadata.type || 'text',
        timestamp: new Date().toISOString(),
        source: metadata.source || 'node-app',
        data: message
      };
      const receiverCount = await this.publisherClient.publish(
        channel,
        JSON.stringify(messageData)
      );
      console.log(`📤 Published to '${channel}': ${receiverCount} receivers`);
      return receiverCount;
    } catch (error) {
      console.error(`Error publishing message to ${channel}:`, error);
      return 0;
    }
  }

  async subscribeChannel(channel, messageHandler) {
    try {
      if (messageHandler) {
        this.messageHandlers.set(channel, messageHandler);
      }
      // Await the command so that failures reach the catch block
      await this.subscriberClient.subscribe(channel);
      console.log(`📥 Subscribed to channel: ${channel}`);
    } catch (error) {
      console.error(`Error subscribing to channel ${channel}:`, error);
    }
  }

  async subscribePattern(pattern, messageHandler) {
    try {
      if (messageHandler) {
        this.messageHandlers.set(pattern, messageHandler);
      }
      await this.subscriberClient.psubscribe(pattern);
      console.log(`📥 Subscribed to pattern: ${pattern}`);
    } catch (error) {
      console.error(`Error subscribing to pattern ${pattern}:`, error);
    }
  }

  startListening() {
    // Regular channel messages
    this.subscriberClient.on('message', (channel, message) => {
      this.handleMessage('message', channel, null, message);
    });
    // Pattern match messages
    this.subscriberClient.on('pmessage', (pattern, channel, message) => {
      this.handleMessage('pmessage', channel, pattern, message);
    });
    // Subscription confirmations
    this.subscriberClient.on('subscribe', (channel, count) => {
      console.log(`✅ Subscribed to '${channel}', total subscriptions: ${count}`);
    });
    this.subscriberClient.on('psubscribe', (pattern, count) => {
      console.log(`✅ Pattern subscribed to '${pattern}', total subscriptions: ${count}`);
    });
    // Unsubscription confirmations
    this.subscriberClient.on('unsubscribe', (channel, count) => {
      console.log(`❌ Unsubscribed from '${channel}', remaining: ${count}`);
    });
    this.subscriberClient.on('punsubscribe', (pattern, count) => {
      console.log(`❌ Pattern unsubscribed from '${pattern}', remaining: ${count}`);
    });
    console.log('🎧 Message listener started...');
  }

  handleMessage(type, channel, pattern, message) {
    try {
      // Find appropriate handler
      let handler = null;
      if (type === 'message' && this.messageHandlers.has(channel)) {
        handler = this.messageHandlers.get(channel);
      } else if (type === 'pmessage' && this.messageHandlers.has(pattern)) {
        handler = this.messageHandlers.get(pattern);
      }
      // Parse message data
      let messageData;
      try {
        messageData = JSON.parse(message);
      } catch (e) {
        messageData = { data: message, timestamp: new Date().toISOString() };
      }
      if (handler) {
        handler({
          type,
          channel,
          pattern,
          message: messageData,
          rawMessage: message
        });
      } else {
        this.defaultMessageHandler({ type, channel, pattern, message: messageData });
      }
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  defaultMessageHandler({ type, channel, pattern, message }) {
    const timestamp = new Date().toLocaleTimeString();
    if (type === 'message') {
      console.log(`[${timestamp}] 📨 ${channel}: ${JSON.stringify(message)}`);
    } else if (type === 'pmessage') {
      console.log(`[${timestamp}] 🔍 ${pattern} -> ${channel}: ${JSON.stringify(message)}`);
    }
  }

  async getChannelInfo(...channels) {
    try {
      // Active channels list
      const activeChannels = await this.publisherClient.pubsub('CHANNELS');
      console.log('Active channels:', activeChannels);
      // Subscriber count for specific channels (flat [name, count, ...] array)
      if (channels.length > 0) {
        const subscriberCounts = await this.publisherClient.pubsub('NUMSUB', ...channels);
        for (let i = 0; i < subscriberCounts.length; i += 2) {
          console.log(`Channel '${subscriberCounts[i]}': ${subscriberCounts[i + 1]} subscribers`);
        }
      }
      // Pattern subscription count
      const patternCount = await this.publisherClient.pubsub('NUMPAT');
      console.log(`Pattern subscriptions: ${patternCount}`);
    } catch (error) {
      console.error('Error getting channel info:', error);
    }
  }

  async unsubscribeAll() {
    try {
      await this.subscriberClient.unsubscribe();
      await this.subscriberClient.punsubscribe();
      this.messageHandlers.clear();
      console.log('Unsubscribed from all channels and patterns');
    } catch (error) {
      console.error('Error unsubscribing:', error);
    }
  }

  async close() {
    try {
      await this.unsubscribeAll();
      await this.publisherClient.quit();
      await this.subscriberClient.quit();
      console.log('Redis Pub/Sub connections closed');
    } catch (error) {
      console.error('Error closing connections:', error);
    }
  }
}
// Specific message handler examples
function createChatHandler(roomName) {
  return ({ channel, message }) => {
    const { data } = message;
    console.log(`💬 [${roomName}] ${data.username}: ${data.message}`);
  };
}

function createNotificationHandler() {
  return ({ channel, message }) => {
    const { data } = message;
    const emoji = data.level === 'error' ? '🚨' :
      data.level === 'warning' ? '⚠️' : 'ℹ️';
    console.log(`${emoji} ${data.level.toUpperCase()}: ${data.message}`);
  };
}

function createUserActivityHandler() {
  return ({ channel, message }) => {
    const { data } = message;
    console.log(`👤 User ${data.userId} performed action: ${data.action}`);
  };
}

// Publisher example
async function publisherExample() {
  const manager = new RedisPubSubManager();
  try {
    // Publish various message types
    await manager.publishMessage('chat:general', {
      username: 'alice',
      message: 'Hello from Node.js!',
      timestamp: new Date().toISOString()
    }, { type: 'chat', source: 'node-publisher' });
    await manager.publishMessage('notifications:system', {
      level: 'info',
      message: 'System maintenance completed',
      component: 'web-server'
    }, { type: 'notification' });
    await manager.publishMessage('user:activity', {
      userId: 12345,
      action: 'login',
      ip: '192.168.1.100',
      userAgent: 'Node.js Client'
    }, { type: 'user_activity' });
    // Pattern matching test
    await manager.publishMessage('events:user:login', {
      event: 'user_login',
      details: { userId: 67890, location: 'Tokyo' }
    });
    await manager.publishMessage('events:order:created', {
      event: 'order_created',
      details: { orderId: 'ORD-001', amount: 150.00 }
    });
  } finally {
    await manager.close();
  }
}

// Subscriber example
async function subscriberExample() {
  const manager = new RedisPubSubManager();
  try {
    // Register the event listeners first so no early message is missed
    manager.startListening();
    // Subscribe to channels with custom handlers
    await manager.subscribeChannel('chat:general', createChatHandler('General'));
    await manager.subscribeChannel('notifications:system', createNotificationHandler());
    await manager.subscribeChannel('user:activity', createUserActivityHandler());
    // Pattern subscriptions
    await manager.subscribePattern('events:*');
    await manager.subscribePattern('alerts:*');
    // Check channel information
    setTimeout(() => {
      manager.getChannelInfo('chat:general', 'notifications:system', 'user:activity');
    }, 1000);
    // Graceful shutdown
    process.on('SIGINT', async () => {
      console.log('\n🛑 Shutting down subscriber...');
      await manager.close();
      process.exit(0);
    });
    // Keep running
    console.log('Press Ctrl+C to stop the subscriber...');
  } catch (error) {
    console.error('Error in subscriber:', error);
    await manager.close();
  }
}

// Execution example
if (require.main === module) {
  const mode = process.argv[2];
  if (mode === 'publish') {
    console.log('🚀 Running publisher example...');
    publisherExample().catch(console.error);
  } else {
    console.log('🎧 Running subscriber example...');
    subscriberExample().catch(console.error);
  }
}

module.exports = RedisPubSubManager;
Pattern Matching and Advanced Features
# Advanced pattern matching examples
redis-cli
# Hierarchical channel structure
SUBSCRIBE app:user:login
SUBSCRIBE app:user:logout
SUBSCRIBE app:order:created
SUBSCRIBE app:order:completed
# Pattern subscriptions for bulk processing
# All user-related events
PSUBSCRIBE app:user:*
# All order-related events
PSUBSCRIBE app:order:*
# All error events from any module
PSUBSCRIBE app:*:error
# Regional channels
PSUBSCRIBE region:asia:*
PSUBSCRIBE region:europe:*
PSUBSCRIBE region:america:*
# Level-based notifications
PSUBSCRIBE alerts:critical:*
PSUBSCRIBE alerts:warning:*
PSUBSCRIBE alerts:info:*
# Time-based channels
PSUBSCRIBE events:2024:*
PSUBSCRIBE logs:hourly:*
PSUBSCRIBE stats:daily:*
# Sharded Pub/Sub (Redis 7.0+)
# High-performance usage in cluster environments
SSUBSCRIBE orders:asia
SSUBSCRIBE orders:europe
SSUBSCRIBE orders:america
SPUBLISH orders:asia "Order #A123 from Tokyo"
SPUBLISH orders:europe "Order #E456 from London"
# Channel information and monitoring
# Active channels
PUBSUB CHANNELS
# Active channels matching a pattern
PUBSUB CHANNELS app:user:*
# Subscriber count for a specific channel
PUBSUB NUMSUB app:user:login
# Total pattern subscriptions
PUBSUB NUMPAT
# Sharded Pub/Sub information
# Shard channel list
PUBSUB SHARDCHANNELS
# Shard channel subscriber count
PUBSUB SHARDNUMSUB orders:asia
Practical Use Case Implementations
# Real-time chat application
import asyncio
import redis.asyncio as redis
import json
from datetime import datetime


class RealTimeChatApp:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self.active_users = set()

    async def join_room(self, user_id, room_id):
        """User joins chat room"""
        channel = f"chat:room:{room_id}"
        # Room join notification
        join_message = {
            'type': 'user_joined',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} joined the room"
        }
        await self.redis.publish(channel, json.dumps(join_message))
        await self.pubsub.subscribe(channel)
        self.active_users.add(user_id)
        print(f"User {user_id} joined room {room_id}")

    async def send_message(self, user_id, room_id, message):
        """Send chat message"""
        channel = f"chat:room:{room_id}"
        chat_message = {
            'type': 'chat_message',
            'user_id': user_id,
            'room_id': room_id,
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        receivers = await self.redis.publish(channel, json.dumps(chat_message))
        print(f"Message sent to {receivers} users in room {room_id}")

    async def leave_room(self, user_id, room_id):
        """User leaves chat room"""
        channel = f"chat:room:{room_id}"
        leave_message = {
            'type': 'user_left',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} left the room"
        }
        await self.redis.publish(channel, json.dumps(leave_message))
        await self.pubsub.unsubscribe(channel)
        self.active_users.discard(user_id)
        print(f"User {user_id} left room {room_id}")

    async def listen_messages(self):
        """Message reception processing"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await self.handle_chat_message(data)
                except json.JSONDecodeError:
                    print(f"Invalid message format: {message['data']}")

    async def handle_chat_message(self, data):
        """Chat message processing"""
        message_type = data.get('type')
        if message_type == 'chat_message':
            print(f"💬 [{data['room_id']}] {data['user_id']}: {data['message']}")
        elif message_type == 'user_joined':
            print(f"👋 {data['message']}")
        elif message_type == 'user_left':
            print(f"👋 {data['message']}")

    async def close(self):
        await self.pubsub.close()
        await self.redis.close()
# Event-driven system
class EventDrivenSystem:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.event_handlers = {}

    def register_event_handler(self, event_type, handler):
        """Register event handler"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)

    async def emit_event(self, event_type, event_data):
        """Emit event"""
        event = {
            'type': event_type,
            'data': event_data,
            'timestamp': datetime.now().isoformat(),
            'event_id': f"{event_type}_{int(datetime.now().timestamp() * 1000)}"
        }
        channel = f"events:{event_type}"
        receivers = await self.redis.publish(channel, json.dumps(event))
        print(f"Event '{event_type}' emitted to {receivers} listeners")

    async def subscribe_events(self, *event_types):
        """Subscribe to events"""
        pubsub = self.redis.pubsub()
        for event_type in event_types:
            channel = f"events:{event_type}"
            await pubsub.subscribe(channel)
            print(f"Subscribed to events: {event_type}")
        async for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    await self.handle_event(event)
                except json.JSONDecodeError:
                    print(f"Invalid event format: {message['data']}")

    async def handle_event(self, event):
        """Event processing"""
        event_type = event.get('type')
        if event_type in self.event_handlers:
            for handler in self.event_handlers[event_type]:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"Error in event handler for {event_type}: {e}")
        else:
            print(f"No handler for event type: {event_type}")
# Usage examples
async def chat_demo():
    """Chat application demo"""
    app = RealTimeChatApp()
    try:
        # Users join room
        await app.join_room("alice", "general")
        await app.join_room("bob", "general")
        # Send messages
        await app.send_message("alice", "general", "Hello everyone!")
        await app.send_message("bob", "general", "Hi Alice!")
        # Start message reception (runs in background in actual app)
        # await app.listen_messages()
    finally:
        await app.close()


async def event_demo():
    """Event-driven system demo"""
    system = EventDrivenSystem()

    # Register event handlers
    async def user_registered_handler(event):
        data = event['data']
        print(f"📧 Sending welcome email to {data['email']}")

    async def order_created_handler(event):
        data = event['data']
        print(f"📦 Processing order {data['order_id']} for ${data['amount']}")

    system.register_event_handler('user_registered', user_registered_handler)
    system.register_event_handler('order_created', order_created_handler)

    # Handlers only run in a process that is subscribed, so start the
    # listener in the background before emitting anything
    listener = asyncio.create_task(
        system.subscribe_events('user_registered', 'order_created')
    )
    await asyncio.sleep(0.1)  # give the subscription time to register
    try:
        # Emit events
        await system.emit_event('user_registered', {
            'user_id': 12345,
            'email': '[email protected]',
            'name': 'John Doe'
        })
        await system.emit_event('order_created', {
            'order_id': 'ORD-001',
            'user_id': 12345,
            'amount': 99.99,
            'items': ['Product A', 'Product B']
        })
        await asyncio.sleep(0.1)  # allow the handlers to run
    finally:
        # Stop the background listener and release the connection
        listener.cancel()
        try:
            await listener
        except asyncio.CancelledError:
            pass
        await system.redis.close()


if __name__ == "__main__":
    # asyncio.run(chat_demo())
    asyncio.run(event_demo())
Monitoring and Performance Optimization
# Redis Pub/Sub monitoring script
cat > monitor-pubsub.sh << 'EOF'
#!/bin/bash
echo "=== Redis Pub/Sub Monitoring Report $(date) ==="
# Redis connection check
echo "=== Redis Connection Status ==="
redis-cli ping > /dev/null 2>&1 && echo "Redis: CONNECTED" || echo "Redis: DISCONNECTED"
# Active channel count
echo "=== Active Channels ==="
ACTIVE_CHANNELS=$(redis-cli PUBSUB CHANNELS | wc -l)
echo "Total active channels: $ACTIVE_CHANNELS"
# Pattern subscription count
echo "=== Pattern Subscriptions ==="
PATTERN_SUBS=$(redis-cli PUBSUB NUMPAT)
echo "Pattern subscriptions: $PATTERN_SUBS"
# Subscriber count for major channels
# (when piped, redis-cli prints each channel name and its count on separate lines)
echo "=== Channel Subscribers ==="
redis-cli PUBSUB NUMSUB chat:general news:tech system:alerts user:notifications | \
while read -r channel && read -r count; do
  echo "Channel '$channel': $count subscribers"
done
# Redis information
echo "=== Redis Server Info ==="
redis-cli INFO clients | grep connected_clients
redis-cli INFO memory | grep used_memory_human
redis-cli INFO stats | grep total_commands_processed
# Pub/Sub statistics
echo "=== Pub/Sub Statistics ==="
redis-cli INFO stats | grep pubsub
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-pubsub.sh
# Performance testing
# High-volume message publishing test (Lua loop executed on the server)
redis-cli EVAL "for i = 1, 10000 do redis.call('PUBLISH', 'performance_test', 'Message ' .. i) end return 'Published 10000 messages'" 0
# Round-trip latency sampling
# (latency mode measures PING round trips; it does not publish messages)
redis-cli --latency-history -i 1
# Keyspace memory usage
# (Pub/Sub messages are not stored as keys; pending subscriber output appears as omem in CLIENT LIST)
redis-cli --bigkeys
# Client connection monitoring
redis-cli CLIENT LIST
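The CLIENT LIST output can also be inspected programmatically, for example to spot subscribers whose pending output is growing because they read messages too slowly. The sketch below parses the space-separated key=value lines and filters on the sub, psub, and omem fields. The sample text is abbreviated and made up (real output has many more fields), and the 1 MB threshold and the function names are assumptions chosen for illustration.

```python
def parse_client_list(text):
    """Parse CLIENT LIST output into one dict of fields per client."""
    clients = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        fields = {}
        for token in line.split(" "):
            key, sep, value = token.partition("=")
            if sep:
                fields[key] = value
        clients.append(fields)
    return clients


def slow_subscribers(clients, omem_threshold=1_000_000):
    """Return subscribed clients whose output buffer memory is large."""
    result = []
    for client in clients:
        subscriptions = int(client.get("sub", 0)) + int(client.get("psub", 0))
        if subscriptions > 0 and int(client.get("omem", 0)) >= omem_threshold:
            result.append(client)
    return result


# Abbreviated, hypothetical sample of CLIENT LIST output
SAMPLE = """\
id=7 addr=127.0.0.1:50001 name=chat-worker sub=3 psub=1 omem=2500000 cmd=subscribe
id=8 addr=127.0.0.1:50002 name= sub=0 psub=0 omem=0 cmd=publish
id=9 addr=127.0.0.1:50003 name=dashboard sub=1 psub=0 omem=0 cmd=subscribe
"""

if __name__ == "__main__":
    for client in slow_subscribers(parse_client_list(SAMPLE)):
        print(f"client {client['id']} ({client['name']}): omem={client['omem']}")
```

In practice the text would come from the server, for example via redis-cli CLIENT LIST, and the threshold would be chosen relative to the configured client output buffer limit for Pub/Sub clients.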
Redis Pub/Sub, with its simplicity and high performance, is widely adopted for real-time messaging, event-driven architectures, chat applications, and live notification systems. It particularly excels in applications requiring millisecond-level low latency and lightweight, high-speed message delivery.