Akka Cache

ScalaJavaAkkaキャッシュライブラリ分散システムリアクティブ

ライブラリ

Akka Cache

概要

Akka Cacheは、Akka HTTPエコシステムの一部として提供される高性能なキャッシュライブラリです。

詳細

Akka Cacheは、Akka HTTPフレームワークに組み込まれている軽量で高速なキャッシュライブラリです。ScalaとJavaの両方をサポートし、非同期・リアクティブなアプリケーション開発に特化しています。Future/CompletableFutureベースのAPIを採用し、「Thundering Herd問題」を自動的に解決する仕組みを持っています。実際の値ではなく、対応するFutureオブジェクトをキャッシュすることで、同じキーに対する複数のリクエストが同時に到着した場合でも、最初のリクエストの完了を待ってからキャッシュされた結果を返します。LRU(Least Recently Used)をベースとした効率的なキャッシュアルゴリズムを使用し、TTL(Time To Live)、サイズ制限、手動での無効化などの柔軟な設定が可能です。Akka Clusterと組み合わせることで分散キャッシングも実現できます。

メリット・デメリット

メリット

  • 非同期対応: Futureベースで高い並行性を実現
  • Thundering Herd対策: 同一キーへの同時リクエストを自動的に制御
  • 軽量: Akka HTTPに最適化された最小限の依存関係
  • 型安全: Scala/Java の型システムを活用した安全な API
  • 柔軟な設定: TTL、サイズ制限、無効化ポリシーの細かな制御
  • 分散対応: Akka Clusterとの統合による分散キャッシング
  • 高性能: LRUアルゴリズムによる効率的なメモリ使用

デメリット

  • Akka依存: Akkaエコシステム外での利用が困難
  • メモリ限定: 基本的にインメモリキャッシュのみ
  • 学習コスト: リアクティブプログラミングの理解が必要
  • 設定の複雑さ: 分散環境では設定が複雑になる可能性
  • デバッグ: 非同期処理によるデバッグの難しさ

主要リンク

書き方の例

依存関係の追加

// build.sbt (Scala)
val AkkaHttpVersion = "10.7.1"
libraryDependencies += "com.typesafe.akka" %% "akka-http-caching" % AkkaHttpVersion
<!-- Maven (Java) -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-caching_2.13</artifactId>
    <version>10.7.1</version>
</dependency>

基本的なキャッシュ操作(Scala)

import akka.http.caching.scaladsl.Cache
import akka.http.caching.LfuCache
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem("cache-system")
implicit val dispatcher = system.dispatcher

// キャッシュの作成
val cache: Cache[String, String] = LfuCache[String, String](
  maxCapacity = 256,
  timeToLive = 1.hour
)

// キャッシュからの取得または計算
def expensiveOperation(key: String): Future[String] = {
  Future {
    Thread.sleep(1000) // 重い処理をシミュレート
    s"Result for $key"
  }
}

// キャッシュを使用したデータ取得
val result: Future[String] = cache.getOrLoad("user:123", expensiveOperation)

// 直接的なキャッシュ操作
cache.put("direct:key", Future.successful("Direct Value"))

// キャッシュの取得
val cachedValue: Option[Future[String]] = cache.get("user:123")

基本的なキャッシュ操作(Java)

import akka.http.caching.javadsl.Cache;
import akka.http.caching.LfuCache;
import java.util.concurrent.CompletableFuture;
import java.time.Duration;

ActorSystem system = ActorSystem.create("cache-system");

// キャッシュの作成
Cache<String, String> cache = LfuCache.create(
    CachingSettings.create(system).withMaxCapacity(256)
                                 .withTimeToLive(Duration.ofHours(1))
);

// 重い処理の定義
Function<String, CompletableFuture<String>> expensiveFunction = key -> 
    CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        return "Result for " + key;
    });

// キャッシュからの取得または計算
CompletableFuture<String> result = cache.getOrLoad("user:123", expensiveFunction);

// 直接的なキャッシュ操作
cache.put("direct:key", CompletableFuture.completedFuture("Direct Value"));

// キャッシュの取得
Optional<CompletableFuture<String>> cachedValue = cache.get("user:123");

カスタムキャッシュ設定

import akka.http.caching.scaladsl.CachingSettings

// カスタム設定でキャッシュを作成
val customSettings = CachingSettings(system)
  .withMaxCapacity(1000)
  .withInitialCapacity(64)
  .withTimeToLive(30.minutes)
  .withTimeToIdle(10.minutes)

