FS2
関数型プログラミングベースのストリーミングライブラリ。純粋関数型でリソース安全なストリーム処理を提供。Cats Effectとの統合により、副作用を型レベルで管理し、合成可能なストリーム処理を実現。
フレームワーク
FS2
概要
FS2は、関数型プログラミングベースのストリーミングライブラリです。純粋関数型でリソース安全なストリーム処理を提供し、Cats Effectとの統合により副作用を型レベルで管理します。
詳細
FS2(Functional Streams for Scala)は、Typelevelエコシステムの一部として開発された関数型ストリーミングライブラリです。2016年から開発が開始され、純粋関数型プログラミングの原則に基づいて設計されています。最大の特徴は、すべての副作用を型レベルで管理することで、実行時エラーを大幅に削減し、合成可能なストリーム処理を実現することです。Cats Effectとの深い統合により、IOモナドを使用した安全なリソース管理と例外処理を提供。ストリームの構築から実行まで、すべてがイミュータブルで参照透過性を保ちます。背圧制御やファイナライザーによる自動リソース解放など、実用的な機能も備えています。Http4s、Doobie、Circe等の他のTypelevelライブラリとシームレスに統合でき、関数型プログラミングスタイルでの本格的なアプリケーション開発が可能です。
メリット・デメリット
メリット
- 型安全性: 副作用を型レベルで管理し、コンパイル時エラー検出
- 合成可能性: 小さなストリームコンポーネントを組み合わせて複雑な処理を構築
- リソース安全: 自動的なリソース管理とクリーンアップ
- 参照透過性: 純粋関数型による予測可能な動作
- 豊富なコンビネータ: 豊富なストリーム操作関数
- Cats Effectの恩恵: 強力な並行処理とエラーハンドリング
デメリット
- 学習コスト: 関数型プログラミングの深い理解が必要
- パフォーマンス: 純粋関数型アプローチによるオーバーヘッド
- エラーメッセージ: 型エラーのメッセージが複雑になる場合
- エコシステム: 主流のScalaライブラリとの統合が限定的
主な使用事例
- 関数型アプリケーションのデータ処理
- ファイルI/Oストリーミング
- HTTP/WebSocketストリーミング
- データベースストリーミング
- 並行処理パイプライン
- リアルタイムデータ変換
- ETL処理
基本的な使い方
依存関係の追加
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % "3.9.0",
"co.fs2" %% "fs2-io" % "3.9.0",
"org.typelevel" %% "cats-effect" % "3.5.0"
)
基本的なストリーム処理
import fs2.Stream
import cats.effect.{IO, IOApp}
object BasicStreamExample extends IOApp.Simple {
def run: IO[Unit] = {
// 基本的なストリーム
val numbers = Stream.range(1, 11)
// ストリームの変換と実行
val program = numbers
.map(_ * 2)
.filter(_ % 4 == 0)
.evalMap(n => IO.println(s"Processing: $n"))
.compile
.drain
program
}
}
IOとの統合
import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._
object IOIntegrationExample extends IOApp.Simple {
def run: IO[Unit] = {
// IOベースのストリーム処理
val ioStream = Stream
.repeatEval(IO.println("Tick").as(1))
.metered(1.second)
.take(5)
.compile
.drain
// 並行処理
val parallelStream = Stream
.range(1, 21)
.evalMap { n =>
IO.sleep(100.millis) >> IO.println(s"Processed: $n")
}
.parEvalMap(4)(_ => IO.unit) // 4つの並行処理
.compile
.drain
ioStream >> parallelStream
}
}
ファイル処理
import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import cats.effect.{IO, IOApp}
object FileProcessingExample extends IOApp.Simple {
def run: IO[Unit] = {
val inputPath = Path("input.txt")
val outputPath = Path("output.txt")
// ファイルの読み込み、処理、書き込み
val fileProcessing = Files[IO]
.readAll(inputPath)
.through(text.utf8.decode)
.through(text.lines)
.filter(_.nonEmpty)
.map(_.toUpperCase)
.intersperse("\n")
.through(text.utf8.encode)
.through(Files[IO].writeAll(outputPath))
.compile
.drain
fileProcessing
}
}
リソース管理
import fs2.Stream
import cats.effect.{IO, IOApp, Resource}
import java.io.{BufferedReader, FileReader}
object ResourceManagementExample extends IOApp.Simple {
def run: IO[Unit] = {
// リソースの安全な管理
val fileResource = Resource.make(
IO(new BufferedReader(new FileReader("data.txt")))
)(reader => IO(reader.close()))
// ストリームでのリソース使用
val streamWithResource = Stream
.resource(fileResource)
.flatMap { reader =>
Stream
.repeatEval(IO(Option(reader.readLine())))
.unNoneTerminate
}
.evalMap(line => IO.println(s"Read: $line"))
.compile
.drain
streamWithResource
}
}
エラーハンドリング
import fs2.Stream
import cats.effect.{IO, IOApp}
object ErrorHandlingExample extends IOApp.Simple {
def run: IO[Unit] = {
// エラーが発生する可能性のあるストリーム
val riskyStream = Stream
.range(1, 11)
.evalMap { n =>
if (n == 5) IO.raiseError(new RuntimeException("Error at 5"))
else IO.println(s"Processing: $n")
}
// エラーハンドリング
val recoveredStream = riskyStream
.handleErrorWith { error =>
Stream.eval(IO.println(s"Error occurred: ${error.getMessage}"))
}
.compile
.drain
// 再試行
val retryStream = riskyStream
.retry(3)
.compile
.drain
recoveredStream >> retryStream
}
}
並行処理
import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._
object ConcurrentProcessingExample extends IOApp.Simple {
def run: IO[Unit] = {
// 並行ストリーム処理
val producer = Stream
.repeatEval(IO.println("Producing item"))
.metered(500.millis)
.take(10)
val consumer = Stream
.repeatEval(IO.println("Consuming item"))
.metered(1.second)
.take(5)
// 並行実行
val concurrentStreams = Stream(producer, consumer)
.parJoin(2)
.compile
.drain
concurrentStreams
}
}
HTTP クライアント(Http4s統合)
import fs2.Stream
import cats.effect.{IO, IOApp}
import org.http4s.client.{Client, JavaNetClientBuilder}
import org.http4s.{Request, Uri}
object HttpClientExample extends IOApp.Simple {
def run: IO[Unit] = {
JavaNetClientBuilder[IO].create.use { client =>
val urls = List(
"https://api.example.com/data/1",
"https://api.example.com/data/2",
"https://api.example.com/data/3"
)
// 並行HTTP リクエスト
val httpRequests = Stream
.emits(urls)
.evalMap(url => IO.fromEither(Uri.fromString(url)))
.map(uri => Request[IO](uri = uri))
.parEvalMap(3) { request =>
client.expect[String](request)
.handleErrorWith(error => IO(s"Error: ${error.getMessage}"))
}
.evalMap(response => IO.println(s"Response: $response"))
.compile
.drain
httpRequests
}
}
}
データベース処理(Doobie統合)
import fs2.Stream
import cats.effect.{IO, IOApp}
import doobie._
import doobie.implicits._
object DatabaseExample extends IOApp.Simple {
case class User(id: Int, name: String, email: String)
// データベース接続
val transactor = Transactor.fromDriverManager[IO](
"org.postgresql.Driver",
"jdbc:postgresql://localhost/mydb",
"user",
"password"
)
def run: IO[Unit] = {
// ストリーミングクエリ
val usersStream = sql"SELECT id, name, email FROM users"
.query[User]
.stream
.transact(transactor)
// データの処理
val processedUsers = usersStream
.filter(_.name.startsWith("J"))
.map(user => user.copy(email = user.email.toLowerCase))
.evalMap(user => IO.println(s"User: $user"))
.compile
.drain
processedUsers
}
}
カスタムストリーム変換
import fs2.{Pipe, Stream}
import cats.effect.{IO, IOApp}
object CustomTransformationExample extends IOApp.Simple {
// カスタムパイプの定義
def batchPipe[A](batchSize: Int): Pipe[IO, A, List[A]] = {
_.chunkN(batchSize).map(_.toList)
}
def logPipe[A]: Pipe[IO, A, A] = {
_.evalMap { a =>
IO.println(s"Processing: $a").as(a)
}
}
def run: IO[Unit] = {
val program = Stream
.range(1, 21)
.through(logPipe)
.through(batchPipe(3))
.evalMap(batch => IO.println(s"Batch: $batch"))
.compile
.drain
program
}
}
最新のトレンド(2025年)
- FS2 3.9系: パフォーマンス向上とCats Effect 3.5対応
- Scala 3対応: 最新のScala機能との統合
- gRPCサポート: ストリーミングgRPCサービス
- Observability: OpenTelemetry統合
- WebAssembly: Scala.js経由でのブラウザ実行
まとめ
FS2は2025年において、関数型プログラミングでのストリーミング処理の標準的な選択肢として確立されています。型安全性、合成可能性、リソース安全性により、堅牢で保守しやすいストリーミングアプリケーションを構築できます。Typelevelエコシステムとの統合により、本格的な関数型アプリケーション開発が可能です。