Redis Pub/Sub

Redisの組み込みメッセージング機能。軽量なパブリッシュ・サブスクライブパターンを提供。リアルタイム通知、チャット、ライブアップデートに最適。

メッセージブローカーPub/Subリアルタイム非同期メッセージング高性能イベント駆動軽量スケーラブル

サーバー

Redis Pub/Sub

概要

Redis Pub/Subは、Redisに内蔵されたリアルタイムメッセージング機能で、Publish/Subscribe(発行/購読)パターンを実装したシンプルかつ高性能なメッセージングシステムです。2009年の導入以来、リアルタイム通信、イベント駆動アーキテクチャ、チャットアプリケーション、ライブ通知システムなど幅広い用途で活用されています。Redis本来の高速性を活かし、1ミリ秒レベルの低遅延でメッセージを配信でき、軽量でありながら大量のパブリッシャーとサブスクライバーを効率的に処理します。Redis 7.0では、クラスター環境での性能向上を図るSharded Pub/Sub機能が追加され、大規模分散システムでのスケーラビリティが大幅に改善されています。

詳細

Redis Pub/Sub 2025年版は、そのシンプルさと高性能により、リアルタイムメッセージングの定番ソリューションとして進化しています。基本的なfire-and-forgetメッセージ配信に加え、パターンマッチング機能により柔軟なチャネル購読が可能で、wildcard(ワイルドカード)を使用した動的なチャネル管理が実現できます。Redis 7.0で導入されたSharded Pub/Subは、チャネルをスロットに割り当てることでクラスター内でのメッセージ配信を最適化し、従来の全ノード配信から効率的なシャード内配信へと改善されています。永続化やメッセージ保証よりもリアルタイム性と軽量性を重視した設計により、IoTデータストリーミング、ゲームのリアルタイム同期、金融取引の即座通知など、速度が重要な用途で特に威力を発揮します。

主な特徴

  • リアルタイム配信: 1ミリ秒レベルの低遅延メッセージング
  • パターンマッチング: ワイルドカードを使用した柔軟なチャネル購読
  • 軽量設計: メモリ効率的でオーバーヘッドの少ない実装
  • 高スケーラビリティ: 多数のパブリッシャー・サブスクライバーを効率処理
  • Sharded Pub/Sub: クラスター環境での最適化されたメッセージ配信
  • 簡潔なAPI: 直感的で習得しやすいコマンド構造

メリット・デメリット

メリット

  • 極めて高速なメッセージ配信性能とリアルタイム性の実現
  • Redisの軽量性を活かしたメモリ効率の良いメッセージング
  • シンプルなAPIによる迅速な開発と容易な保守性
  • 多言語対応の豊富なクライアントライブラリエコシステム
  • パターンマッチングによる動的で柔軟なチャネル管理
  • 既存のRedisインフラとの完全統合による導入コストの削減

デメリット

  • メッセージ永続化とdelivery guaranteeの制限(fire-and-forget設計)
  • サブスクライバー不在時のメッセージ消失リスク
  • 複雑なメッセージルーティング機能の不足
  • トランザクション機能やデッドレターキューの未対応
  • 大容量メッセージ処理には不向き
  • Kafkaのような本格的なストリーム処理機能の制限

参考ページ

書き方の例

基本的なインストールとセットアップ

# Redis サーバーインストール(Ubuntu/Debian)
sudo apt update
sudo apt install redis-server

# CentOS/RHEL/Fedora
sudo dnf install redis

# macOS(Homebrew使用)
brew install redis

# Redisサーバー起動
redis-server

# 別のターミナルでRedis CLIに接続
redis-cli

# Redis サーバー状態確認
redis-cli ping
# PONG が返ればOK

# Docker を使用した起動
docker run --name redis-pubsub \
  -p 6379:6379 \
  -d redis:7-alpine

# Docker Compose設定例
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    container_name: redis-pubsub
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:
EOF

docker-compose up -d

基本的なPub/Sub操作(Redis CLI)