val customCache = LfuCache[String, UserData](customSettings)

// より詳細な設定
val advancedCache = LfuCache[String, ApiResponse](
  maxCapacity = 500,
  timeToLive = 1.hour,
  timeToIdle = 15.minutes
)

HTTP ルーティングでのキャッシュ活用

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

val userCache = LfuCache[String, User](
  maxCapacity = 1000,
  timeToLive = 1.hour
)

def fetchUser(id: String): Future[User] = {
  // データベースからユーザーを取得
  userRepository.findById(id)
}

val routes: Route = {
  path("users" / Segment) { userId =>
    get {
      onSuccess(userCache.getOrLoad(userId, fetchUser)) { user =>
        complete(user)
      }
    }
  }
}

複雑なキャッシュパターン

case class CacheKey(userId: String, version: Int)
case class UserProfile(name: String, email: String, lastModified: Instant)

class UserService(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  
  private val profileCache = LfuCache[CacheKey, UserProfile](
    maxCapacity = 2000,
    timeToLive = 2.hours
  )
  
  def getUserProfile(userId: String, version: Int): Future[UserProfile] = {
    val key = CacheKey(userId, version)
    
    profileCache.getOrLoad(key, _ => {
      // データベースから取得
      fetchFromDatabase(userId)
        .map(profile => {
          // 追加の処理やエンリッチメント
          enrichProfile(profile)
        })
    })
  }
  
  def invalidateUser(userId: String): Unit = {
    // 特定ユーザーのキャッシュを無効化
    profileCache.keys.filter(_.userId == userId).foreach { key =>
      profileCache.remove(key)
    }
  }
  
  private def fetchFromDatabase(userId: String): Future[UserProfile] = {
    // データベース操作
    Future {
      UserProfile("John Doe", "[email protected]", Instant.now())
    }
  }
  
  private def enrichProfile(profile: UserProfile): UserProfile = {
    // プロファイルの追加処理
    profile
  }
}

分散キャッシュとの統合

import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._

// Akka Distributed Dataを使用した分散キャッシュ
class DistributedCacheService(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  private val replicator = DistributedData(system).replicator
  
  // ローカルキャッシュと分散キャッシュの組み合わせ
  private val localCache = LfuCache[String, String](
    maxCapacity = 100,
    timeToLive = 5.minutes
  )
  
  def get(key: String): Future[Option[String]] = {
    // まずローカルキャッシュを確認
    localCache.get(key) match {
      case Some(future) => future.map(Some(_))
      case None =>
        // 分散キャッシュから取得
        val distributedKey = LWWRegisterKey[String](key)
        replicator ? Get(distributedKey, ReadLocal) flatMap {
          case g @ GetSuccess(`distributedKey`, _) =>
            val value = g.dataValue.value
            localCache.put(key, Future.successful(value))
            Future.successful(Some(value))
          case _ => Future.successful(None)
        }
    }
  }
  
  def put(key: String, value: String): Future[Unit] = {
    // ローカルキャッシュに保存
    localCache.put(key, Future.successful(value))
    
    // 分散キャッシュにも保存
    val distributedKey = LWWRegisterKey[String](key)
    replicator ? Update(distributedKey, LWWRegister.empty[String], WriteLocal)(_ :+ value) map { _ =>
      ()
    }
  }
}

パフォーマンスモニタリング

import akka.http.caching.scaladsl.CachingSettings

class MonitoredCache[K, V](name: String)(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  
  private val cache = LfuCache[K, V](
    maxCapacity = 1000,
    timeToLive = 1.hour
  )
  
  private var hitCount = 0L
  private var missCount = 0L
  
  def getOrLoad(key: K, loadFunction: K => Future[V]): Future[V] = {
    cache.get(key) match {
      case Some(future) =>
        hitCount += 1
        future
      case None =>
        missCount += 1
        cache.getOrLoad(key, loadFunction)
    }
  }
  
  def getStats: (Long, Long, Double) = {
    val total = hitCount + missCount
    val hitRate = if (total > 0) hitCount.toDouble / total else 0.0
    (hitCount, missCount, hitRate)
  }
  
  def logStats(): Unit = {
    val (hits, misses, hitRate) = getStats
    println(s"Cache[$name] - Hits: $hits, Misses: $misses, Hit Rate: ${(hitRate * 100).formatted("%.2f")}%")
  }
}