redis4cats

ScalaCache LibraryRedisCats EffectFunctional ProgrammingFs2

Cache Library

redis4cats

Overview

redis4cats is a purely functional Redis client library for Scala built on top of Cats Effect, Fs2, and Lettuce, providing type-safe and asynchronous Redis operations fully integrated with functional programming paradigms.

Details

redis4cats is a Redis client library for Scala built on top of Cats Effect, Fs2, and the async Java client Lettuce. The 1.x.x series is built on Cats Effect 3 while the 0.x.x series is built on Cats Effect 2. The API is quite stable and heavily used in production, though binary compatibility is not guaranteed across versions for now. It provides a high-level, safe and pure functional API supporting comprehensive Redis functionality including Redis Streams, scripting API, connection management, pipelining, transactions, and Pub/Sub features. It supports both single-node and cluster connections, automates connection pool management, and ensures resource safety. All operations are type-safe and designed according to Scala's functional programming principles, properly encapsulating side effects in the IO monad. Integration with log4cats provides structured logging support. As part of the Cats Effect ecosystem, redis4cats offers high interoperability with other Typelevel libraries (fs2, cats, circe, etc.) and is optimized for reactive asynchronous application development.

Pros and Cons

Pros

  • Purely Functional: Side-effect-free, type-safe Redis operations
  • Cats Effect Integration: Powerful support for asynchronous and concurrent processing
  • Fs2 Integration: Natural integration with streaming processing and Redis Streams
  • Resource Safety: Automatic connection management and cleanup
  • Type Safety: Compile-time type checking and error prevention
  • Typelevel Ecosystem: Excellent interoperability with other Typelevel libraries
  • Production Ready: Proven track record in many production environments

Cons

  • Learning Curve: Requires knowledge of Cats Effect and Fs2
  • Binary Compatibility: No guarantee of binary compatibility across versions
  • Functional Only: Unsuitable for imperative programming styles
  • Dependencies: Dependency on multiple Typelevel libraries
  • Scala Specific: Limited use with other JVM languages

Key Links

Code Examples

Installation and Basic Setup

// build.sbt
libraryDependencies ++= Seq(
  "dev.profunktor" %% "redis4cats-effects" % "1.5.2",
  "dev.profunktor" %% "redis4cats-streams" % "1.5.2",
  "dev.profunktor" %% "redis4cats-log4cats" % "1.5.2"
)

// imports
import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given

Basic Connection and Operations

import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given

object BasicRedisExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // Basic set/get operations
        _ <- redis.set("foo", "123")
        x <- redis.get("foo")
        _ <- IO.println(s"Value: $x") // Some("123")
        
        // Conditional set
        _ <- redis.setNx("foo", "should not happen")
        y <- redis.get("foo")
        _ <- IO.println(s"Value after setNx: $y") // Some("123")
        
        // Numeric operations
        _ <- redis.set("counter", "0")
        newVal <- redis.incr("counter")
        _ <- IO.println(s"Counter: $newVal") // 1
        
        // Key with expiration
        _ <- redis.setEx("temp", "temporary", 60.seconds)
        temp <- redis.get("temp")
        _ <- IO.println(s"Temp value: $temp")
      yield ()
    }

Connection Configuration and Client Management

import cats.effect.*
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.MkRedis

object ConnectionExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // Advanced connection configuration
    val uri = RedisURI.create("redis://password@localhost:6379/2")
    
    Redis[IO].fromUri(uri).use { redis =>
      for
        _ <- redis.ping
        _ <- IO.println("Connected to Redis successfully")
        
        // Using custom codec
        info <- redis.info
        _ <- IO.println(s"Redis info: $info")
      yield ()
    }

// Type-safe operations with custom codec
case class User(id: String, name: String, email: String)

object TypeSafeRedisExample extends IOApp.Simple:
  import io.circe.generic.auto.*
  import dev.profunktor.redis4cats.circe.CirceCodec
  
  given RedisCodec[String, User] = CirceCodec.fromJson[String, User]
  
  val run: IO[Unit] = 
    Redis[IO].simple("redis://localhost").use { redis =>
      val user = User("1", "Alice", "[email protected]")
      
      for
        _ <- redis.set("user:1", user)
        retrievedUser <- redis.get("user:1")
        _ <- IO.println(s"Retrieved user: $retrievedUser")
      yield ()
    }

Pipelining and Transactions

import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.pipeline.{Pipeline, TxPipeline}
import dev.profunktor.redis4cats.transactions.*

object PipelineExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      // Pipelining
      val pipeline = Pipeline(redis.pipeline).use { pipe =>
        for
          _ <- pipe.set("key1", "value1")
          _ <- pipe.set("key2", "value2")
          _ <- pipe.incr("counter")
          _ <- pipe.get("key1")
          _ <- pipe.get("key2")
        yield ()
      }
      
      // Transactions
      val transaction = redis.multi.use { tx =>
        for
          _ <- tx.set("account:1", "100")
          _ <- tx.set("account:2", "200")
          _ <- tx.decrBy("account:1", 50)
          _ <- tx.incrBy("account:2", 50)
        yield ()
      }
      
      for
        _ <- pipeline
        _ <- IO.println("Pipeline executed")
        _ <- transaction
        _ <- IO.println("Transaction executed")
      yield ()
    }

List, Set, and Hash Operations

object CollectionsExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // List operations
        _ <- redis.lPush("tasks", List("task1", "task2", "task3"))
        _ <- redis.rPush("tasks", List("task4"))
        tasks <- redis.lRange("tasks", 0, -1)
        _ <- IO.println(s"Tasks: $tasks")
        
        // Set operations
        _ <- redis.sAdd("tags", Set("scala", "redis", "functional"))
        members <- redis.sMembers("tags")
        _ <- IO.println(s"Tags: $members")
        
        // Hash operations
        userFields = Map(
          "name" -> "John",
          "email" -> "[email protected]",
          "age" -> "30"
        )
        _ <- redis.hMSet("user:1", userFields)
        userData <- redis.hGetAll("user:1")
        _ <- IO.println(s"User data: $userData")
        
        name <- redis.hGet("user:1", "name")
        _ <- IO.println(s"User name: $name")
      yield ()
    }

Pub/Sub Functionality

import fs2.Stream
import dev.profunktor.redis4cats.pubsub.PubSub

object PubSubExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      PubSub.mkSubscriber(redis).use { subscriber =>
        PubSub.mkPublisher(redis).use { publisher =>
          
          val publisher1 = Stream.awakeEvery[IO](3.seconds)
            .evalMap(_ => publisher.publish("notifications", "Hello World!"))
            .take(5)
          
          val subscriber1 = subscriber
            .subscribe("notifications")
            .evalMap(msg => IO.println(s"Received: ${msg.channel} -> ${msg.value}"))
            .take(5)
          
          // Concurrent execution
          Stream(publisher1, subscriber1)
            .parJoinUnbounded
            .compile
            .drain
        }
      }
    }

Redis Streams

import dev.profunktor.redis4cats.streams.*
import dev.profunktor.redis4cats.data.*

object StreamsExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      val streamKey = StreamingKey("events")
      val consumerGroup = ConsumerGroup("processors")
      val consumer = Consumer("worker-1")
      
      for
        // Add messages to stream
        msgId1 <- redis.xAdd(streamKey, Map("action" -> "user_login", "user_id" -> "123"))
        msgId2 <- redis.xAdd(streamKey, Map("action" -> "user_logout", "user_id" -> "123"))
        _ <- IO.println(s"Added messages: $msgId1, $msgId2")
        
        // Create consumer group
        _ <- redis.xGroupCreate(streamKey, consumerGroup, MessageId.earliest)
        
        // Read messages
        messages <- redis.xReadGroup(consumerGroup, consumer, Map(streamKey -> MessageId.latest), 10)
        _ <- messages.traverse_(msg => 
          IO.println(s"Processing: ${msg.id} -> ${msg.body}") *>
          redis.xAck(streamKey, consumerGroup, msg.id)
        )
      yield ()
    }

Error Handling and Retry

import cats.effect.std.Console
import cats.syntax.all.*
import scala.concurrent.duration.*

object ErrorHandlingExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // Safe Redis operation with error handling
    def safeRedisOperation[A](operation: IO[A]): IO[Option[A]] =
      operation.map(Some(_)).handleErrorWith { error =>
        Console[IO].println(s"Redis error: ${error.getMessage}") *>
        IO.pure(None)
      }
    
    // Retry with exponential backoff
    def retryWithBackoff[A](operation: IO[A], maxRetries: Int = 3): IO[A] =
      def retry(attempt: Int): IO[A] =
        operation.handleErrorWith { error =>
          if (attempt < maxRetries) {
            val delay = (1 << attempt).seconds
            Console[IO].println(s"Retry attempt $attempt after ${delay.toSeconds}s") *>
            IO.sleep(delay) *> retry(attempt + 1)
          } else {
            IO.raiseError(error)
          }
        }
      retry(0)
    
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // Safe operation
        result1 <- safeRedisOperation(redis.get("nonexistent"))
        _ <- IO.println(s"Safe result: $result1")
        
        // Operation with retry
        result2 <- retryWithBackoff(redis.set("retry_key", "retry_value"))
        _ <- IO.println(s"Retry result: $result2")
      yield ()
    }

Connection Pool and Configuration

import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.effect.MkRedis
import io.lettuce.core.{ClientOptions, SocketOptions}

object AdvancedConfigExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // Advanced connection configuration
    val socketOptions = SocketOptions.builder()
      .connectTimeout(5.seconds)
      .keepAlive(true)
      .build()
    
    val clientOptions = ClientOptions.builder()
      .socketOptions(socketOptions)
      .autoReconnect(true)
      .build()
    
    val redisUri = RedisURI.builder()
      .withHost("localhost")
      .withPort(6379)
      .withDatabase(0)
      .withPassword("password")
      .withTimeout(5.seconds)
      .build()
    
    MkRedis[IO].fromUri(redisUri, clientOptions.some).use { redis =>
      for
        _ <- redis.ping
        _ <- IO.println("Connected with advanced configuration")
        
        // Get connection info
        info <- redis.clientList
        _ <- IO.println(s"Client info: $info")
      yield ()
    }

Streaming Processing and Fs2 Integration

import fs2.*
import cats.effect.std.Queue

object StreamingExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // Queue and stream integration
        queue <- Queue.unbounded[IO, String]
        
        // Periodic data generation
        producer = Stream.awakeEvery[IO](1.second)
          .zipWithIndex
          .evalMap { case (_, idx) =>
            val data = s"data-$idx"
            redis.lPush("stream", List(data)) *> 
            queue.offer(data)
          }
          .take(10)
        
        // Data consumption and Redis reading
        consumer = Stream.fromQueueUnterminated(queue)
          .evalMap { data =>
            redis.lPop("stream").flatMap {
              case Some(value) => IO.println(s"Processed: $value")
              case None => IO.println("No data in Redis")
            }
          }
          .take(10)
        
        // Concurrent streaming
        _ <- Stream(producer, consumer)
          .parJoinUnbounded
          .compile
          .drain
      yield ()
    }