# チャネル購読(サブスクライバー)
# ターミナル1で実行
redis-cli
SUBSCRIBE news sport technology

# 複数チャネル同時購読
SUBSCRIBE chat:room1 chat:room2 chat:room3

# パターン購読(ワイルドカード使用)
PSUBSCRIBE news:*
PSUBSCRIBE chat:room*
PSUBSCRIBE user:*:notifications

# メッセージ発行(パブリッシャー)
# ターミナル2で実行
redis-cli
PUBLISH news "Breaking: New technology breakthrough announced"
PUBLISH sport "Football match result: Team A 3-1 Team B"
PUBLISH chat:room1 "Hello everyone in room 1!"

# パターンにマッチするチャネルへの発行
PUBLISH news:tech "Latest in AI development"
PUBLISH news:sports "Olympic games update"
PUBLISH user:123:notifications "You have a new message"

# 購読解除
UNSUBSCRIBE news sport
PUNSUBSCRIBE news:*

# チャネル情報確認
PUBSUB CHANNELS           # アクティブなチャネル一覧
PUBSUB NUMSUB news sport  # 特定チャネルの購読者数
PUBSUB NUMPAT             # パターン購読数

# Sharded Pub/Sub(Redis 7.0+)
SSUBSCRIBE orders:shard1  # シャード購読
SPUBLISH orders:shard1 "Order #12345 processed"  # シャード発行
SUNSUBSCRIBE orders:shard1  # シャード購読解除

Python実装(redis-py)

# pip install redis

import redis
import json
import threading
import time
from datetime import datetime

class RedisPubSubManager:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )
        self.pubsub = self.redis_client.pubsub()
        self.subscribed_channels = set()
        self.message_handlers = {}
        
    def publish_message(self, channel, message, message_type='text'):
        """メッセージ発行"""
        try:
            if isinstance(message, dict):
                message_data = {
                    'type': message_type,
                    'timestamp': datetime.now().isoformat(),
                    'data': message
                }
                message_str = json.dumps(message_data)
            else:
                message_str = str(message)
            
            # メッセージ発行
            receivers = self.redis_client.publish(channel, message_str)
            print(f"Message published to '{channel}': {receivers} receivers")
            return receivers
            
        except Exception as e:
            print(f"Error publishing message: {e}")
            return 0
    
    def subscribe_channel(self, channel, message_handler=None):
        """チャネル購読"""
        try:
            self.pubsub.subscribe(channel)
            self.subscribed_channels.add(channel)
            
            if message_handler:
                self.message_handlers[channel] = message_handler
                
            print(f"Subscribed to channel: {channel}")
            
        except Exception as e:
            print(f"Error subscribing to channel {channel}: {e}")
    
    def subscribe_pattern(self, pattern, message_handler=None):
        """パターン購読"""
        try:
            self.pubsub.psubscribe(pattern)
            
            if message_handler:
                self.message_handlers[pattern] = message_handler
                
            print(f"Subscribed to pattern: {pattern}")
            
        except Exception as e:
            print(f"Error subscribing to pattern {pattern}: {e}")
    
    def default_message_handler(self, message):
        """デフォルトメッセージハンドラー"""
        if message['type'] == 'message':
            channel = message['channel']
            data = message['data']
            
            # JSON形式の解析を試行
            try:
                parsed_data = json.loads(data)
                print(f"[{channel}] {parsed_data}")
            except json.JSONDecodeError:
                print(f"[{channel}] {data}")
                
        elif message['type'] == 'pmessage':
            pattern = message['pattern']
            channel = message['channel']
            data = message['data']
            print(f"[Pattern: {pattern}] [{channel}] {data}")
            
        elif message['type'] in ['subscribe', 'psubscribe']:
            print(f"Subscribed to {message['channel']}, total: {message['data']} subscriptions")
            
        elif message['type'] in ['unsubscribe', 'punsubscribe']:
            print(f"Unsubscribed from {message['channel']}, remaining: {message['data']} subscriptions")
    
    def start_listening(self, timeout=1.0):
        """メッセージ受信開始"""
        print("Starting message listener...")
        
        try:
            for message in self.pubsub.listen():
                if message is not None:
                    # チャネル固有のハンドラーがある場合
                    channel = message.get('channel')
                    pattern = message.get('pattern')
                    
                    handler = None
                    if channel and channel in self.message_handlers:
                        handler = self.message_handlers[channel]
                    elif pattern and pattern in self.message_handlers:
                        handler = self.message_handlers[pattern]
                    
                    if handler:
                        try:
                            handler(message)
                        except Exception as e:
                            print(f"Error in message handler: {e}")
                    else:
                        self.default_message_handler(message)
                        
        except KeyboardInterrupt:
            print("\nStopping message listener...")
            self.close()
    
    def unsubscribe_all(self):
        """全チャネル購読解除"""
        try:
            self.pubsub.unsubscribe()
            self.pubsub.punsubscribe()
            self.subscribed_channels.clear()
            self.message_handlers.clear()
            print("Unsubscribed from all channels and patterns")
            
        except Exception as e:
            print(f"Error unsubscribing: {e}")
    
    def get_channel_info(self, *channels):
        """チャネル情報取得"""
        try:
            # アクティブチャネル一覧
            active_channels = self.redis_client.pubsub_channels()
            print(f"Active channels: {active_channels}")
            
            # 特定チャネルの購読者数
            if channels:
                subscriber_counts = self.redis_client.pubsub_numsub(*channels)
                for channel, count in subscriber_counts:
                    print(f"Channel '{channel}': {count} subscribers")
            
            # パターン購読数
            pattern_count = self.redis_client.pubsub_numpat()
            print(f"Pattern subscriptions: {pattern_count}")
            
        except Exception as e:
            print(f"Error getting channel info: {e}")
    
    def close(self):
        """接続クローズ"""
        try:
            self.pubsub.close()
            self.redis_client.close()
            print("Redis Pub/Sub connection closed")
            
        except Exception as e:
            print(f"Error closing connection: {e}")

