Akka Cache

ScalaJavaAkkaCache LibraryDistributed SystemsReactive

Library

Akka Cache

Overview

Akka Cache is a high-performance caching library provided as part of the Akka HTTP ecosystem.

Details

Akka Cache is a lightweight and fast caching library built into the Akka HTTP framework. It supports both Scala and Java and specializes in asynchronous and reactive application development. It adopts a Future/CompletableFuture-based API and has a mechanism to automatically solve the "Thundering Herd problem." By caching corresponding Future objects rather than actual values, when multiple requests for the same key arrive simultaneously, it waits for the first request to complete before returning the cached result. It uses an efficient LRU (Least Recently Used) based cache algorithm and allows flexible configuration including TTL (Time To Live), size limits, and manual invalidation. When combined with Akka Cluster, distributed caching can also be achieved.

Pros and Cons

Pros

  • Async Support: Future-based for high concurrency
  • Thundering Herd Protection: Automatically controls simultaneous requests for the same key
  • Lightweight: Minimal dependencies optimized for Akka HTTP
  • Type Safety: Safe API leveraging Scala/Java type systems
  • Flexible Configuration: Fine control over TTL, size limits, and invalidation policies
  • Distributed Support: Distributed caching through Akka Cluster integration
  • High Performance: Efficient memory usage with LRU algorithm

Cons

  • Akka Dependency: Difficult to use outside the Akka ecosystem
  • Memory Limited: Primarily in-memory caching only
  • Learning Curve: Requires understanding of reactive programming
  • Configuration Complexity: Can become complex in distributed environments
  • Debugging: Difficulty debugging asynchronous processing

Key Links

Code Examples

Adding Dependencies

// build.sbt (Scala)
val AkkaHttpVersion = "10.7.1"
libraryDependencies += "com.typesafe.akka" %% "akka-http-caching" % AkkaHttpVersion
<!-- Maven (Java) -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-caching_2.13</artifactId>
    <version>10.7.1</version>
</dependency>

Basic Cache Operations (Scala)

import akka.http.caching.scaladsl.Cache
import akka.http.caching.LfuCache
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem("cache-system")
implicit val dispatcher = system.dispatcher

// Create cache
val cache: Cache[String, String] = LfuCache[String, String](
  maxCapacity = 256,
  timeToLive = 1.hour
)

// Expensive operation
def expensiveOperation(key: String): Future[String] = {
  Future {
    Thread.sleep(1000) // Simulate heavy processing
    s"Result for $key"
  }
}

// Get data using cache
val result: Future[String] = cache.getOrLoad("user:123", expensiveOperation)

// Direct cache operations
cache.put("direct:key", Future.successful("Direct Value"))

// Get from cache
val cachedValue: Option[Future[String]] = cache.get("user:123")

Basic Cache Operations (Java)

import akka.http.caching.javadsl.Cache;
import akka.http.caching.LfuCache;
import java.util.concurrent.CompletableFuture;
import java.time.Duration;

ActorSystem system = ActorSystem.create("cache-system");

// Create cache
Cache<String, String> cache = LfuCache.create(
    CachingSettings.create(system).withMaxCapacity(256)
                                 .withTimeToLive(Duration.ofHours(1))
);

// Define expensive operation
Function<String, CompletableFuture<String>> expensiveFunction = key -> 
    CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        return "Result for " + key;
    });

// Get or load from cache
CompletableFuture<String> result = cache.getOrLoad("user:123", expensiveFunction);

// Direct cache operations
cache.put("direct:key", CompletableFuture.completedFuture("Direct Value"));

// Get from cache
Optional<CompletableFuture<String>> cachedValue = cache.get("user:123");

Custom Cache Configuration

import akka.http.caching.scaladsl.CachingSettings

// Create cache with custom settings
val customSettings = CachingSettings(system)
  .withMaxCapacity(1000)
  .withInitialCapacity(64)
  .withTimeToLive(30.minutes)
  .withTimeToIdle(10.minutes)

val customCache = LfuCache[String, UserData](customSettings)

