FS2

関数型プログラミングベースのストリーミングライブラリ。純粋関数型でリソース安全なストリーム処理を提供。Cats Effectとの統合により、副作用を型レベルで管理し、合成可能なストリーム処理を実現。

Scala関数型プログラミングストリーミング純粋関数型リソース安全Cats Effect合成可能

フレームワーク

FS2

概要

FS2は、関数型プログラミングベースのストリーミングライブラリです。純粋関数型でリソース安全なストリーム処理を提供し、Cats Effectとの統合により副作用を型レベルで管理します。

詳細

FS2(Functional Streams for Scala)は、Typelevelエコシステムの一部として開発された関数型ストリーミングライブラリです。2016年から開発が開始され、純粋関数型プログラミングの原則に基づいて設計されています。最大の特徴は、すべての副作用を型レベルで管理することで、実行時エラーを大幅に削減し、合成可能なストリーム処理を実現することです。Cats Effectとの深い統合により、IOモナドを使用した安全なリソース管理と例外処理を提供。ストリームの構築から実行まで、すべてがイミュータブルで参照透過性を保ちます。背圧制御やファイナライザーによる自動リソース解放など、実用的な機能も備えています。Http4s、Doobie、Circe等の他のTypelevelライブラリとシームレスに統合でき、関数型プログラミングスタイルでの本格的なアプリケーション開発が可能です。

メリット・デメリット

メリット

  • 型安全性: 副作用を型レベルで管理し、コンパイル時エラー検出
  • 合成可能性: 小さなストリームコンポーネントを組み合わせて複雑な処理を構築
  • リソース安全: 自動的なリソース管理とクリーンアップ
  • 参照透過性: 純粋関数型による予測可能な動作
  • 豊富なコンビネータ: 豊富なストリーム操作関数
  • Cats Effectの恩恵: 強力な並行処理とエラーハンドリング

デメリット

  • 学習コスト: 関数型プログラミングの深い理解が必要
  • パフォーマンス: 純粋関数型アプローチによるオーバーヘッド
  • エラーメッセージ: 型エラーのメッセージが複雑になる場合
  • エコシステム: 主流のScalaライブラリとの統合が限定的

主な使用事例

  • 関数型アプリケーションのデータ処理
  • ファイルI/Oストリーミング
  • HTTP/WebSocketストリーミング
  • データベースストリーミング
  • 並行処理パイプライン
  • リアルタイムデータ変換
  • ETL処理

基本的な使い方

依存関係の追加

libraryDependencies ++= Seq(
  "co.fs2" %% "fs2-core" % "3.9.0",
  "co.fs2" %% "fs2-io" % "3.9.0",
  "org.typelevel" %% "cats-effect" % "3.5.0"
)

基本的なストリーム処理

import fs2.Stream
import cats.effect.{IO, IOApp}

object BasicStreamExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // 基本的なストリーム
    val numbers = Stream.range(1, 11)
    
    // ストリームの変換と実行
    val program = numbers
      .map(_ * 2)
      .filter(_ % 4 == 0)
      .evalMap(n => IO.println(s"Processing: $n"))
      .compile
      .drain
    
    program
  }
}

IOとの統合

import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object IOIntegrationExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // IOベースのストリーム処理
    val ioStream = Stream
      .repeatEval(IO.println("Tick").as(1))
      .metered(1.second)
      .take(5)
      .compile
      .drain
    
    // 並行処理
    val parallelStream = Stream
      .range(1, 21)
      .evalMap { n =>
        IO.sleep(100.millis) >> IO.println(s"Processed: $n")
      }
      .parEvalMap(4)(_ => IO.unit)  // 4つの並行処理
      .compile
      .drain
    
    ioStream >> parallelStream
  }
}

ファイル処理

import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import cats.effect.{IO, IOApp}

object FileProcessingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    val inputPath = Path("input.txt")
    val outputPath = Path("output.txt")
    
    // ファイルの読み込み、処理、書き込み
    val fileProcessing = Files[IO]
      .readAll(inputPath)
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(_.nonEmpty)
      .map(_.toUpperCase)
      .intersperse("\n")
      .through(text.utf8.encode)
      .through(Files[IO].writeAll(outputPath))
      .compile
      .drain
    
    fileProcessing
  }
}

リソース管理

import fs2.Stream
import cats.effect.{IO, IOApp, Resource}
import java.io.{BufferedReader, FileReader}

object ResourceManagementExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // リソースの安全な管理
    val fileResource = Resource.make(
      IO(new BufferedReader(new FileReader("data.txt")))
    )(reader => IO(reader.close()))
    
    // ストリームでのリソース使用
    val streamWithResource = Stream
      .resource(fileResource)
      .flatMap { reader =>
        Stream
          .repeatEval(IO(Option(reader.readLine())))
          .unNoneTerminate
      }
      .evalMap(line => IO.println(s"Read: $line"))
      .compile
      .drain
    
    streamWithResource
  }
}

エラーハンドリング

import fs2.Stream
import cats.effect.{IO, IOApp}

object ErrorHandlingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // エラーが発生する可能性のあるストリーム
    val riskyStream = Stream
      .range(1, 11)
      .evalMap { n =>
        if (n == 5) IO.raiseError(new RuntimeException("Error at 5"))
        else IO.println(s"Processing: $n")
      }
    
    // エラーハンドリング
    val recoveredStream = riskyStream
      .handleErrorWith { error =>
        Stream.eval(IO.println(s"Error occurred: ${error.getMessage}"))
      }
      .compile
      .drain
    
    // 再試行
    val retryStream = riskyStream
      .retry(3)
      .compile
      .drain
    
    recoveredStream >> retryStream
  }
}

並行処理

import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object ConcurrentProcessingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // 並行ストリーム処理
    val producer = Stream
      .repeatEval(IO.println("Producing item"))
      .metered(500.millis)
      .take(10)
    
    val consumer = Stream
      .repeatEval(IO.println("Consuming item"))
      .metered(1.second)
      .take(5)
    
    // 並行実行
    val concurrentStreams = Stream(producer, consumer)
      .parJoin(2)
      .compile
      .drain
    
    concurrentStreams
  }
}

HTTP クライアント(Http4s統合)

import fs2.Stream
import cats.effect.{IO, IOApp}
import org.http4s.client.{Client, JavaNetClientBuilder}
import org.http4s.{Request, Uri}

object HttpClientExample extends IOApp.Simple {
  def run: IO[Unit] = {
    JavaNetClientBuilder[IO].create.use { client =>
      val urls = List(
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3"
      )
      
      // 並行HTTP リクエスト
      val httpRequests = Stream
        .emits(urls)
        .evalMap(url => IO.fromEither(Uri.fromString(url)))
        .map(uri => Request[IO](uri = uri))
        .parEvalMap(3) { request =>
          client.expect[String](request)
            .handleErrorWith(error => IO(s"Error: ${error.getMessage}"))
        }
        .evalMap(response => IO.println(s"Response: $response"))
        .compile
        .drain
      
      httpRequests
    }
  }
}

データベース処理(Doobie統合)

import fs2.Stream
import cats.effect.{IO, IOApp}
import doobie._
import doobie.implicits._

object DatabaseExample extends IOApp.Simple {
  case class User(id: Int, name: String, email: String)
  
  // データベース接続
  val transactor = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost/mydb",
    "user",
    "password"
  )
  
  def run: IO[Unit] = {
    // ストリーミングクエリ
    val usersStream = sql"SELECT id, name, email FROM users"
      .query[User]
      .stream
      .transact(transactor)
    
    // データの処理
    val processedUsers = usersStream
      .filter(_.name.startsWith("J"))
      .map(user => user.copy(email = user.email.toLowerCase))
      .evalMap(user => IO.println(s"User: $user"))
      .compile
      .drain
    
    processedUsers
  }
}

カスタムストリーム変換

import fs2.{Pipe, Stream}
import cats.effect.{IO, IOApp}

object CustomTransformationExample extends IOApp.Simple {
  // カスタムパイプの定義
  def batchPipe[A](batchSize: Int): Pipe[IO, A, List[A]] = {
    _.chunkN(batchSize).map(_.toList)
  }
  
  def logPipe[A]: Pipe[IO, A, A] = {
    _.evalMap { a =>
      IO.println(s"Processing: $a").as(a)
    }
  }
  
  def run: IO[Unit] = {
    val program = Stream
      .range(1, 21)
      .through(logPipe)
      .through(batchPipe(3))
      .evalMap(batch => IO.println(s"Batch: $batch"))
      .compile
      .drain
    
    program
  }
}

最新のトレンド(2025年)

  • FS2 3.9系: パフォーマンス向上とCats Effect 3.5対応
  • Scala 3対応: 最新のScala機能との統合
  • gRPCサポート: ストリーミングgRPCサービス
  • Observability: OpenTelemetry統合
  • WebAssembly: Scala.js経由でのブラウザ実行

まとめ

FS2は2025年において、関数型プログラミングでのストリーミング処理の標準的な選択肢として確立されています。型安全性、合成可能性、リソース安全性により、堅牢で保守しやすいストリーミングアプリケーションを構築できます。Typelevelエコシステムとの統合により、本格的な関数型アプリケーション開発が可能です。