Redis Pub/Sub
Redisの組み込みメッセージング機能。軽量なパブリッシュ・サブスクライブパターンを提供。リアルタイム通知、チャット、ライブアップデートに最適。
サーバー
Redis Pub/Sub
概要
Redis Pub/Subは、Redisに内蔵されたリアルタイムメッセージング機能で、Publish/Subscribe(発行/購読)パターンを実装したシンプルかつ高性能なメッセージングシステムです。2009年の導入以来、リアルタイム通信、イベント駆動アーキテクチャ、チャットアプリケーション、ライブ通知システムなど幅広い用途で活用されています。Redis本来の高速性を活かし、1ミリ秒レベルの低遅延でメッセージを配信でき、軽量でありながら大量のパブリッシャーとサブスクライバーを効率的に処理します。Redis 7.0では、クラスター環境での性能向上を図るSharded Pub/Sub機能が追加され、大規模分散システムでのスケーラビリティが大幅に改善されています。
詳細
Redis Pub/Sub 2025年版は、そのシンプルさと高性能により、リアルタイムメッセージングの定番ソリューションとして進化しています。基本的なfire-and-forgetメッセージ配信に加え、パターンマッチング機能により柔軟なチャネル購読が可能で、wildcard(ワイルドカード)を使用した動的なチャネル管理が実現できます。Redis 7.0で導入されたSharded Pub/Subは、チャネルをスロットに割り当てることでクラスター内でのメッセージ配信を最適化し、従来の全ノード配信から効率的なシャード内配信へと改善されています。永続化やメッセージ保証よりもリアルタイム性と軽量性を重視した設計により、IoTデータストリーミング、ゲームのリアルタイム同期、金融取引の即座通知など、速度が重要な用途で特に威力を発揮します。
主な特徴
- リアルタイム配信: 1ミリ秒レベルの低遅延メッセージング
- パターンマッチング: ワイルドカードを使用した柔軟なチャネル購読
- 軽量設計: メモリ効率的でオーバーヘッドの少ない実装
- 高スケーラビリティ: 多数のパブリッシャー・サブスクライバーを効率処理
- Sharded Pub/Sub: クラスター環境での最適化されたメッセージ配信
- 簡潔なAPI: 直感的で習得しやすいコマンド構造
メリット・デメリット
メリット
- 極めて高速なメッセージ配信性能とリアルタイム性の実現
- Redisの軽量性を活かしたメモリ効率の良いメッセージング
- シンプルなAPIによる迅速な開発と容易な保守性
- 多言語対応の豊富なクライアントライブラリエコシステム
- パターンマッチングによる動的で柔軟なチャネル管理
- 既存のRedisインフラとの完全統合による導入コストの削減
デメリット
- メッセージ永続化とdelivery guaranteeの制限(fire-and-forget設計)
- サブスクライバー不在時のメッセージ消失リスク
- 複雑なメッセージルーティング機能の不足
- トランザクション機能やデッドレターキューの未対応
- 大容量メッセージ処理には不向き
- Kafkaのような本格的なストリーム処理機能の制限
参考ページ
書き方の例
基本的なインストールとセットアップ
# Redis サーバーインストール(Ubuntu/Debian)
sudo apt update
sudo apt install redis-server
# CentOS/RHEL/Fedora
sudo dnf install redis
# macOS(Homebrew使用)
brew install redis
# Redisサーバー起動
redis-server
# 別のターミナルでRedis CLIに接続
redis-cli
# Redis サーバー状態確認
redis-cli ping
# PONG が返ればOK
# Docker を使用した起動
docker run --name redis-pubsub \
-p 6379:6379 \
-d redis:7-alpine
# Docker Compose設定例
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: redis-pubsub
ports:
- "6379:6379"
command: redis-server --appendonly yes
volumes:
- redis-data:/data
restart: unless-stopped
volumes:
redis-data:
EOF
docker-compose up -d
基本的なPub/Sub操作(Redis CLI)
# チャネル購読(サブスクライバー)
# ターミナル1で実行
redis-cli
SUBSCRIBE news sport technology
# 複数チャネル同時購読
SUBSCRIBE chat:room1 chat:room2 chat:room3
# パターン購読(ワイルドカード使用)
PSUBSCRIBE news:*
PSUBSCRIBE chat:room*
PSUBSCRIBE user:*:notifications
# メッセージ発行(パブリッシャー)
# ターミナル2で実行
redis-cli
PUBLISH news "Breaking: New technology breakthrough announced"
PUBLISH sport "Football match result: Team A 3-1 Team B"
PUBLISH chat:room1 "Hello everyone in room 1!"
# パターンにマッチするチャネルへの発行
PUBLISH news:tech "Latest in AI development"
PUBLISH news:sports "Olympic games update"
PUBLISH user:123:notifications "You have a new message"
# 購読解除
UNSUBSCRIBE news sport
PUNSUBSCRIBE news:*
# チャネル情報確認
PUBSUB CHANNELS # アクティブなチャネル一覧
PUBSUB NUMSUB news sport # 特定チャネルの購読者数
PUBSUB NUMPAT # パターン購読数
# Sharded Pub/Sub(Redis 7.0+)
SSUBSCRIBE orders:shard1 # シャード購読
SPUBLISH orders:shard1 "Order #12345 processed" # シャード発行
SUNSUBSCRIBE orders:shard1 # シャード購読解除
Python実装(redis-py)
# pip install redis
import redis
import json
import threading
import time
from datetime import datetime
class RedisPubSubManager:
def __init__(self, host='localhost', port=6379, db=0, password=None):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True,
socket_keepalive=True,
socket_keepalive_options={}
)
self.pubsub = self.redis_client.pubsub()
self.subscribed_channels = set()
self.message_handlers = {}
def publish_message(self, channel, message, message_type='text'):
"""メッセージ発行"""
try:
if isinstance(message, dict):
message_data = {
'type': message_type,
'timestamp': datetime.now().isoformat(),
'data': message
}
message_str = json.dumps(message_data)
else:
message_str = str(message)
# メッセージ発行
receivers = self.redis_client.publish(channel, message_str)
print(f"Message published to '{channel}': {receivers} receivers")
return receivers
except Exception as e:
print(f"Error publishing message: {e}")
return 0
def subscribe_channel(self, channel, message_handler=None):
"""チャネル購読"""
try:
self.pubsub.subscribe(channel)
self.subscribed_channels.add(channel)
if message_handler:
self.message_handlers[channel] = message_handler
print(f"Subscribed to channel: {channel}")
except Exception as e:
print(f"Error subscribing to channel {channel}: {e}")
def subscribe_pattern(self, pattern, message_handler=None):
"""パターン購読"""
try:
self.pubsub.psubscribe(pattern)
if message_handler:
self.message_handlers[pattern] = message_handler
print(f"Subscribed to pattern: {pattern}")
except Exception as e:
print(f"Error subscribing to pattern {pattern}: {e}")
def default_message_handler(self, message):
"""デフォルトメッセージハンドラー"""
if message['type'] == 'message':
channel = message['channel']
data = message['data']
# JSON形式の解析を試行
try:
parsed_data = json.loads(data)
print(f"[{channel}] {parsed_data}")
except json.JSONDecodeError:
print(f"[{channel}] {data}")
elif message['type'] == 'pmessage':
pattern = message['pattern']
channel = message['channel']
data = message['data']
print(f"[Pattern: {pattern}] [{channel}] {data}")
elif message['type'] in ['subscribe', 'psubscribe']:
print(f"Subscribed to {message['channel']}, total: {message['data']} subscriptions")
elif message['type'] in ['unsubscribe', 'punsubscribe']:
print(f"Unsubscribed from {message['channel']}, remaining: {message['data']} subscriptions")
def start_listening(self, timeout=1.0):
"""メッセージ受信開始"""
print("Starting message listener...")
try:
for message in self.pubsub.listen():
if message is not None:
# チャネル固有のハンドラーがある場合
channel = message.get('channel')
pattern = message.get('pattern')
handler = None
if channel and channel in self.message_handlers:
handler = self.message_handlers[channel]
elif pattern and pattern in self.message_handlers:
handler = self.message_handlers[pattern]
if handler:
try:
handler(message)
except Exception as e:
print(f"Error in message handler: {e}")
else:
self.default_message_handler(message)
except KeyboardInterrupt:
print("\nStopping message listener...")
self.close()
def unsubscribe_all(self):
"""全チャネル購読解除"""
try:
self.pubsub.unsubscribe()
self.pubsub.punsubscribe()
self.subscribed_channels.clear()
self.message_handlers.clear()
print("Unsubscribed from all channels and patterns")
except Exception as e:
print(f"Error unsubscribing: {e}")
def get_channel_info(self, *channels):
"""チャネル情報取得"""
try:
# アクティブチャネル一覧
active_channels = self.redis_client.pubsub_channels()
print(f"Active channels: {active_channels}")
# 特定チャネルの購読者数
if channels:
subscriber_counts = self.redis_client.pubsub_numsub(*channels)
for channel, count in subscriber_counts:
print(f"Channel '{channel}': {count} subscribers")
# パターン購読数
pattern_count = self.redis_client.pubsub_numpat()
print(f"Pattern subscriptions: {pattern_count}")
except Exception as e:
print(f"Error getting channel info: {e}")
def close(self):
"""接続クローズ"""
try:
self.pubsub.close()
self.redis_client.close()
print("Redis Pub/Sub connection closed")
except Exception as e:
print(f"Error closing connection: {e}")
# 使用例とサンプルアプリケーション
def user_notification_handler(message):
"""ユーザー通知専用ハンドラー"""
try:
data = json.loads(message['data'])
user_id = data['data'].get('user_id')
notification_type = data['data'].get('type')
content = data['data'].get('content')
print(f"🔔 USER {user_id} | {notification_type.upper()}: {content}")
except Exception as e:
print(f"Error processing user notification: {e}")
def chat_message_handler(message):
"""チャットメッセージ専用ハンドラー"""
try:
data = json.loads(message['data'])
username = data['data'].get('username')
content = data['data'].get('message')
print(f"💬 {username}: {content}")
except Exception as e:
print(f"Error processing chat message: {e}")
# パブリッシャー例
def publisher_example():
manager = RedisPubSubManager()
try:
# 様々なメッセージタイプの発行
# ニュース発行
manager.publish_message('news:tech', {
'title': 'AI breakthrough in 2024',
'content': 'New AI model achieves human-level performance',
'category': 'technology'
}, 'news')
# チャットメッセージ発行
manager.publish_message('chat:general', {
'username': 'alice',
'message': 'Hello everyone!',
'room': 'general'
}, 'chat')
# ユーザー通知発行
manager.publish_message('user:notifications', {
'user_id': 12345,
'type': 'message',
'content': 'You have a new private message'
}, 'notification')
# システム通知発行
manager.publish_message('system:alerts', {
'level': 'warning',
'message': 'High CPU usage detected',
'timestamp': datetime.now().isoformat()
}, 'system_alert')
finally:
manager.close()
# サブスクライバー例
def subscriber_example():
manager = RedisPubSubManager()
try:
# 個別チャネル購読
manager.subscribe_channel('chat:general', chat_message_handler)
manager.subscribe_channel('user:notifications', user_notification_handler)
# パターン購読
manager.subscribe_pattern('news:*')
manager.subscribe_pattern('system:*')
# チャネル情報確認
manager.get_channel_info('chat:general', 'user:notifications')
# メッセージ受信開始
manager.start_listening()
except KeyboardInterrupt:
print("\nShutting down subscriber...")
finally:
manager.unsubscribe_all()
manager.close()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'publish':
print("Running publisher example...")
publisher_example()
else:
print("Running subscriber example (Ctrl+C to stop)...")
subscriber_example()
Node.js実装(ioredis)
// npm install ioredis
const Redis = require('ioredis');
class RedisPubSubManager {
constructor(options = {}) {
this.publisherClient = new Redis({
host: options.host || 'localhost',
port: options.port || 6379,
db: options.db || 0,
password: options.password,
retryDelayOnFailover: 100,
enableReadyCheck: false,
maxRetriesPerRequest: null,
});
this.subscriberClient = new Redis({
host: options.host || 'localhost',
port: options.port || 6379,
db: options.db || 0,
password: options.password,
retryDelayOnFailover: 100,
enableReadyCheck: false,
maxRetriesPerRequest: null,
});
this.messageHandlers = new Map();
this.setupErrorHandling();
}
setupErrorHandling() {
this.publisherClient.on('error', (err) => {
console.error('Publisher Redis Error:', err);
});
this.subscriberClient.on('error', (err) => {
console.error('Subscriber Redis Error:', err);
});
this.publisherClient.on('connect', () => {
console.log('Publisher connected to Redis');
});
this.subscriberClient.on('connect', () => {
console.log('Subscriber connected to Redis');
});
}
async publishMessage(channel, message, metadata = {}) {
try {
const messageData = {
type: metadata.type || 'text',
timestamp: new Date().toISOString(),
source: metadata.source || 'node-app',
data: message
};
const receiverCount = await this.publisherClient.publish(
channel,
JSON.stringify(messageData)
);
console.log(`📤 Published to '${channel}': ${receiverCount} receivers`);
return receiverCount;
} catch (error) {
console.error(`Error publishing message to ${channel}:`, error);
return 0;
}
}
subscribeChannel(channel, messageHandler) {
try {
this.subscriberClient.subscribe(channel);
if (messageHandler) {
this.messageHandlers.set(channel, messageHandler);
}
console.log(`📥 Subscribed to channel: ${channel}`);
} catch (error) {
console.error(`Error subscribing to channel ${channel}:`, error);
}
}
subscribePattern(pattern, messageHandler) {
try {
this.subscriberClient.psubscribe(pattern);
if (messageHandler) {
this.messageHandlers.set(pattern, messageHandler);
}
console.log(`📥 Subscribed to pattern: ${pattern}`);
} catch (error) {
console.error(`Error subscribing to pattern ${pattern}:`, error);
}
}
startListening() {
// 通常のチャネルメッセージ
this.subscriberClient.on('message', (channel, message) => {
this.handleMessage('message', channel, null, message);
});
// パターンマッチメッセージ
this.subscriberClient.on('pmessage', (pattern, channel, message) => {
this.handleMessage('pmessage', channel, pattern, message);
});
// 購読確認
this.subscriberClient.on('subscribe', (channel, count) => {
console.log(`✅ Subscribed to '${channel}', total subscriptions: ${count}`);
});
this.subscriberClient.on('psubscribe', (pattern, count) => {
console.log(`✅ Pattern subscribed to '${pattern}', total subscriptions: ${count}`);
});
// 購読解除確認
this.subscriberClient.on('unsubscribe', (channel, count) => {
console.log(`❌ Unsubscribed from '${channel}', remaining: ${count}`);
});
this.subscriberClient.on('punsubscribe', (pattern, count) => {
console.log(`❌ Pattern unsubscribed from '${pattern}', remaining: ${count}`);
});
console.log('🎧 Message listener started...');
}
handleMessage(type, channel, pattern, message) {
try {
// 適切なハンドラーを検索
let handler = null;
if (type === 'message' && this.messageHandlers.has(channel)) {
handler = this.messageHandlers.get(channel);
} else if (type === 'pmessage' && this.messageHandlers.has(pattern)) {
handler = this.messageHandlers.get(pattern);
}
// メッセージデータのパース
let messageData;
try {
messageData = JSON.parse(message);
} catch (e) {
messageData = { data: message, timestamp: new Date().toISOString() };
}
if (handler) {
handler({
type,
channel,
pattern,
message: messageData,
rawMessage: message
});
} else {
this.defaultMessageHandler({ type, channel, pattern, message: messageData });
}
} catch (error) {
console.error('Error handling message:', error);
}
}
defaultMessageHandler({ type, channel, pattern, message }) {
const timestamp = new Date().toLocaleTimeString();
if (type === 'message') {
console.log(`[${timestamp}] 📨 ${channel}: ${JSON.stringify(message)}`);
} else if (type === 'pmessage') {
console.log(`[${timestamp}] 🔍 ${pattern} -> ${channel}: ${JSON.stringify(message)}`);
}
}
async getChannelInfo(...channels) {
try {
// アクティブチャネル一覧
const activeChannels = await this.publisherClient.pubsub('CHANNELS');
console.log('Active channels:', activeChannels);
// 特定チャネルの購読者数
if (channels.length > 0) {
const subscriberCounts = await this.publisherClient.pubsub('NUMSUB', ...channels);
for (let i = 0; i < subscriberCounts.length; i += 2) {
console.log(`Channel '${subscriberCounts[i]}': ${subscriberCounts[i + 1]} subscribers`);
}
}
// パターン購読数
const patternCount = await this.publisherClient.pubsub('NUMPAT');
console.log(`Pattern subscriptions: ${patternCount}`);
} catch (error) {
console.error('Error getting channel info:', error);
}
}
async unsubscribeAll() {
try {
await this.subscriberClient.unsubscribe();
await this.subscriberClient.punsubscribe();
this.messageHandlers.clear();
console.log('Unsubscribed from all channels and patterns');
} catch (error) {
console.error('Error unsubscribing:', error);
}
}
async close() {
try {
await this.unsubscribeAll();
await this.publisherClient.quit();
await this.subscriberClient.quit();
console.log('Redis Pub/Sub connections closed');
} catch (error) {
console.error('Error closing connections:', error);
}
}
}
// 特定用途のメッセージハンドラー例
function createChatHandler(roomName) {
return ({ channel, message }) => {
const { data } = message;
console.log(`💬 [${roomName}] ${data.username}: ${data.message}`);
};
}
function createNotificationHandler() {
return ({ channel, message }) => {
const { data } = message;
const emoji = data.level === 'error' ? '🚨' :
data.level === 'warning' ? '⚠️' : 'ℹ️';
console.log(`${emoji} ${data.level.toUpperCase()}: ${data.message}`);
};
}
function createUserActivityHandler() {
return ({ channel, message }) => {
const { data } = message;
console.log(`👤 User ${data.userId} performed action: ${data.action}`);
};
}
// パブリッシャー例
async function publisherExample() {
const manager = new RedisPubSubManager();
try {
// 様々なタイプのメッセージを発行
await manager.publishMessage('chat:general', {
username: 'alice',
message: 'Hello from Node.js!',
timestamp: new Date().toISOString()
}, { type: 'chat', source: 'node-publisher' });
await manager.publishMessage('notifications:system', {
level: 'info',
message: 'System maintenance completed',
component: 'web-server'
}, { type: 'notification' });
await manager.publishMessage('user:activity', {
userId: 12345,
action: 'login',
ip: '192.168.1.100',
userAgent: 'Node.js Client'
}, { type: 'user_activity' });
// パターンマッチングテスト
await manager.publishMessage('events:user:login', {
event: 'user_login',
details: { userId: 67890, location: 'Tokyo' }
});
await manager.publishMessage('events:order:created', {
event: 'order_created',
details: { orderId: 'ORD-001', amount: 150.00 }
});
} finally {
await manager.close();
}
}
// サブスクライバー例
async function subscriberExample() {
const manager = new RedisPubSubManager();
try {
// カスタムハンドラーでチャネル購読
manager.subscribeChannel('chat:general', createChatHandler('General'));
manager.subscribeChannel('notifications:system', createNotificationHandler());
manager.subscribeChannel('user:activity', createUserActivityHandler());
// パターン購読
manager.subscribePattern('events:*');
manager.subscribePattern('alerts:*');
// メッセージ受信開始
manager.startListening();
// チャネル情報確認
setTimeout(() => {
manager.getChannelInfo('chat:general', 'notifications:system', 'user:activity');
}, 1000);
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('\n🛑 Shutting down subscriber...');
await manager.close();
process.exit(0);
});
// 継続実行
console.log('Press Ctrl+C to stop the subscriber...');
} catch (error) {
console.error('Error in subscriber:', error);
await manager.close();
}
}
// 実行例
if (require.main === module) {
const mode = process.argv[2];
if (mode === 'publish') {
console.log('🚀 Running publisher example...');
publisherExample().catch(console.error);
} else {
console.log('🎧 Running subscriber example...');
subscriberExample().catch(console.error);
}
}
module.exports = RedisPubSubManager;
パターンマッチングと高度な機能
# 高度なパターンマッチング例
redis-cli
# 階層的チャネル構造
SUBSCRIBE app:user:login
SUBSCRIBE app:user:logout
SUBSCRIBE app:order:created
SUBSCRIBE app:order:completed
# パターン購読で一括処理
PSUBSCRIBE app:user:* # ユーザー関連イベント全て
PSUBSCRIBE app:order:* # 注文関連イベント全て
PSUBSCRIBE app:*:error # 全モジュールのエラー
# 地域別チャネル
PSUBSCRIBE region:asia:*
PSUBSCRIBE region:europe:*
PSUBSCRIBE region:america:*
# レベル別通知
PSUBSCRIBE alerts:critical:*
PSUBSCRIBE alerts:warning:*
PSUBSCRIBE alerts:info:*
# タイムベースチャネル
PSUBSCRIBE events:2024:*
PSUBSCRIBE logs:hourly:*
PSUBSCRIBE stats:daily:*
# Sharded Pub/Sub(Redis 7.0+)
# 高性能なクラスター環境での使用
SSUBSCRIBE orders:asia
SSUBSCRIBE orders:europe
SSUBSCRIBE orders:america
SPUBLISH orders:asia "Order #A123 from Tokyo"
SPUBLISH orders:europe "Order #E456 from London"
# チャネル情報とモニタリング
PUBSUB CHANNELS # アクティブチャネル
PUBSUB CHANNELS app:user:* # パターンマッチするチャネル
PUBSUB NUMSUB app:user:login # 特定チャネルの購読者数
PUBSUB NUMPAT # パターン購読の総数
# Sharded Pub/Sub情報
PUBSUB SHARDCHANNELS # シャードチャネル一覧
PUBSUB SHARDNUMSUB orders:asia # シャードチャネル購読者数
実践的なユースケース実装
# リアルタイムチャットアプリケーション
import asyncio
import redis.asyncio as redis
import json
from datetime import datetime
class RealTimeChatApp:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
self.pubsub = self.redis.pubsub()
self.active_users = set()
async def join_room(self, user_id, room_id):
"""ユーザーがチャットルームに参加"""
channel = f"chat:room:{room_id}"
# ルーム参加通知
join_message = {
'type': 'user_joined',
'user_id': user_id,
'timestamp': datetime.now().isoformat(),
'message': f"User {user_id} joined the room"
}
await self.redis.publish(channel, json.dumps(join_message))
await self.pubsub.subscribe(channel)
self.active_users.add(user_id)
print(f"User {user_id} joined room {room_id}")
async def send_message(self, user_id, room_id, message):
"""チャットメッセージ送信"""
channel = f"chat:room:{room_id}"
chat_message = {
'type': 'chat_message',
'user_id': user_id,
'room_id': room_id,
'message': message,
'timestamp': datetime.now().isoformat()
}
receivers = await self.redis.publish(channel, json.dumps(chat_message))
print(f"Message sent to {receivers} users in room {room_id}")
async def leave_room(self, user_id, room_id):
"""ユーザーがチャットルームから退出"""
channel = f"chat:room:{room_id}"
leave_message = {
'type': 'user_left',
'user_id': user_id,
'timestamp': datetime.now().isoformat(),
'message': f"User {user_id} left the room"
}
await self.redis.publish(channel, json.dumps(leave_message))
await self.pubsub.unsubscribe(channel)
self.active_users.discard(user_id)
print(f"User {user_id} left room {room_id}")
async def listen_messages(self):
"""メッセージ受信処理"""
async for message in self.pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
await self.handle_chat_message(data)
except json.JSONDecodeError:
print(f"Invalid message format: {message['data']}")
async def handle_chat_message(self, data):
"""チャットメッセージ処理"""
message_type = data.get('type')
if message_type == 'chat_message':
print(f"💬 [{data['room_id']}] {data['user_id']}: {data['message']}")
elif message_type == 'user_joined':
print(f"👋 {data['message']}")
elif message_type == 'user_left':
print(f"👋 {data['message']}")
async def close(self):
await self.pubsub.close()
await self.redis.close()
# イベント駆動システム
class EventDrivenSystem:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
self.event_handlers = {}
def register_event_handler(self, event_type, handler):
"""イベントハンドラー登録"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
async def emit_event(self, event_type, event_data):
"""イベント発行"""
event = {
'type': event_type,
'data': event_data,
'timestamp': datetime.now().isoformat(),
'event_id': f"{event_type}_{int(datetime.now().timestamp() * 1000)}"
}
channel = f"events:{event_type}"
receivers = await self.redis.publish(channel, json.dumps(event))
print(f"Event '{event_type}' emitted to {receivers} listeners")
async def subscribe_events(self, *event_types):
"""イベント購読"""
pubsub = self.redis.pubsub()
for event_type in event_types:
channel = f"events:{event_type}"
await pubsub.subscribe(channel)
print(f"Subscribed to events: {event_type}")
async for message in pubsub.listen():
if message['type'] == 'message':
try:
event = json.loads(message['data'])
await self.handle_event(event)
except json.JSONDecodeError:
print(f"Invalid event format: {message['data']}")
async def handle_event(self, event):
"""イベント処理"""
event_type = event.get('type')
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
await handler(event)
except Exception as e:
print(f"Error in event handler for {event_type}: {e}")
else:
print(f"No handler for event type: {event_type}")
# 使用例
async def chat_demo():
"""チャットアプリデモ"""
app = RealTimeChatApp()
try:
# ユーザーがルームに参加
await app.join_room("alice", "general")
await app.join_room("bob", "general")
# メッセージ送信
await app.send_message("alice", "general", "Hello everyone!")
await app.send_message("bob", "general", "Hi Alice!")
# メッセージ受信開始(実際のアプリではバックグラウンドで実行)
# await app.listen_messages()
finally:
await app.close()
async def event_demo():
"""イベント駆動システムデモ"""
system = EventDrivenSystem()
# イベントハンドラー登録
async def user_registered_handler(event):
data = event['data']
print(f"📧 Sending welcome email to {data['email']}")
async def order_created_handler(event):
data = event['data']
print(f"📦 Processing order {data['order_id']} for ${data['amount']}")
system.register_event_handler('user_registered', user_registered_handler)
system.register_event_handler('order_created', order_created_handler)
# イベント発行
await system.emit_event('user_registered', {
'user_id': 12345,
'email': '[email protected]',
'name': 'John Doe'
})
await system.emit_event('order_created', {
'order_id': 'ORD-001',
'user_id': 12345,
'amount': 99.99,
'items': ['Product A', 'Product B']
})
if __name__ == "__main__":
# asyncio.run(chat_demo())
asyncio.run(event_demo())
監視とパフォーマンス最適化
# Redis Pub/Sub監視スクリプト
cat > monitor-pubsub.sh << 'EOF'
#!/bin/bash
echo "=== Redis Pub/Sub Monitoring Report $(date) ==="
# Redis接続確認
echo "=== Redis Connection Status ==="
redis-cli ping > /dev/null 2>&1 && echo "Redis: CONNECTED" || echo "Redis: DISCONNECTED"
# アクティブチャネル数
echo "=== Active Channels ==="
ACTIVE_CHANNELS=$(redis-cli PUBSUB CHANNELS | wc -l)
echo "Total active channels: $ACTIVE_CHANNELS"
# パターン購読数
echo "=== Pattern Subscriptions ==="
PATTERN_SUBS=$(redis-cli PUBSUB NUMPAT)
echo "Pattern subscriptions: $PATTERN_SUBS"
# 主要チャネルの購読者数
echo "=== Channel Subscribers ==="
redis-cli PUBSUB NUMSUB chat:general news:tech system:alerts user:notifications | \
while read channel count; do
if [ -n "$channel" ] && [ -n "$count" ]; then
echo "Channel '$channel': $count subscribers"
fi
done
# Redis情報
echo "=== Redis Server Info ==="
redis-cli INFO clients | grep connected_clients
redis-cli INFO memory | grep used_memory_human
redis-cli INFO stats | grep total_commands_processed
# Pub/Sub統計
echo "=== Pub/Sub Statistics ==="
redis-cli INFO stats | grep pubsub
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-pubsub.sh
# パフォーマンステスト
# 大量メッセージ送信テスト
redis-cli --eval - 0 << 'EOF'
for i=1,10000 do
redis.call('PUBLISH', 'performance_test', 'Message ' .. i)
end
return 'Published 10000 messages'
EOF
# 購読者パフォーマンステスト
redis-cli --latency-history -i 1 PUBLISH test_channel "performance test"
# メモリ使用量監視
redis-cli --bigkeys
# クライアント接続監視
redis-cli CLIENT LIST
Redis Pub/Subは、そのシンプルさと高性能により、リアルタイムメッセージング、イベント駆動アーキテクチャ、チャットアプリケーション、ライブ通知システムなど幅広い用途で活用されています。特に1ミリ秒レベルの低遅延が要求される用途や、軽量で高速なメッセージ配信が必要なアプリケーションで威力を発揮します。