GroupCache

キャッシュライブラリGoGolang分散キャッシュGooglememcached代替thundering herd

GitHub概要

golang/groupcache

groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

スター13,262
ウォッチ470
フォーク1,393
作成日:2013年7月22日
言語:Go
ライセンス:Apache License 2.0

トピックス

なし

スター履歴

golang/groupcache Star History
データ取得日時: 2025/10/22 10:05

キャッシュライブラリ

GroupCache

概要

GroupCacheは、多くの場合でmemcachedプールの代替として設計された分散キャッシュおよびキャッシュ補完ライブラリです。ピアプロセス間で動作するデータ読み込み機構を提供し、キャッシュとde-duplication機能を持ちます。Googleのmemcache作者であるBrad Fitzpatrickによって開発されました。

詳細

GroupCacheは、クライアントライブラリでありながらサーバーでもあります。自身のピアに接続して分散キャッシュを形成し、すべてのアプリケーションインスタンスがキャッシュノードになるIn Code Distributed Cache(ICDC)を実現します。

memcachedが「キャッシュミス」に対して単純に失敗を返すのに対し、GroupCacheは複数のプロセス間でキャッシュフィルを調整し、レプリケートされたプロセスセット全体で1つのプロセス内の1つの読み込みのみがキャッシュを埋め、その値をすべての呼び出し元に多重化します。この設計により、thundering herd問題を防ぎ、非常に人気のあるキー/値によるマシンのCPUやNICの過負荷を防ぎます。

メリット・デメリット

メリット

  • Thundering herd保護による高効率なキャッシュフィル
  • 超人気アイテムの自動ミラーリングでホットスポット防止
  • 分散アーキテクチャでスケーラビリティ向上
  • Googleの本番環境での実績(dl.google.com、Blogger等)
  • プロセス内キャッシュとピア間通信の組み合わせで高性能
  • memcachedの作者による設計で信頼性が高い
  • 読み込み専用ワークロードに最適化

デメリット

  • キャッシュ有効期限や明示的な削除機能がない
  • CAS(Compare-and-Swap)やIncrement/Decrement機能なし
  • キャッシュの無効化や値の更新機能がない
  • 読み込み中心のアプリケーション以外には不向き
  • 設定やデプロイメントが他のキャッシュシステムより複雑
  • ピア間の通信設定と管理が必要

参考ページ

書き方の例

基本的なセットアップ

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "github.com/golang/groupcache"
)

var (
    // グローバルなGroupCacheインスタンス
    thumbnails *groupcache.Group
)