# 使用例とサンプルアプリケーション
def user_notification_handler(message):
    """ユーザー通知専用ハンドラー"""
    try:
        data = json.loads(message['data'])
        user_id = data['data'].get('user_id')
        notification_type = data['data'].get('type')
        content = data['data'].get('content')
        
        print(f"🔔 USER {user_id} | {notification_type.upper()}: {content}")
        
    except Exception as e:
        print(f"Error processing user notification: {e}")

def chat_message_handler(message):
    """チャットメッセージ専用ハンドラー"""
    try:
        data = json.loads(message['data'])
        username = data['data'].get('username')
        content = data['data'].get('message')
        
        print(f"💬 {username}: {content}")
        
    except Exception as e:
        print(f"Error processing chat message: {e}")

# パブリッシャー例
def publisher_example():
    manager = RedisPubSubManager()
    
    try:
        # 様々なメッセージタイプの発行
        # ニュース発行
        manager.publish_message('news:tech', {
            'title': 'AI breakthrough in 2024',
            'content': 'New AI model achieves human-level performance',
            'category': 'technology'
        }, 'news')
        
        # チャットメッセージ発行
        manager.publish_message('chat:general', {
            'username': 'alice',
            'message': 'Hello everyone!',
            'room': 'general'
        }, 'chat')
        
        # ユーザー通知発行
        manager.publish_message('user:notifications', {
            'user_id': 12345,
            'type': 'message',
            'content': 'You have a new private message'
        }, 'notification')
        
        # システム通知発行
        manager.publish_message('system:alerts', {
            'level': 'warning',
            'message': 'High CPU usage detected',
            'timestamp': datetime.now().isoformat()
        }, 'system_alert')
        
    finally:
        manager.close()

