redis4cats

ScalaキャッシュライブラリRedisCats Effect関数型プログラミングFs2

キャッシュライブラリ

redis4cats

概要

redis4catsは、Cats Effect、Fs2、Lettuceの上に構築されたScala向けの純粋関数型Redisクライアントライブラリで、型安全で非同期なRedis操作を提供し、関数型プログラミングのパラダイムに完全に統合されています。

詳細

redis4cats(レディス・フォー・キャッツ)は、Cats Effect、Fs2、非同期JavaクライアントLettuceの上に構築されたScala向けのRedisクライアントライブラリです。1.x.xシリーズはCats Effect 3上に、0.x.xシリーズはCats Effect 2上に構築されています。APIは非常に安定しており、本番環境で重要な使用実績がありますが、現在のところバージョン間でのバイナリ互換性は保証されていません。高レベルで安全かつ純粋関数型のAPIを提供し、Redis Streams、スクリプティングAPI、接続管理、パイプライニング、トランザクション、Pub/Sub機能などRedisの包括的な機能をサポートしています。単一ノードおよびクラスター接続の両方をサポートし、接続プールの管理を自動化し、リソースの安全性を保証します。すべての操作は型安全で、Scalaの関数型プログラミング原則に従って設計されており、副作用をIOモナドで適切にカプセル化します。log4catsとの統合により、構造化ロギングもサポートしています。redis4catsはCats Effectエコシステムの一部として、他のTypelevelライブラリ(fs2、cats、circe等)との相互運用性が高く、リアクティブな非同期アプリケーション開発に最適化されています。

メリット・デメリット

メリット

  • 純粋関数型: 副作用のない型安全なRedis操作
  • Cats Effect統合: 非同期・並行処理の強力なサポート
  • Fs2統合: ストリーミング処理とRedis Streamsの自然な統合
  • リソース安全: 自動的な接続管理とクリーンアップ
  • 型安全性: コンパイル時の型チェックとエラー防止
  • Typelevelエコシステム: 他のTypelevelライブラリとの優れた相互運用性
  • 本番実績: 多くのプロダクション環境での使用実績

デメリット

  • 学習コスト: Cats Effect、Fs2の知識が必要
  • バイナリ互換性: バージョン間でのバイナリ互換性が保証されていない
  • 関数型限定: 手続き型プログラミングスタイルには不向き
  • 依存関係: 複数のTypelevelライブラリへの依存
  • Scala専用: 他のJVM言語での使用は制限される

主要リンク

書き方の例

インストールと基本設定

// build.sbt
libraryDependencies ++= Seq(
  "dev.profunktor" %% "redis4cats-effects" % "1.5.2",
  "dev.profunktor" %% "redis4cats-streams" % "1.5.2",
  "dev.profunktor" %% "redis4cats-log4cats" % "1.5.2"
)

// imports
import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given

基本的な接続と操作

import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given

object BasicRedisExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // 基本的なset/get操作
        _ <- redis.set("foo", "123")
        x <- redis.get("foo")
        _ <- IO.println(s"Value: $x") // Some("123")
        
        // 条件付き設定
        _ <- redis.setNx("foo", "should not happen")
        y <- redis.get("foo")
        _ <- IO.println(s"Value after setNx: $y") // Some("123")
        
        // 数値操作
        _ <- redis.set("counter", "0")
        newVal <- redis.incr("counter")
        _ <- IO.println(s"Counter: $newVal") // 1
        
        // 期限付きキー
        _ <- redis.setEx("temp", "temporary", 60.seconds)
        temp <- redis.get("temp")
        _ <- IO.println(s"Temp value: $temp")
      yield ()
    }

接続設定とクライアント管理

import cats.effect.*
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.MkRedis

object ConnectionExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // 高度な接続設定
    val uri = RedisURI.create("redis://password@localhost:6379/2")
    
    Redis[IO].fromUri(uri).use { redis =>
      for
        _ <- redis.ping
        _ <- IO.println("Connected to Redis successfully")
        
        // カスタムコーデックの使用
        info <- redis.info
        _ <- IO.println(s"Redis info: $info")
      yield ()
    }