func main() {
    // HTTPプールの設定(ピア間通信用)
    peers := groupcache.NewHTTPPool("http://localhost:8080")
    
    // ピアリストの設定
    peers.Set("http://localhost:8080", "http://localhost:8081", "http://localhost:8082")
    
    // グループの作成
    thumbnails = groupcache.NewGroup("thumbnails", 64<<20, groupcache.GetterFunc(
        func(ctx context.Context, key string, dest groupcache.Sink) error {
            // キャッシュミス時の処理(データ取得ロジック)
            thumbnail, err := generateThumbnail(key)
            if err != nil {
                return err
            }
            
            // データをsinkに設定
            dest.SetBytes(thumbnail)
            return nil
        },
    ))
    
    // HTTPサーバーの起動
    http.Handle("/_groupcache/", peers)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

データ取得の実装

// サムネイル生成の例(実際のデータソースから取得)
func generateThumbnail(imagePath string) ([]byte, error) {
    // データベースやファイルシステムからの読み込み
    log.Printf("Generating thumbnail for: %s", imagePath)
    
    // 重い処理をシミュレート
    time.Sleep(100 * time.Millisecond)
    
    // 実際の処理では画像処理ライブラリを使用
    thumbnail := []byte(fmt.Sprintf("thumbnail_data_for_%s", imagePath))
    return thumbnail, nil
}

// GroupCacheからデータを取得
func getThumbnail(imagePath string) ([]byte, error) {
    var data []byte
    
    err := thumbnails.Get(context.Background(), imagePath, groupcache.AllocatingByteSliceSink(&data))
    if err != nil {
        return nil, err
    }
    
    return data, nil
}

HTTP API との統合

// RESTful APIとの統合例
func thumbnailHandler(w http.ResponseWriter, r *http.Request) {
    imagePath := r.URL.Query().Get("path")
    if imagePath == "" {
        http.Error(w, "Missing path parameter", http.StatusBadRequest)
        return
    }
    
    // GroupCacheから取得
    thumbnailData, err := getThumbnail(imagePath)
    if err != nil {
        http.Error(w, fmt.Sprintf("Error: %v", err), http.StatusInternalServerError)
        return
    }
    
    // レスポンスを返す
    w.Header().Set("Content-Type", "image/jpeg")
    w.Header().Set("Cache-Control", "public, max-age=3600")
    w.Write(thumbnailData)
}

func main() {
    // GroupCacheの設定(前述の通り)
    setupGroupCache()
    
    // APIエンドポイントの登録
    http.HandleFunc("/api/thumbnail", thumbnailHandler)
    http.Handle("/_groupcache/", peers)
    
    log.Printf("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

複数グループの管理

var (
    userCache     *groupcache.Group
    productCache  *groupcache.Group
    configCache   *groupcache.Group
)

func initializeCaches() {
    // ユーザー情報キャッシュ
    userCache = groupcache.NewGroup("users", 32<<20, groupcache.GetterFunc(
        func(ctx context.Context, key string, dest groupcache.Sink) error {
            user, err := fetchUserFromDatabase(key)
            if err != nil {
                return err
            }
            
            userData, _ := json.Marshal(user)
            dest.SetBytes(userData)
            return nil
        },
    ))
    
    // 商品情報キャッシュ
    productCache = groupcache.NewGroup("products", 64<<20, groupcache.GetterFunc(
        func(ctx context.Context, key string, dest groupcache.Sink) error {
            product, err := fetchProductFromAPI(key)
            if err != nil {
                return err
            }
            
            productData, _ := json.Marshal(product)
            dest.SetBytes(productData)
            return nil
        },
    ))
    
    // 設定情報キャッシュ(変更頻度が低いデータ)
    configCache = groupcache.NewGroup("config", 8<<20, groupcache.GetterFunc(
        func(ctx context.Context, key string, dest groupcache.Sink) error {
            config, err := loadConfigFromFile(key)
            if err != nil {
                return err
            }
            
            dest.SetString(config)
            return nil
        },
    ))
}

ピア検出とクラスター管理

// 動的ピア検出の例
type PeerManager struct {
    pool  *groupcache.HTTPPool
    peers []string
}

func NewPeerManager(selfAddr string) *PeerManager {
    return &PeerManager{
        pool: groupcache.NewHTTPPool(selfAddr),
    }
}

func (pm *PeerManager) UpdatePeers(newPeers []string) {
    pm.peers = newPeers
    pm.pool.Set(pm.peers...)
    log.Printf("Updated peers: %v", pm.peers)
}

// Kubernetes環境での動的ピア検出
func (pm *PeerManager) DiscoverKubernetesPeers() {
    // Kubernetes APIを使用してPodの検出
    pods, err := getGroupCachePods()
    if err != nil {
        log.Printf("Error discovering peers: %v", err)
        return
    }
    
    var peerAddrs []string
    for _, pod := range pods {
        if pod.Status.Phase == "Running" {
            addr := fmt.Sprintf("http://%s:8080", pod.Status.PodIP)
            peerAddrs = append(peerAddrs, addr)
        }
    }
    
    pm.UpdatePeers(peerAddrs)
}

統計情報とモニタリング

// GroupCache統計情報の取得
func getGroupCacheStats() map[string]groupcache.Stats {
    stats := make(map[string]groupcache.Stats)
    
    // 各グループの統計を収集
    if userCache != nil {
        stats["users"] = userCache.Stats
    }
    if productCache != nil {
        stats["products"] = productCache.Stats
    }
    
    return stats
}

// 統計情報をHTTPエンドポイントで公開
func statsHandler(w http.ResponseWriter, r *http.Request) {
    stats := getGroupCacheStats()
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(stats)
}

// ヘルスチェックエンドポイント
func healthHandler(w http.ResponseWriter, r *http.Request) {
    // GroupCacheの健康状態をチェック
    stats := getGroupCacheStats()
    
    healthy := true
    for groupName, stat := range stats {
        // 基本的なヘルスチェック
        if stat.Gets.Get() == 0 && time.Since(startTime) > 5*time.Minute {
            log.Printf("Group %s has no requests", groupName)
            healthy = false
        }
    }
    
    if healthy {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
        w.Write([]byte("Not healthy"))
    }
}

エラーハンドリングとフォールバック

// ロバストなデータ取得実装
func robustGetThumbnail(imagePath string) ([]byte, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    var data []byte
    err := thumbnails.Get(ctx, imagePath, groupcache.AllocatingByteSliceSink(&data))
    
    switch err {
    case nil:
        return data, nil
    case context.DeadlineExceeded:
        log.Printf("Timeout getting thumbnail for %s", imagePath)
        // フォールバック:デフォルト画像を返す
        return getDefaultThumbnail(), nil
    default:
        log.Printf("Error getting thumbnail for %s: %v", imagePath, err)
        return nil, err
    }
}

// サーキットブレーカーパターンの実装
type CircuitBreaker struct {
    failures int
    lastFailure time.Time
    threshold int
    timeout time.Duration
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    if cb.failures >= cb.threshold {
        if time.Since(cb.lastFailure) < cb.timeout {
            return fmt.Errorf("circuit breaker open")
        }
        // 回復を試行
        cb.failures = 0
    }
    
    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        return err
    }
    
    cb.failures = 0
    return nil
}