# サブスクライバー例
def subscriber_example():
    manager = RedisPubSubManager()
    
    try:
        # 個別チャネル購読
        manager.subscribe_channel('chat:general', chat_message_handler)
        manager.subscribe_channel('user:notifications', user_notification_handler)
        
        # パターン購読
        manager.subscribe_pattern('news:*')
        manager.subscribe_pattern('system:*')
        
        # チャネル情報確認
        manager.get_channel_info('chat:general', 'user:notifications')
        
        # メッセージ受信開始
        manager.start_listening()
        
    except KeyboardInterrupt:
        print("\nShutting down subscriber...")
    finally:
        manager.unsubscribe_all()
        manager.close()

if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1 and sys.argv[1] == 'publish':
        print("Running publisher example...")
        publisher_example()
    else:
        print("Running subscriber example (Ctrl+C to stop)...")
        subscriber_example()

Node.js実装(ioredis)

// npm install ioredis

const Redis = require('ioredis');

class RedisPubSubManager {
    constructor(options = {}) {
        this.publisherClient = new Redis({
            host: options.host || 'localhost',
            port: options.port || 6379,
            db: options.db || 0,
            password: options.password,
            retryDelayOnFailover: 100,
            enableReadyCheck: false,
            maxRetriesPerRequest: null,
        });
        
        this.subscriberClient = new Redis({
            host: options.host || 'localhost',
            port: options.port || 6379,
            db: options.db || 0,
            password: options.password,
            retryDelayOnFailover: 100,
            enableReadyCheck: false,
            maxRetriesPerRequest: null,
        });
        
        this.messageHandlers = new Map();
        this.setupErrorHandling();
    }
    
    setupErrorHandling() {
        this.publisherClient.on('error', (err) => {
            console.error('Publisher Redis Error:', err);
        });
        
        this.subscriberClient.on('error', (err) => {
            console.error('Subscriber Redis Error:', err);
        });
        
        this.publisherClient.on('connect', () => {
            console.log('Publisher connected to Redis');
        });
        
        this.subscriberClient.on('connect', () => {
            console.log('Subscriber connected to Redis');
        });
    }
    
    async publishMessage(channel, message, metadata = {}) {
        try {
            const messageData = {
                type: metadata.type || 'text',
                timestamp: new Date().toISOString(),
                source: metadata.source || 'node-app',
                data: message
            };
            
            const receiverCount = await this.publisherClient.publish(
                channel, 
                JSON.stringify(messageData)
            );
            
            console.log(`📤 Published to '${channel}': ${receiverCount} receivers`);
            return receiverCount;
            
        } catch (error) {
            console.error(`Error publishing message to ${channel}:`, error);
            return 0;
        }
    }
    
    subscribeChannel(channel, messageHandler) {
        try {
            this.subscriberClient.subscribe(channel);
            
            if (messageHandler) {
                this.messageHandlers.set(channel, messageHandler);
            }
            
            console.log(`📥 Subscribed to channel: ${channel}`);
            
        } catch (error) {
            console.error(`Error subscribing to channel ${channel}:`, error);
        }
    }
    
    subscribePattern(pattern, messageHandler) {
        try {
            this.subscriberClient.psubscribe(pattern);
            
            if (messageHandler) {
                this.messageHandlers.set(pattern, messageHandler);
            }
            
            console.log(`📥 Subscribed to pattern: ${pattern}`);
            
        } catch (error) {
            console.error(`Error subscribing to pattern ${pattern}:`, error);
        }
    }
    
    startListening() {
        // 通常のチャネルメッセージ
        this.subscriberClient.on('message', (channel, message) => {
            this.handleMessage('message', channel, null, message);
        });
        
        // パターンマッチメッセージ
        this.subscriberClient.on('pmessage', (pattern, channel, message) => {
            this.handleMessage('pmessage', channel, pattern, message);
        });
        
        // 購読確認
        this.subscriberClient.on('subscribe', (channel, count) => {
            console.log(`✅ Subscribed to '${channel}', total subscriptions: ${count}`);
        });
        
        this.subscriberClient.on('psubscribe', (pattern, count) => {
            console.log(`✅ Pattern subscribed to '${pattern}', total subscriptions: ${count}`);
        });
        
        // 購読解除確認
        this.subscriberClient.on('unsubscribe', (channel, count) => {
            console.log(`❌ Unsubscribed from '${channel}', remaining: ${count}`);
        });
        
        this.subscriberClient.on('punsubscribe', (pattern, count) => {
            console.log(`❌ Pattern unsubscribed from '${pattern}', remaining: ${count}`);
        });
        
        console.log('🎧 Message listener started...');
    }
    