// More detailed configuration
val advancedCache = LfuCache[String, ApiResponse](
  maxCapacity = 500,
  timeToLive = 1.hour,
  timeToIdle = 15.minutes
)

Cache Usage in HTTP Routing

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

val userCache = LfuCache[String, User](
  maxCapacity = 1000,
  timeToLive = 1.hour
)

def fetchUser(id: String): Future[User] = {
  // Fetch user from database
  userRepository.findById(id)
}

val routes: Route = {
  path("users" / Segment) { userId =>
    get {
      onSuccess(userCache.getOrLoad(userId, fetchUser)) { user =>
        complete(user)
      }
    }
  }
}

Complex Caching Patterns

case class CacheKey(userId: String, version: Int)
case class UserProfile(name: String, email: String, lastModified: Instant)

class UserService(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  
  private val profileCache = LfuCache[CacheKey, UserProfile](
    maxCapacity = 2000,
    timeToLive = 2.hours
  )
  
  def getUserProfile(userId: String, version: Int): Future[UserProfile] = {
    val key = CacheKey(userId, version)
    
    profileCache.getOrLoad(key, _ => {
      // Fetch from database
      fetchFromDatabase(userId)
        .map(profile => {
          // Additional processing or enrichment
          enrichProfile(profile)
        })
    })
  }
  
  def invalidateUser(userId: String): Unit = {
    // Invalidate cache for specific user
    profileCache.keys.filter(_.userId == userId).foreach { key =>
      profileCache.remove(key)
    }
  }
  
  private def fetchFromDatabase(userId: String): Future[UserProfile] = {
    // Database operation
    Future {
      UserProfile("John Doe", "[email protected]", Instant.now())
    }
  }
  
  private def enrichProfile(profile: UserProfile): UserProfile = {
    // Additional profile processing
    profile
  }
}

Integration with Distributed Cache

import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._

// Distributed cache using Akka Distributed Data
class DistributedCacheService(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  private val replicator = DistributedData(system).replicator
  
  // Combination of local and distributed cache
  private val localCache = LfuCache[String, String](
    maxCapacity = 100,
    timeToLive = 5.minutes
  )
  
  def get(key: String): Future[Option[String]] = {
    // Check local cache first
    localCache.get(key) match {
      case Some(future) => future.map(Some(_))
      case None =>
        // Get from distributed cache
        val distributedKey = LWWRegisterKey[String](key)
        replicator ? Get(distributedKey, ReadLocal) flatMap {
          case g @ GetSuccess(`distributedKey`, _) =>
            val value = g.dataValue.value
            localCache.put(key, Future.successful(value))
            Future.successful(Some(value))
          case _ => Future.successful(None)
        }
    }
  }
  
  def put(key: String, value: String): Future[Unit] = {
    // Save to local cache
    localCache.put(key, Future.successful(value))
    
    // Save to distributed cache too
    val distributedKey = LWWRegisterKey[String](key)
    replicator ? Update(distributedKey, LWWRegister.empty[String], WriteLocal)(_ :+ value) map { _ =>
      ()
    }
  }
}

Performance Monitoring

import akka.http.caching.scaladsl.CachingSettings

class MonitoredCache[K, V](name: String)(implicit system: ActorSystem) {
  private implicit val dispatcher = system.dispatcher
  
  private val cache = LfuCache[K, V](
    maxCapacity = 1000,
    timeToLive = 1.hour
  )
  
  private var hitCount = 0L
  private var missCount = 0L
  
  def getOrLoad(key: K, loadFunction: K => Future[V]): Future[V] = {
    cache.get(key) match {
      case Some(future) =>
        hitCount += 1
        future
      case None =>
        missCount += 1
        cache.getOrLoad(key, loadFunction)
    }
  }
  
  def getStats: (Long, Long, Double) = {
    val total = hitCount + missCount
    val hitRate = if (total > 0) hitCount.toDouble / total else 0.0
    (hitCount, missCount, hitRate)
  }
  
  def logStats(): Unit = {
    val (hits, misses, hitRate) = getStats
    println(s"Cache[$name] - Hits: $hits, Misses: $misses, Hit Rate: ${(hitRate * 100).formatted("%.2f")}%")
  }
}