redis4cats
Cache Library
redis4cats
Overview
redis4cats is a purely functional Redis client library for Scala built on top of Cats Effect, Fs2, and Lettuce, providing type-safe and asynchronous Redis operations fully integrated with functional programming paradigms.
Details
redis4cats is a Redis client library for Scala built on top of Cats Effect, Fs2, and the async Java client Lettuce. The 1.x.x series is built on Cats Effect 3 while the 0.x.x series is built on Cats Effect 2. The API is quite stable and heavily used in production, though binary compatibility is not guaranteed across versions for now. It provides a high-level, safe and pure functional API supporting comprehensive Redis functionality including Redis Streams, scripting API, connection management, pipelining, transactions, and Pub/Sub features. It supports both single-node and cluster connections, automates connection pool management, and ensures resource safety. All operations are type-safe and designed according to Scala's functional programming principles, properly encapsulating side effects in the IO monad. Integration with log4cats provides structured logging support. As part of the Cats Effect ecosystem, redis4cats offers high interoperability with other Typelevel libraries (fs2, cats, circe, etc.) and is optimized for reactive asynchronous application development.
Pros and Cons
Pros
- Purely Functional: Side-effect-free, type-safe Redis operations
- Cats Effect Integration: Powerful support for asynchronous and concurrent processing
- Fs2 Integration: Natural integration with streaming processing and Redis Streams
- Resource Safety: Automatic connection management and cleanup
- Type Safety: Compile-time type checking and error prevention
- Typelevel Ecosystem: Excellent interoperability with other Typelevel libraries
- Production Ready: Proven track record in many production environments
Cons
- Learning Curve: Requires knowledge of Cats Effect and Fs2
- Binary Compatibility: No guarantee of binary compatibility across versions
- Functional Only: Unsuitable for imperative programming styles
- Dependencies: Dependency on multiple Typelevel libraries
- Scala Specific: Limited use with other JVM languages
Key Links
- redis4cats GitHub Repository
- redis4cats Official Documentation
- Cats Effect Official Site
- Fs2 Official Documentation
- Redis Official Documentation
Code Examples
Installation and Basic Setup
// build.sbt
libraryDependencies ++= Seq(
"dev.profunktor" %% "redis4cats-effects" % "1.5.2",
"dev.profunktor" %% "redis4cats-streams" % "1.5.2",
"dev.profunktor" %% "redis4cats-log4cats" % "1.5.2"
)
// imports
import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given
Basic Connection and Operations
import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given
object BasicRedisExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// Basic set/get operations
_ <- redis.set("foo", "123")
x <- redis.get("foo")
_ <- IO.println(s"Value: $x") // Some("123")
// Conditional set
_ <- redis.setNx("foo", "should not happen")
y <- redis.get("foo")
_ <- IO.println(s"Value after setNx: $y") // Some("123")
// Numeric operations
_ <- redis.set("counter", "0")
newVal <- redis.incr("counter")
_ <- IO.println(s"Counter: $newVal") // 1
// Key with expiration
_ <- redis.setEx("temp", "temporary", 60.seconds)
temp <- redis.get("temp")
_ <- IO.println(s"Temp value: $temp")
yield ()
}
Connection Configuration and Client Management
import cats.effect.*
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.MkRedis
object ConnectionExample extends IOApp.Simple:
val run: IO[Unit] =
// Advanced connection configuration
val uri = RedisURI.create("redis://password@localhost:6379/2")
Redis[IO].fromUri(uri).use { redis =>
for
_ <- redis.ping
_ <- IO.println("Connected to Redis successfully")
// Using custom codec
info <- redis.info
_ <- IO.println(s"Redis info: $info")
yield ()
}
// Type-safe operations with custom codec
case class User(id: String, name: String, email: String)
object TypeSafeRedisExample extends IOApp.Simple:
import io.circe.generic.auto.*
import dev.profunktor.redis4cats.circe.CirceCodec
given RedisCodec[String, User] = CirceCodec.fromJson[String, User]
val run: IO[Unit] =
Redis[IO].simple("redis://localhost").use { redis =>
val user = User("1", "Alice", "[email protected]")
for
_ <- redis.set("user:1", user)
retrievedUser <- redis.get("user:1")
_ <- IO.println(s"Retrieved user: $retrievedUser")
yield ()
}
Pipelining and Transactions
import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.pipeline.{Pipeline, TxPipeline}
import dev.profunktor.redis4cats.transactions.*
object PipelineExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
// Pipelining
val pipeline = Pipeline(redis.pipeline).use { pipe =>
for
_ <- pipe.set("key1", "value1")
_ <- pipe.set("key2", "value2")
_ <- pipe.incr("counter")
_ <- pipe.get("key1")
_ <- pipe.get("key2")
yield ()
}
// Transactions
val transaction = redis.multi.use { tx =>
for
_ <- tx.set("account:1", "100")
_ <- tx.set("account:2", "200")
_ <- tx.decrBy("account:1", 50)
_ <- tx.incrBy("account:2", 50)
yield ()
}
for
_ <- pipeline
_ <- IO.println("Pipeline executed")
_ <- transaction
_ <- IO.println("Transaction executed")
yield ()
}
List, Set, and Hash Operations
object CollectionsExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// List operations
_ <- redis.lPush("tasks", List("task1", "task2", "task3"))
_ <- redis.rPush("tasks", List("task4"))
tasks <- redis.lRange("tasks", 0, -1)
_ <- IO.println(s"Tasks: $tasks")
// Set operations
_ <- redis.sAdd("tags", Set("scala", "redis", "functional"))
members <- redis.sMembers("tags")
_ <- IO.println(s"Tags: $members")
// Hash operations
userFields = Map(
"name" -> "John",
"email" -> "[email protected]",
"age" -> "30"
)
_ <- redis.hMSet("user:1", userFields)
userData <- redis.hGetAll("user:1")
_ <- IO.println(s"User data: $userData")
name <- redis.hGet("user:1", "name")
_ <- IO.println(s"User name: $name")
yield ()
}
Pub/Sub Functionality
import fs2.Stream
import dev.profunktor.redis4cats.pubsub.PubSub
object PubSubExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
PubSub.mkSubscriber(redis).use { subscriber =>
PubSub.mkPublisher(redis).use { publisher =>
val publisher1 = Stream.awakeEvery[IO](3.seconds)
.evalMap(_ => publisher.publish("notifications", "Hello World!"))
.take(5)
val subscriber1 = subscriber
.subscribe("notifications")
.evalMap(msg => IO.println(s"Received: ${msg.channel} -> ${msg.value}"))
.take(5)
// Concurrent execution
Stream(publisher1, subscriber1)
.parJoinUnbounded
.compile
.drain
}
}
}
Redis Streams
import dev.profunktor.redis4cats.streams.*
import dev.profunktor.redis4cats.data.*
object StreamsExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
val streamKey = StreamingKey("events")
val consumerGroup = ConsumerGroup("processors")
val consumer = Consumer("worker-1")
for
// Add messages to stream
msgId1 <- redis.xAdd(streamKey, Map("action" -> "user_login", "user_id" -> "123"))
msgId2 <- redis.xAdd(streamKey, Map("action" -> "user_logout", "user_id" -> "123"))
_ <- IO.println(s"Added messages: $msgId1, $msgId2")
// Create consumer group
_ <- redis.xGroupCreate(streamKey, consumerGroup, MessageId.earliest)
// Read messages
messages <- redis.xReadGroup(consumerGroup, consumer, Map(streamKey -> MessageId.latest), 10)
_ <- messages.traverse_(msg =>
IO.println(s"Processing: ${msg.id} -> ${msg.body}") *>
redis.xAck(streamKey, consumerGroup, msg.id)
)
yield ()
}
Error Handling and Retry
import cats.effect.std.Console
import cats.syntax.all.*
import scala.concurrent.duration.*
object ErrorHandlingExample extends IOApp.Simple:
val run: IO[Unit] =
// Safe Redis operation with error handling
def safeRedisOperation[A](operation: IO[A]): IO[Option[A]] =
operation.map(Some(_)).handleErrorWith { error =>
Console[IO].println(s"Redis error: ${error.getMessage}") *>
IO.pure(None)
}
// Retry with exponential backoff
def retryWithBackoff[A](operation: IO[A], maxRetries: Int = 3): IO[A] =
def retry(attempt: Int): IO[A] =
operation.handleErrorWith { error =>
if (attempt < maxRetries) {
val delay = (1 << attempt).seconds
Console[IO].println(s"Retry attempt $attempt after ${delay.toSeconds}s") *>
IO.sleep(delay) *> retry(attempt + 1)
} else {
IO.raiseError(error)
}
}
retry(0)
Redis[IO].utf8("redis://localhost").use { redis =>
for
// Safe operation
result1 <- safeRedisOperation(redis.get("nonexistent"))
_ <- IO.println(s"Safe result: $result1")
// Operation with retry
result2 <- retryWithBackoff(redis.set("retry_key", "retry_value"))
_ <- IO.println(s"Retry result: $result2")
yield ()
}
Connection Pool and Configuration
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.effect.MkRedis
import io.lettuce.core.{ClientOptions, SocketOptions}
object AdvancedConfigExample extends IOApp.Simple:
val run: IO[Unit] =
// Advanced connection configuration
val socketOptions = SocketOptions.builder()
.connectTimeout(5.seconds)
.keepAlive(true)
.build()
val clientOptions = ClientOptions.builder()
.socketOptions(socketOptions)
.autoReconnect(true)
.build()
val redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withDatabase(0)
.withPassword("password")
.withTimeout(5.seconds)
.build()
MkRedis[IO].fromUri(redisUri, clientOptions.some).use { redis =>
for
_ <- redis.ping
_ <- IO.println("Connected with advanced configuration")
// Get connection info
info <- redis.clientList
_ <- IO.println(s"Client info: $info")
yield ()
}
Streaming Processing and Fs2 Integration
import fs2.*
import cats.effect.std.Queue
object StreamingExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// Queue and stream integration
queue <- Queue.unbounded[IO, String]
// Periodic data generation
producer = Stream.awakeEvery[IO](1.second)
.zipWithIndex
.evalMap { case (_, idx) =>
val data = s"data-$idx"
redis.lPush("stream", List(data)) *>
queue.offer(data)
}
.take(10)
// Data consumption and Redis reading
consumer = Stream.fromQueueUnterminated(queue)
.evalMap { data =>
redis.lPop("stream").flatMap {
case Some(value) => IO.println(s"Processed: $value")
case None => IO.println("No data in Redis")
}
}
.take(10)
// Concurrent streaming
_ <- Stream(producer, consumer)
.parJoinUnbounded
.compile
.drain
yield ()
}