    handleMessage(type, channel, pattern, message) {
        try {
            // 適切なハンドラーを検索
            let handler = null;
            
            if (type === 'message' && this.messageHandlers.has(channel)) {
                handler = this.messageHandlers.get(channel);
            } else if (type === 'pmessage' && this.messageHandlers.has(pattern)) {
                handler = this.messageHandlers.get(pattern);
            }
            
            // メッセージデータのパース
            let messageData;
            try {
                messageData = JSON.parse(message);
            } catch (e) {
                messageData = { data: message, timestamp: new Date().toISOString() };
            }
            
            if (handler) {
                handler({
                    type,
                    channel,
                    pattern,
                    message: messageData,
                    rawMessage: message
                });
            } else {
                this.defaultMessageHandler({ type, channel, pattern, message: messageData });
            }
            
        } catch (error) {
            console.error('Error handling message:', error);
        }
    }
    
    defaultMessageHandler({ type, channel, pattern, message }) {
        const timestamp = new Date().toLocaleTimeString();
        
        if (type === 'message') {
            console.log(`[${timestamp}] 📨 ${channel}: ${JSON.stringify(message)}`);
        } else if (type === 'pmessage') {
            console.log(`[${timestamp}] 🔍 ${pattern} -> ${channel}: ${JSON.stringify(message)}`);
        }
    }
    
    async getChannelInfo(...channels) {
        try {
            // アクティブチャネル一覧
            const activeChannels = await this.publisherClient.pubsub('CHANNELS');
            console.log('Active channels:', activeChannels);
            
            // 特定チャネルの購読者数
            if (channels.length > 0) {
                const subscriberCounts = await this.publisherClient.pubsub('NUMSUB', ...channels);
                for (let i = 0; i < subscriberCounts.length; i += 2) {
                    console.log(`Channel '${subscriberCounts[i]}': ${subscriberCounts[i + 1]} subscribers`);
                }
            }
            
            // パターン購読数
            const patternCount = await this.publisherClient.pubsub('NUMPAT');
            console.log(`Pattern subscriptions: ${patternCount}`);
            
        } catch (error) {
            console.error('Error getting channel info:', error);
        }
    }
    
    async unsubscribeAll() {
        try {
            await this.subscriberClient.unsubscribe();
            await this.subscriberClient.punsubscribe();
            this.messageHandlers.clear();
            console.log('Unsubscribed from all channels and patterns');
        } catch (error) {
            console.error('Error unsubscribing:', error);
        }
    }
    
    async close() {
        try {
            await this.unsubscribeAll();
            await this.publisherClient.quit();
            await this.subscriberClient.quit();
            console.log('Redis Pub/Sub connections closed');
        } catch (error) {
            console.error('Error closing connections:', error);
        }
    }
}

// 特定用途のメッセージハンドラー例
function createChatHandler(roomName) {
    return ({ channel, message }) => {
        const { data } = message;
        console.log(`💬 [${roomName}] ${data.username}: ${data.message}`);
    };
}

function createNotificationHandler() {
    return ({ channel, message }) => {
        const { data } = message;
        const emoji = data.level === 'error' ? '🚨' : 
                     data.level === 'warning' ? '⚠️' : 'ℹ️';
        console.log(`${emoji} ${data.level.toUpperCase()}: ${data.message}`);
    };
}

function createUserActivityHandler() {
    return ({ channel, message }) => {
        const { data } = message;
        console.log(`👤 User ${data.userId} performed action: ${data.action}`);
    };
}