// カスタムコーデックを使用した型安全な操作
case class User(id: String, name: String, email: String)

object TypeSafeRedisExample extends IOApp.Simple:
  import io.circe.generic.auto.*
  import dev.profunktor.redis4cats.circe.CirceCodec
  
  given RedisCodec[String, User] = CirceCodec.fromJson[String, User]
  
  val run: IO[Unit] = 
    Redis[IO].simple("redis://localhost").use { redis =>
      val user = User("1", "Alice", "[email protected]")
      
      for
        _ <- redis.set("user:1", user)
        retrievedUser <- redis.get("user:1")
        _ <- IO.println(s"Retrieved user: $retrievedUser")
      yield ()
    }

パイプライニングとトランザクション

import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.pipeline.{Pipeline, TxPipeline}
import dev.profunktor.redis4cats.transactions.*

object PipelineExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      // パイプライニング
      val pipeline = Pipeline(redis.pipeline).use { pipe =>
        for
          _ <- pipe.set("key1", "value1")
          _ <- pipe.set("key2", "value2")
          _ <- pipe.incr("counter")
          _ <- pipe.get("key1")
          _ <- pipe.get("key2")
        yield ()
      }
      
      // トランザクション
      val transaction = redis.multi.use { tx =>
        for
          _ <- tx.set("account:1", "100")
          _ <- tx.set("account:2", "200")
          _ <- tx.decrBy("account:1", 50)
          _ <- tx.incrBy("account:2", 50)
        yield ()
      }
      
      for
        _ <- pipeline
        _ <- IO.println("Pipeline executed")
        _ <- transaction
        _ <- IO.println("Transaction executed")
      yield ()
    }

リスト・セット・ハッシュ操作

object CollectionsExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // リスト操作
        _ <- redis.lPush("tasks", List("task1", "task2", "task3"))
        _ <- redis.rPush("tasks", List("task4"))
        tasks <- redis.lRange("tasks", 0, -1)
        _ <- IO.println(s"Tasks: $tasks")
        
        // セット操作
        _ <- redis.sAdd("tags", Set("scala", "redis", "functional"))
        members <- redis.sMembers("tags")
        _ <- IO.println(s"Tags: $members")
        
        // ハッシュ操作
        userFields = Map(
          "name" -> "John",
          "email" -> "[email protected]",
          "age" -> "30"
        )
        _ <- redis.hMSet("user:1", userFields)
        userData <- redis.hGetAll("user:1")
        _ <- IO.println(s"User data: $userData")
        
        name <- redis.hGet("user:1", "name")
        _ <- IO.println(s"User name: $name")
      yield ()
    }

Pub/Sub機能

import fs2.Stream
import dev.profunktor.redis4cats.pubsub.PubSub

object PubSubExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      PubSub.mkSubscriber(redis).use { subscriber =>
        PubSub.mkPublisher(redis).use { publisher =>
          
          val publisher1 = Stream.awakeEvery[IO](3.seconds)
            .evalMap(_ => publisher.publish("notifications", "Hello World!"))
            .take(5)
          
          val subscriber1 = subscriber
            .subscribe("notifications")
            .evalMap(msg => IO.println(s"Received: ${msg.channel} -> ${msg.value}"))
            .take(5)
          
          // 並行実行
          Stream(publisher1, subscriber1)
            .parJoinUnbounded
            .compile
            .drain
        }
      }
    }

Redis Streams

import dev.profunktor.redis4cats.streams.*
import dev.profunktor.redis4cats.data.*

object StreamsExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      val streamKey = StreamingKey("events")
      val consumerGroup = ConsumerGroup("processors")
      val consumer = Consumer("worker-1")
      
      for
        // ストリームへのメッセージ追加
        msgId1 <- redis.xAdd(streamKey, Map("action" -> "user_login", "user_id" -> "123"))
        msgId2 <- redis.xAdd(streamKey, Map("action" -> "user_logout", "user_id" -> "123"))
        _ <- IO.println(s"Added messages: $msgId1, $msgId2")
        
        // コンシューマグループ作成
        _ <- redis.xGroupCreate(streamKey, consumerGroup, MessageId.earliest)
        
        // メッセージ読み取り
        messages <- redis.xReadGroup(consumerGroup, consumer, Map(streamKey -> MessageId.latest), 10)
        _ <- messages.traverse_(msg => 
          IO.println(s"Processing: ${msg.id} -> ${msg.body}") *>
          redis.xAck(streamKey, consumerGroup, msg.id)
        )
      yield ()
    }

エラーハンドリングとリトライ

import cats.effect.std.Console
import cats.syntax.all.*
import scala.concurrent.duration.*

object ErrorHandlingExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // リトライ機能付きRedis操作
    def safeRedisOperation[A](operation: IO[A]): IO[Option[A]] =
      operation.map(Some(_)).handleErrorWith { error =>
        Console[IO].println(s"Redis error: ${error.getMessage}") *>
        IO.pure(None)
      }
    
    // 指数バックオフ付きリトライ
    def retryWithBackoff[A](operation: IO[A], maxRetries: Int = 3): IO[A] =
      def retry(attempt: Int): IO[A] =
        operation.handleErrorWith { error =>
          if (attempt < maxRetries) {
            val delay = (1 << attempt).seconds
            Console[IO].println(s"Retry attempt $attempt after ${delay.toSeconds}s") *>
            IO.sleep(delay) *> retry(attempt + 1)
          } else {
            IO.raiseError(error)
          }
        }
      retry(0)
    
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // 安全な操作
        result1 <- safeRedisOperation(redis.get("nonexistent"))
        _ <- IO.println(s"Safe result: $result1")
        
        // リトライ付き操作
        result2 <- retryWithBackoff(redis.set("retry_key", "retry_value"))
        _ <- IO.println(s"Retry result: $result2")
      yield ()
    }

接続プールと設定

import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.effect.MkRedis
import io.lettuce.core.{ClientOptions, SocketOptions}

object AdvancedConfigExample extends IOApp.Simple:
  val run: IO[Unit] = 
    // 高度な接続設定
    val socketOptions = SocketOptions.builder()
      .connectTimeout(5.seconds)
      .keepAlive(true)
      .build()
    
    val clientOptions = ClientOptions.builder()
      .socketOptions(socketOptions)
      .autoReconnect(true)
      .build()
    
    val redisUri = RedisURI.builder()
      .withHost("localhost")
      .withPort(6379)
      .withDatabase(0)
      .withPassword("password")
      .withTimeout(5.seconds)
      .build()
    
    MkRedis[IO].fromUri(redisUri, clientOptions.some).use { redis =>
      for
        _ <- redis.ping
        _ <- IO.println("Connected with advanced configuration")
        
        // 接続情報の取得
        info <- redis.clientList
        _ <- IO.println(s"Client info: $info")
      yield ()
    }

ストリーミング処理とFs2統合

import fs2.*
import cats.effect.std.Queue

object StreamingExample extends IOApp.Simple:
  val run: IO[Unit] = 
    Redis[IO].utf8("redis://localhost").use { redis =>
      for
        // キューとストリームの統合
        queue <- Queue.unbounded[IO, String]
        
        // 定期的なデータ生成
        producer = Stream.awakeEvery[IO](1.second)
          .zipWithIndex
          .evalMap { case (_, idx) =>
            val data = s"data-$idx"
            redis.lPush("stream", List(data)) *> 
            queue.offer(data)
          }
          .take(10)
        
        // データ消費とRedisからの読み取り
        consumer = Stream.fromQueueUnterminated(queue)
          .evalMap { data =>
            redis.lPop("stream").flatMap {
              case Some(value) => IO.println(s"Processed: $value")
              case None => IO.println("No data in Redis")
            }
          }
          .take(10)
        
        // 並行ストリーミング
        _ <- Stream(producer, consumer)
          .parJoinUnbounded
          .compile
          .drain
      yield ()
    }