Akka Cache
Overview
Akka Cache is a high-performance, asynchronous caching library provided as part of Akka HTTP (the akka-http-caching module).
Details
Akka Cache is a lightweight, fast caching library that ships with the Akka HTTP framework as the akka-http-caching module. It supports both Scala and Java and is designed for asynchronous, reactive application development, exposing a Future-based (Scala) and CompletableFuture/CompletionStage-based (Java) API. It also has a built-in mechanism that mitigates the "thundering herd" problem: because the cache stores the Future for a value rather than the value itself, concurrent requests for the same key attach to the first pending computation and receive its result when it completes, instead of triggering duplicate loads. The default implementation, LfuCache, uses an efficient frequency-based (least frequently used) eviction algorithm and allows flexible configuration, including TTL (time to live), time to idle, size limits, and manual invalidation. Combined with Akka Cluster, it can also serve as a building block for distributed caching.
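As a minimal sketch (key and value types are illustrative only), the snippet below shows this effect: two lookups for the same key made while the first load is still running share a single computation, because the second call receives the Future already stored by the first.

import akka.actor.ActorSystem
import akka.http.caching.LfuCache
import akka.http.caching.scaladsl.{ Cache, CachingSettings }
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("demo")
import system.dispatcher

// LfuCache built from the default caching settings
val cache: Cache[String, Int] = LfuCache[String, Int](CachingSettings(system))

// Simulated slow computation
def slowLoad(key: String): Future[Int] = Future {
  Thread.sleep(500)
  key.length
}

// Both calls happen before the first load finishes: the second one
// gets the pending Future from the cache instead of loading again.
val first = cache.getOrLoad("report", slowLoad)
val second = cache.getOrLoad("report", slowLoad)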
Pros and Cons
Pros
- Async Support: Future-based for high concurrency
- Thundering Herd Protection: Automatically controls simultaneous requests for the same key
- Lightweight: Minimal dependencies optimized for Akka HTTP
- Type Safety: Safe API leveraging Scala/Java type systems
- Flexible Configuration: Fine control over TTL, size limits, and invalidation policies
- Distributed Support: Distributed caching through Akka Cluster integration
- High Performance: Efficient memory usage and eviction via a frequency-based (LFU) algorithm
Cons
- Akka Dependency: Difficult to use outside the Akka ecosystem
- Memory Limited: Primarily in-memory caching only
- Learning Curve: Requires understanding of reactive programming
- Configuration Complexity: Can become complex in distributed environments
- Debugging: Asynchronous call chains are harder to trace and debug
Code Examples
Adding Dependencies
// build.sbt (Scala)
val AkkaHttpVersion = "10.7.1"
libraryDependencies += "com.typesafe.akka" %% "akka-http-caching" % AkkaHttpVersion
<!-- Maven (Java) -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-caching_2.13</artifactId>
<version>10.7.1</version>
</dependency>
Basic Cache Operations (Scala)
import akka.actor.ActorSystem
import akka.http.caching.LfuCache
import akka.http.caching.scaladsl.{ Cache, CachingSettings }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("cache-system")
implicit val dispatcher: ExecutionContext = system.dispatcher

// Create cache: LfuCache is configured through CachingSettings
val defaultSettings = CachingSettings(system)
val cache: Cache[String, String] = LfuCache[String, String](
  defaultSettings.withLfuCacheSettings(
    defaultSettings.lfuCacheSettings
      .withMaxCapacity(256)
      .withTimeToLive(1.hour)
  )
)

// Expensive operation
def expensiveOperation(key: String): Future[String] =
  Future {
    Thread.sleep(1000) // Simulate heavy processing
    s"Result for $key"
  }

// Get data using cache: loads on a miss, otherwise returns the cached Future
val result: Future[String] = cache.getOrLoad("user:123", expensiveOperation)

// Direct cache operations
cache.put("direct:key", Future.successful("Direct Value"))

// Get from cache (None if nothing is cached for the key)
val cachedValue: Option[Future[String]] = cache.get("user:123")
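Entries can also be invalidated manually; a short sketch continuing the example above:

// Manual invalidation: drop a single entry or everything at once
cache.remove("user:123")
cache.clear()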
Basic Cache Operations (Java)
import akka.actor.ActorSystem;
import akka.http.caching.LfuCache;
import akka.http.caching.javadsl.Cache;
import akka.http.caching.javadsl.CachingSettings;
import akka.http.caching.javadsl.LfuCacheSettings;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

ActorSystem system = ActorSystem.create("cache-system");

// Create cache: LFU-specific options are set on LfuCacheSettings
CachingSettings defaultSettings = CachingSettings.create(system);
LfuCacheSettings lfuSettings = defaultSettings.lfuCacheSettings()
    .withMaxCapacity(256)
    .withTimeToLive(Duration.ofHours(1));
Cache<String, String> cache = LfuCache.create(defaultSettings.withLfuCacheSettings(lfuSettings));

// Define expensive operation
Function<String, CompletionStage<String>> expensiveFunction = key ->
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000); // Simulate heavy processing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Result for " + key;
    });

// Get or load from cache (the javadsl method is getFuture)
CompletionStage<String> result = cache.getFuture("user:123", expensiveFunction);

// Direct cache operations
cache.put("direct:key", CompletableFuture.completedFuture("Direct Value"));

// Get from cache (empty if the key is not present)
Optional<CompletionStage<String>> cachedValue = cache.getOptional("user:123");
Custom Cache Configuration
import akka.http.caching.scaladsl.CachingSettings

// Create cache with custom settings: LFU-specific options live on lfuCacheSettings
val baseSettings = CachingSettings(system)
val customLfuSettings = baseSettings.lfuCacheSettings
  .withMaxCapacity(1000)
  .withInitialCapacity(64)
  .withTimeToLive(30.minutes)
  .withTimeToIdle(10.minutes)
val customCache = LfuCache[String, UserData](
  baseSettings.withLfuCacheSettings(customLfuSettings)
)

// Another cache with different limits, built the same way
val advancedLfuSettings = baseSettings.lfuCacheSettings
  .withMaxCapacity(500)
  .withTimeToLive(1.hour)
  .withTimeToIdle(15.minutes)
val advancedCache = LfuCache[String, ApiResponse](
  baseSettings.withLfuCacheSettings(advancedLfuSettings)
)
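The same limits can also come from configuration: CachingSettings(system) reads its defaults from the akka.http.caching section. A sketch, with key names assumed to follow the standard akka-http-caching reference configuration:

# application.conf
akka.http.caching.lfu-cache {
  max-capacity = 1000
  initial-capacity = 64
  time-to-live = 30m
  time-to-idle = 10m
}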
Cache Usage in HTTP Routing
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
// Cache shared by all requests handled by this route
val userCacheSettings = CachingSettings(system)
val userCache = LfuCache[String, User](
  userCacheSettings.withLfuCacheSettings(
    userCacheSettings.lfuCacheSettings
      .withMaxCapacity(1000)
      .withTimeToLive(1.hour)
  )
)

def fetchUser(id: String): Future[User] = {
  // Fetch user from database
  userRepository.findById(id)
}

val routes: Route =
  path("users" / Segment) { userId =>
    get {
      onSuccess(userCache.getOrLoad(userId, fetchUser)) { user =>
        complete(user)
      }
    }
  }
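Akka HTTP also ships route-level caching directives. The following is a sketch (reusing fetchUser above and assuming the default caching configuration) of the cache directive, which caches complete route results keyed here by request URI:

import akka.http.scaladsl.model.{ HttpMethods, Uri }
import akka.http.scaladsl.server.RequestContext
import akka.http.scaladsl.server.directives.CachingDirectives._

// Only cache plain GET requests; the keyer selects the cache key (the URI)
val keyer: PartialFunction[RequestContext, Uri] = {
  case ctx if ctx.request.method == HttpMethods.GET => ctx.request.uri
}

// Cache[Uri, RouteResult] built from the default caching settings
val routeLevelCache = routeCache[Uri]

val cachedRoutes: Route =
  path("users" / Segment) { userId =>
    cache(routeLevelCache, keyer) {
      get {
        onSuccess(fetchUser(userId)) { user =>
          complete(user)
        }
      }
    }
  }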
Complex Caching Patterns
import java.time.Instant

case class CacheKey(userId: String, version: Int)
case class UserProfile(name: String, email: String, lastModified: Instant)

class UserService(implicit system: ActorSystem) {
  private implicit val dispatcher: ExecutionContext = system.dispatcher

  private val profileSettings = CachingSettings(system)
  private val profileCache = LfuCache[CacheKey, UserProfile](
    profileSettings.withLfuCacheSettings(
      profileSettings.lfuCacheSettings
        .withMaxCapacity(2000)
        .withTimeToLive(2.hours)
    )
  )

  def getUserProfile(userId: String, version: Int): Future[UserProfile] = {
    val key = CacheKey(userId, version)
    profileCache.getOrLoad(key, _ => {
      // Fetch from database, then enrich before caching
      fetchFromDatabase(userId).map(enrichProfile)
    })
  }

  def invalidateUser(userId: String): Unit = {
    // Invalidate every cached version for this user
    profileCache.keys.filter(_.userId == userId).foreach(profileCache.remove)
  }

  private def fetchFromDatabase(userId: String): Future[UserProfile] = {
    // Database operation (stubbed)
    Future {
      UserProfile("John Doe", "[email protected]", Instant.now())
    }
  }

  private def enrichProfile(profile: UserProfile): UserProfile = {
    // Additional profile processing
    profile
  }
}
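A brief, hypothetical usage of the service above: a new version number produces a new cache key, while invalidateUser drops every cached version for a user.

val userService = new UserService
val profile: Future[UserProfile] = userService.getUserProfile("user-42", version = 1)
userService.invalidateUser("user-42") // removes all cached versions for this user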
Integration with Distributed Cache
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata._
import akka.pattern.ask
import akka.util.Timeout

// Distributed cache using Akka Distributed Data
class DistributedCacheService(implicit system: ActorSystem) {
  private implicit val dispatcher: ExecutionContext = system.dispatcher
  private implicit val timeout: Timeout = Timeout(3.seconds)
  private implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress

  private val replicator = DistributedData(system).replicator

  // Local cache in front of the replicated data
  private val localSettings = CachingSettings(system)
  private val localCache = LfuCache[String, String](
    localSettings.withLfuCacheSettings(
      localSettings.lfuCacheSettings
        .withMaxCapacity(100)
        .withTimeToLive(5.minutes)
    )
  )

  def get(key: String): Future[Option[String]] = {
    // Check local cache first
    localCache.get(key) match {
      case Some(future) => future.map(Some(_))
      case None =>
        // Fall back to the replicated data store
        val distributedKey = LWWRegisterKey[String](key)
        (replicator ? Get(distributedKey, ReadLocal)).flatMap {
          case g @ GetSuccess(`distributedKey`, _) =>
            val value = g.get(distributedKey).value
            localCache.put(key, Future.successful(value))
            Future.successful(Some(value))
          case _ => Future.successful(None)
        }
    }
  }

  def put(key: String, value: String): Future[Unit] = {
    // Save to local cache
    localCache.put(key, Future.successful(value))
    // Save to the replicated data store too
    val distributedKey = LWWRegisterKey[String](key)
    (replicator ? Update(distributedKey, LWWRegister.create(value), WriteLocal)(_.withValueOf(value)))
      .map(_ => ())
  }
}
Performance Monitoring
import akka.http.caching.scaladsl.CachingSettings
import java.util.concurrent.atomic.AtomicLong

class MonitoredCache[K, V](name: String)(implicit system: ActorSystem) {
  private implicit val dispatcher: ExecutionContext = system.dispatcher

  private val settings = CachingSettings(system)
  private val cache = LfuCache[K, V](
    settings.withLfuCacheSettings(
      settings.lfuCacheSettings
        .withMaxCapacity(1000)
        .withTimeToLive(1.hour)
    )
  )

  // Counters are updated concurrently, so use atomics rather than plain vars
  private val hitCount = new AtomicLong(0L)
  private val missCount = new AtomicLong(0L)

  def getOrLoad(key: K, loadFunction: K => Future[V]): Future[V] = {
    cache.get(key) match {
      case Some(future) =>
        hitCount.incrementAndGet()
        future
      case None =>
        missCount.incrementAndGet()
        cache.getOrLoad(key, loadFunction)
    }
  }

  def getStats: (Long, Long, Double) = {
    val hits = hitCount.get()
    val misses = missCount.get()
    val total = hits + misses
    val hitRate = if (total > 0) hits.toDouble / total else 0.0
    (hits, misses, hitRate)
  }

  def logStats(): Unit = {
    val (hits, misses, hitRate) = getStats
    println(f"Cache[$name] - Hits: $hits, Misses: $misses, Hit Rate: ${hitRate * 100}%.2f%%")
  }
}
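Finally, a hypothetical usage of the monitored wrapper, reusing expensiveOperation from the basic example above:

val monitoredCache = new MonitoredCache[String, String]("users")
monitoredCache.getOrLoad("user:123", expensiveOperation).foreach { _ =>
  monitoredCache.logStats() // e.g. Cache[users] - Hits: 0, Misses: 1, Hit Rate: 0.00%
}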