redis4cats
キャッシュライブラリ
redis4cats
概要
redis4catsは、Cats Effect、Fs2、Lettuceの上に構築されたScala向けの純粋関数型Redisクライアントライブラリで、型安全で非同期なRedis操作を提供し、関数型プログラミングのパラダイムに完全に統合されています。
詳細
redis4cats(レディス・フォー・キャッツ)は、Cats Effect、Fs2、非同期JavaクライアントLettuceの上に構築されたScala向けのRedisクライアントライブラリです。1.x.xシリーズはCats Effect 3上に、0.x.xシリーズはCats Effect 2上に構築されています。APIは非常に安定しており、本番環境で重要な使用実績がありますが、現在のところバージョン間でのバイナリ互換性は保証されていません。高レベルで安全かつ純粋関数型のAPIを提供し、Redis Streams、スクリプティングAPI、接続管理、パイプライニング、トランザクション、Pub/Sub機能などRedisの包括的な機能をサポートしています。単一ノードおよびクラスター接続の両方をサポートし、接続プールの管理を自動化し、リソースの安全性を保証します。すべての操作は型安全で、Scalaの関数型プログラミング原則に従って設計されており、副作用をIOモナドで適切にカプセル化します。log4catsとの統合により、構造化ロギングもサポートしています。redis4catsはCats Effectエコシステムの一部として、他のTypelevelライブラリ(fs2、cats、circe等)との相互運用性が高く、リアクティブな非同期アプリケーション開発に最適化されています。
メリット・デメリット
メリット
- 純粋関数型: 副作用のない型安全なRedis操作
- Cats Effect統合: 非同期・並行処理の強力なサポート
- Fs2統合: ストリーミング処理とRedis Streamsの自然な統合
- リソース安全: 自動的な接続管理とクリーンアップ
- 型安全性: コンパイル時の型チェックとエラー防止
- Typelevelエコシステム: 他のTypelevelライブラリとの優れた相互運用性
- 本番実績: 多くのプロダクション環境での使用実績
デメリット
- 学習コスト: Cats Effect、Fs2の知識が必要
- バイナリ互換性: バージョン間でのバイナリ互換性が保証されていない
- 関数型限定: 手続き型プログラミングスタイルには不向き
- 依存関係: 複数のTypelevelライブラリへの依存
- Scala専用: 他のJVM言語での使用は制限される
主要リンク
書き方の例
インストールと基本設定
// build.sbt
libraryDependencies ++= Seq(
"dev.profunktor" %% "redis4cats-effects" % "1.5.2",
"dev.profunktor" %% "redis4cats-streams" % "1.5.2",
"dev.profunktor" %% "redis4cats-log4cats" % "1.5.2"
)
// imports
import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given
基本的な接続と操作
import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout.given
object BasicRedisExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// 基本的なset/get操作
_ <- redis.set("foo", "123")
x <- redis.get("foo")
_ <- IO.println(s"Value: $x") // Some("123")
// 条件付き設定
_ <- redis.setNx("foo", "should not happen")
y <- redis.get("foo")
_ <- IO.println(s"Value after setNx: $y") // Some("123")
// 数値操作
_ <- redis.set("counter", "0")
newVal <- redis.incr("counter")
_ <- IO.println(s"Counter: $newVal") // 1
// 期限付きキー
_ <- redis.setEx("temp", "temporary", 60.seconds)
temp <- redis.get("temp")
_ <- IO.println(s"Temp value: $temp")
yield ()
}
接続設定とクライアント管理
import cats.effect.*
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.MkRedis
object ConnectionExample extends IOApp.Simple:
val run: IO[Unit] =
// 高度な接続設定
val uri = RedisURI.create("redis://password@localhost:6379/2")
Redis[IO].fromUri(uri).use { redis =>
for
_ <- redis.ping
_ <- IO.println("Connected to Redis successfully")
// カスタムコーデックの使用
info <- redis.info
_ <- IO.println(s"Redis info: $info")
yield ()
}
// カスタムコーデックを使用した型安全な操作
case class User(id: String, name: String, email: String)
object TypeSafeRedisExample extends IOApp.Simple:
import io.circe.generic.auto.*
import dev.profunktor.redis4cats.circe.CirceCodec
given RedisCodec[String, User] = CirceCodec.fromJson[String, User]
val run: IO[Unit] =
Redis[IO].simple("redis://localhost").use { redis =>
val user = User("1", "Alice", "[email protected]")
for
_ <- redis.set("user:1", user)
retrievedUser <- redis.get("user:1")
_ <- IO.println(s"Retrieved user: $retrievedUser")
yield ()
}
パイプライニングとトランザクション
import cats.effect.*
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.pipeline.{Pipeline, TxPipeline}
import dev.profunktor.redis4cats.transactions.*
object PipelineExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
// パイプライニング
val pipeline = Pipeline(redis.pipeline).use { pipe =>
for
_ <- pipe.set("key1", "value1")
_ <- pipe.set("key2", "value2")
_ <- pipe.incr("counter")
_ <- pipe.get("key1")
_ <- pipe.get("key2")
yield ()
}
// トランザクション
val transaction = redis.multi.use { tx =>
for
_ <- tx.set("account:1", "100")
_ <- tx.set("account:2", "200")
_ <- tx.decrBy("account:1", 50)
_ <- tx.incrBy("account:2", 50)
yield ()
}
for
_ <- pipeline
_ <- IO.println("Pipeline executed")
_ <- transaction
_ <- IO.println("Transaction executed")
yield ()
}
リスト・セット・ハッシュ操作
object CollectionsExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// リスト操作
_ <- redis.lPush("tasks", List("task1", "task2", "task3"))
_ <- redis.rPush("tasks", List("task4"))
tasks <- redis.lRange("tasks", 0, -1)
_ <- IO.println(s"Tasks: $tasks")
// セット操作
_ <- redis.sAdd("tags", Set("scala", "redis", "functional"))
members <- redis.sMembers("tags")
_ <- IO.println(s"Tags: $members")
// ハッシュ操作
userFields = Map(
"name" -> "John",
"email" -> "[email protected]",
"age" -> "30"
)
_ <- redis.hMSet("user:1", userFields)
userData <- redis.hGetAll("user:1")
_ <- IO.println(s"User data: $userData")
name <- redis.hGet("user:1", "name")
_ <- IO.println(s"User name: $name")
yield ()
}
Pub/Sub機能
import fs2.Stream
import dev.profunktor.redis4cats.pubsub.PubSub
object PubSubExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
PubSub.mkSubscriber(redis).use { subscriber =>
PubSub.mkPublisher(redis).use { publisher =>
val publisher1 = Stream.awakeEvery[IO](3.seconds)
.evalMap(_ => publisher.publish("notifications", "Hello World!"))
.take(5)
val subscriber1 = subscriber
.subscribe("notifications")
.evalMap(msg => IO.println(s"Received: ${msg.channel} -> ${msg.value}"))
.take(5)
// 並行実行
Stream(publisher1, subscriber1)
.parJoinUnbounded
.compile
.drain
}
}
}
Redis Streams
import dev.profunktor.redis4cats.streams.*
import dev.profunktor.redis4cats.data.*
object StreamsExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
val streamKey = StreamingKey("events")
val consumerGroup = ConsumerGroup("processors")
val consumer = Consumer("worker-1")
for
// ストリームへのメッセージ追加
msgId1 <- redis.xAdd(streamKey, Map("action" -> "user_login", "user_id" -> "123"))
msgId2 <- redis.xAdd(streamKey, Map("action" -> "user_logout", "user_id" -> "123"))
_ <- IO.println(s"Added messages: $msgId1, $msgId2")
// コンシューマグループ作成
_ <- redis.xGroupCreate(streamKey, consumerGroup, MessageId.earliest)
// メッセージ読み取り
messages <- redis.xReadGroup(consumerGroup, consumer, Map(streamKey -> MessageId.latest), 10)
_ <- messages.traverse_(msg =>
IO.println(s"Processing: ${msg.id} -> ${msg.body}") *>
redis.xAck(streamKey, consumerGroup, msg.id)
)
yield ()
}
エラーハンドリングとリトライ
import cats.effect.std.Console
import cats.syntax.all.*
import scala.concurrent.duration.*
object ErrorHandlingExample extends IOApp.Simple:
val run: IO[Unit] =
// リトライ機能付きRedis操作
def safeRedisOperation[A](operation: IO[A]): IO[Option[A]] =
operation.map(Some(_)).handleErrorWith { error =>
Console[IO].println(s"Redis error: ${error.getMessage}") *>
IO.pure(None)
}
// 指数バックオフ付きリトライ
def retryWithBackoff[A](operation: IO[A], maxRetries: Int = 3): IO[A] =
def retry(attempt: Int): IO[A] =
operation.handleErrorWith { error =>
if (attempt < maxRetries) {
val delay = (1 << attempt).seconds
Console[IO].println(s"Retry attempt $attempt after ${delay.toSeconds}s") *>
IO.sleep(delay) *> retry(attempt + 1)
} else {
IO.raiseError(error)
}
}
retry(0)
Redis[IO].utf8("redis://localhost").use { redis =>
for
// 安全な操作
result1 <- safeRedisOperation(redis.get("nonexistent"))
_ <- IO.println(s"Safe result: $result1")
// リトライ付き操作
result2 <- retryWithBackoff(redis.set("retry_key", "retry_value"))
_ <- IO.println(s"Retry result: $result2")
yield ()
}
接続プールと設定
import dev.profunktor.redis4cats.connection.*
import dev.profunktor.redis4cats.effect.MkRedis
import io.lettuce.core.{ClientOptions, SocketOptions}
object AdvancedConfigExample extends IOApp.Simple:
val run: IO[Unit] =
// 高度な接続設定
val socketOptions = SocketOptions.builder()
.connectTimeout(5.seconds)
.keepAlive(true)
.build()
val clientOptions = ClientOptions.builder()
.socketOptions(socketOptions)
.autoReconnect(true)
.build()
val redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withDatabase(0)
.withPassword("password")
.withTimeout(5.seconds)
.build()
MkRedis[IO].fromUri(redisUri, clientOptions.some).use { redis =>
for
_ <- redis.ping
_ <- IO.println("Connected with advanced configuration")
// 接続情報の取得
info <- redis.clientList
_ <- IO.println(s"Client info: $info")
yield ()
}
ストリーミング処理とFs2統合
import fs2.*
import cats.effect.std.Queue
object StreamingExample extends IOApp.Simple:
val run: IO[Unit] =
Redis[IO].utf8("redis://localhost").use { redis =>
for
// キューとストリームの統合
queue <- Queue.unbounded[IO, String]
// 定期的なデータ生成
producer = Stream.awakeEvery[IO](1.second)
.zipWithIndex
.evalMap { case (_, idx) =>
val data = s"data-$idx"
redis.lPush("stream", List(data)) *>
queue.offer(data)
}
.take(10)
// データ消費とRedisからの読み取り
consumer = Stream.fromQueueUnterminated(queue)
.evalMap { data =>
redis.lPop("stream").flatMap {
case Some(value) => IO.println(s"Processed: $value")
case None => IO.println("No data in Redis")
}
}
.take(10)
// 並行ストリーミング
_ <- Stream(producer, consumer)
.parJoinUnbounded
.compile
.drain
yield ()
}