// パブリッシャー例
async function publisherExample() {
    const manager = new RedisPubSubManager();
    
    try {
        // 様々なタイプのメッセージを発行
        await manager.publishMessage('chat:general', {
            username: 'alice',
            message: 'Hello from Node.js!',
            timestamp: new Date().toISOString()
        }, { type: 'chat', source: 'node-publisher' });
        
        await manager.publishMessage('notifications:system', {
            level: 'info',
            message: 'System maintenance completed',
            component: 'web-server'
        }, { type: 'notification' });
        
        await manager.publishMessage('user:activity', {
            userId: 12345,
            action: 'login',
            ip: '192.168.1.100',
            userAgent: 'Node.js Client'
        }, { type: 'user_activity' });
        
        // パターンマッチングテスト
        await manager.publishMessage('events:user:login', {
            event: 'user_login',
            details: { userId: 67890, location: 'Tokyo' }
        });
        
        await manager.publishMessage('events:order:created', {
            event: 'order_created',
            details: { orderId: 'ORD-001', amount: 150.00 }
        });
        
    } finally {
        await manager.close();
    }
}

// サブスクライバー例
async function subscriberExample() {
    const manager = new RedisPubSubManager();
    
    try {
        // カスタムハンドラーでチャネル購読
        manager.subscribeChannel('chat:general', createChatHandler('General'));
        manager.subscribeChannel('notifications:system', createNotificationHandler());
        manager.subscribeChannel('user:activity', createUserActivityHandler());
        
        // パターン購読
        manager.subscribePattern('events:*');
        manager.subscribePattern('alerts:*');
        
        // メッセージ受信開始
        manager.startListening();
        
        // チャネル情報確認
        setTimeout(() => {
            manager.getChannelInfo('chat:general', 'notifications:system', 'user:activity');
        }, 1000);
        
        // Graceful shutdown
        process.on('SIGINT', async () => {
            console.log('\n🛑 Shutting down subscriber...');
            await manager.close();
            process.exit(0);
        });
        
        // 継続実行
        console.log('Press Ctrl+C to stop the subscriber...');
        
    } catch (error) {
        console.error('Error in subscriber:', error);
        await manager.close();
    }
}

// 実行例
if (require.main === module) {
    const mode = process.argv[2];
    
    if (mode === 'publish') {
        console.log('🚀 Running publisher example...');
        publisherExample().catch(console.error);
    } else {
        console.log('🎧 Running subscriber example...');
        subscriberExample().catch(console.error);
    }
}

module.exports = RedisPubSubManager;

パターンマッチングと高度な機能

# 高度なパターンマッチング例
redis-cli

# 階層的チャネル構造
SUBSCRIBE app:user:login
SUBSCRIBE app:user:logout
SUBSCRIBE app:order:created
SUBSCRIBE app:order:completed

# パターン購読で一括処理
PSUBSCRIBE app:user:*      # ユーザー関連イベント全て
PSUBSCRIBE app:order:*     # 注文関連イベント全て
PSUBSCRIBE app:*:error     # 全モジュールのエラー

# 地域別チャネル
PSUBSCRIBE region:asia:*
PSUBSCRIBE region:europe:*
PSUBSCRIBE region:america:*

# レベル別通知
PSUBSCRIBE alerts:critical:*
PSUBSCRIBE alerts:warning:*
PSUBSCRIBE alerts:info:*

# タイムベースチャネル
PSUBSCRIBE events:2024:*
PSUBSCRIBE logs:hourly:*
PSUBSCRIBE stats:daily:*

# Sharded Pub/Sub(Redis 7.0+)
# 高性能なクラスター環境での使用
SSUBSCRIBE orders:asia
SSUBSCRIBE orders:europe  
SSUBSCRIBE orders:america

SPUBLISH orders:asia "Order #A123 from Tokyo"
SPUBLISH orders:europe "Order #E456 from London"

# チャネル情報とモニタリング
PUBSUB CHANNELS                    # アクティブチャネル
PUBSUB CHANNELS app:user:*         # パターンマッチするチャネル
PUBSUB NUMSUB app:user:login       # 特定チャネルの購読者数
PUBSUB NUMPAT                      # パターン購読の総数

# Sharded Pub/Sub情報
PUBSUB SHARDCHANNELS              # シャードチャネル一覧
PUBSUB SHARDNUMSUB orders:asia    # シャードチャネル購読者数

実践的なユースケース実装

# リアルタイムチャットアプリケーション
import asyncio
import redis.asyncio as redis
import json
from datetime import datetime

class RealTimeChatApp:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self.active_users = set()
        
    async def join_room(self, user_id, room_id):
        """ユーザーがチャットルームに参加"""
        channel = f"chat:room:{room_id}"
        
        # ルーム参加通知
        join_message = {
            'type': 'user_joined',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} joined the room"
        }
        
        await self.redis.publish(channel, json.dumps(join_message))
        await self.pubsub.subscribe(channel)
        self.active_users.add(user_id)
        
        print(f"User {user_id} joined room {room_id}")
    
    async def send_message(self, user_id, room_id, message):
        """チャットメッセージ送信"""
        channel = f"chat:room:{room_id}"
        
        chat_message = {
            'type': 'chat_message',
            'user_id': user_id,
            'room_id': room_id,
            'message': message,
            'timestamp': datetime.now().isoformat()
        }
        
        receivers = await self.redis.publish(channel, json.dumps(chat_message))
        print(f"Message sent to {receivers} users in room {room_id}")
        
    async def leave_room(self, user_id, room_id):
        """ユーザーがチャットルームから退出"""
        channel = f"chat:room:{room_id}"
        
        leave_message = {
            'type': 'user_left',
            'user_id': user_id,
            'timestamp': datetime.now().isoformat(),
            'message': f"User {user_id} left the room"
        }
        
        await self.redis.publish(channel, json.dumps(leave_message))
        await self.pubsub.unsubscribe(channel)
        self.active_users.discard(user_id)
        
        print(f"User {user_id} left room {room_id}")
    
    async def listen_messages(self):
        """メッセージ受信処理"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await self.handle_chat_message(data)
                except json.JSONDecodeError:
                    print(f"Invalid message format: {message['data']}")
    
    async def handle_chat_message(self, data):
        """チャットメッセージ処理"""
        message_type = data.get('type')
        
        if message_type == 'chat_message':
            print(f"💬 [{data['room_id']}] {data['user_id']}: {data['message']}")
        elif message_type == 'user_joined':
            print(f"👋 {data['message']}")
        elif message_type == 'user_left':
            print(f"👋 {data['message']}")
    
    async def close(self):
        await self.pubsub.close()
        await self.redis.close()

# イベント駆動システム
class EventDrivenSystem:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.event_handlers = {}
        
    def register_event_handler(self, event_type, handler):
        """イベントハンドラー登録"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)
        
    async def emit_event(self, event_type, event_data):
        """イベント発行"""
        event = {
            'type': event_type,
            'data': event_data,
            'timestamp': datetime.now().isoformat(),
            'event_id': f"{event_type}_{int(datetime.now().timestamp() * 1000)}"
        }
        
        channel = f"events:{event_type}"
        receivers = await self.redis.publish(channel, json.dumps(event))
        print(f"Event '{event_type}' emitted to {receivers} listeners")
        
    async def subscribe_events(self, *event_types):
        """イベント購読"""
        pubsub = self.redis.pubsub()
        
        for event_type in event_types:
            channel = f"events:{event_type}"
            await pubsub.subscribe(channel)
            print(f"Subscribed to events: {event_type}")
        
        async for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    await self.handle_event(event)
                except json.JSONDecodeError:
                    print(f"Invalid event format: {message['data']}")
    
    async def handle_event(self, event):
        """イベント処理"""
        event_type = event.get('type')
        
        if event_type in self.event_handlers:
            for handler in self.event_handlers[event_type]:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"Error in event handler for {event_type}: {e}")
        else:
            print(f"No handler for event type: {event_type}")

# 使用例
async def chat_demo():
    """チャットアプリデモ"""
    app = RealTimeChatApp()
    
    try:
        # ユーザーがルームに参加
        await app.join_room("alice", "general")
        await app.join_room("bob", "general")
        
        # メッセージ送信
        await app.send_message("alice", "general", "Hello everyone!")
        await app.send_message("bob", "general", "Hi Alice!")
        
        # メッセージ受信開始(実際のアプリではバックグラウンドで実行)
        # await app.listen_messages()
        
    finally:
        await app.close()

async def event_demo():
    """イベント駆動システムデモ"""
    system = EventDrivenSystem()
    
    # イベントハンドラー登録
    async def user_registered_handler(event):
        data = event['data']
        print(f"📧 Sending welcome email to {data['email']}")
    
    async def order_created_handler(event):
        data = event['data']
        print(f"📦 Processing order {data['order_id']} for ${data['amount']}")
    
    system.register_event_handler('user_registered', user_registered_handler)
    system.register_event_handler('order_created', order_created_handler)
    
    # イベント発行
    await system.emit_event('user_registered', {
        'user_id': 12345,
        'email': '[email protected]',
        'name': 'John Doe'
    })
    
    await system.emit_event('order_created', {
        'order_id': 'ORD-001',
        'user_id': 12345,
        'amount': 99.99,
        'items': ['Product A', 'Product B']
    })

if __name__ == "__main__":
    # asyncio.run(chat_demo())
    asyncio.run(event_demo())

監視とパフォーマンス最適化

# Redis Pub/Sub監視スクリプト
cat > monitor-pubsub.sh << 'EOF'
#!/bin/bash

echo "=== Redis Pub/Sub Monitoring Report $(date) ==="

# Redis接続確認
echo "=== Redis Connection Status ==="
redis-cli ping > /dev/null 2>&1 && echo "Redis: CONNECTED" || echo "Redis: DISCONNECTED"

# アクティブチャネル数
echo "=== Active Channels ==="
ACTIVE_CHANNELS=$(redis-cli PUBSUB CHANNELS | wc -l)
echo "Total active channels: $ACTIVE_CHANNELS"

# パターン購読数
echo "=== Pattern Subscriptions ==="
PATTERN_SUBS=$(redis-cli PUBSUB NUMPAT)
echo "Pattern subscriptions: $PATTERN_SUBS"

# 主要チャネルの購読者数
echo "=== Channel Subscribers ==="
redis-cli PUBSUB NUMSUB chat:general news:tech system:alerts user:notifications | \
while read channel count; do
    if [ -n "$channel" ] && [ -n "$count" ]; then
        echo "Channel '$channel': $count subscribers"
    fi
done

# Redis情報
echo "=== Redis Server Info ==="
redis-cli INFO clients | grep connected_clients
redis-cli INFO memory | grep used_memory_human
redis-cli INFO stats | grep total_commands_processed

# Pub/Sub統計
echo "=== Pub/Sub Statistics ==="
redis-cli INFO stats | grep pubsub

echo "=== Monitoring Complete ==="
EOF

chmod +x monitor-pubsub.sh

# パフォーマンステスト
# 大量メッセージ送信テスト
redis-cli --eval - 0 << 'EOF'
for i=1,10000 do
    redis.call('PUBLISH', 'performance_test', 'Message ' .. i)
end
return 'Published 10000 messages'
EOF

# 購読者パフォーマンステスト
redis-cli --latency-history -i 1 PUBLISH test_channel "performance test"

# メモリ使用量監視
redis-cli --bigkeys

# クライアント接続監視
redis-cli CLIENT LIST

Redis Pub/Subは、そのシンプルさと高性能により、リアルタイムメッセージング、イベント駆動アーキテクチャ、チャットアプリケーション、ライブ通知システムなど幅広い用途で活用されています。特に1ミリ秒レベルの低遅延が要求される用途や、軽量で高速なメッセージ配信が必要なアプリケーションで威力を発揮します。