Akka Streams

リアクティブストリーミング処理のためのライブラリ。背圧制御(バックプレッシャー)により安全なストリーム処理を実現。複雑なデータパイプラインを宣言的に構築し、非同期・並行処理を効率的に管理。

Scalaリアクティブストリーミング背圧制御非同期並行処理バックプレッシャー

フレームワーク

Akka Streams

概要

Akka Streamsは、リアクティブストリーミング処理のためのライブラリです。背圧制御(バックプレッシャー)により安全なストリーム処理を実現し、複雑なデータパイプラインを宣言的に構築できます。

詳細

Akka Streams(アッカストリームス)は、2015年にリリースされたリアクティブストリーミング処理ライブラリです。Reactive Streamsの仕様を実装し、データの生産者と消費者の間で適切な背圧制御を行うことで、メモリ効率的で安全なストリーム処理を実現します。最大の特徴は背圧制御機能で、データの消費速度が生産速度を下回った場合に自動的にフロー制御を行い、OutOfMemoryErrorを防ぎます。Graph DSLを使用してストリーム処理フローを宣言的に定義でき、複雑なデータパイプラインも直感的に構築。Source、Flow、Sinkの3つの基本構成要素を組み合わせることで、柔軟なストリーミング処理を実現します。非同期・並列処理をサポートし、高いスループットを達成。Alpakkaコネクタにより、Kafka、Cassandra、Elasticsearch、AWS、HTTP等の外部システムとの統合が容易です。

メリット・デメリット

メリット

  • 背圧制御: 自動的なフロー制御により安全なストリーム処理
  • 宣言的な記述: Graph DSLによる直感的なパイプライン定義
  • 高性能: 非同期・並列処理による高スループット
  • 型安全性: Scalaの型システムによる安全な処理
  • 豊富な統合: Alpakkaによる外部システム連携
  • スケーラビリティ: 分散処理とクラスター対応

デメリット

  • 学習コスト: リアクティブプログラミングの概念習得が必要
  • デバッグの複雑性: 非同期処理のトラブルシューティング困難
  • メモリ使用量: 内部バッファリングによるメモリ消費
  • エラーハンドリング: エラー伝播の複雑性

主な使用事例

  • リアルタイムデータ処理
  • IoTデータストリーミング
  • ログ処理パイプライン
  • ETL処理
  • イベントドリブンアーキテクチャ
  • マイクロサービス間の通信
  • WebSocket通信

基本的な使い方

依存関係の追加

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.0"

基本的なストリーム処理

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.NotUsed

import scala.concurrent.Future

// ActorSystemの作成
implicit val system: ActorSystem = ActorSystem("StreamSystem")

// 基本的なSource
val source: Source[Int, NotUsed] = Source(1 to 10)

// 基本的なFlow
val multiplyFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// 基本的なSink
val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

// ストリームの構築と実行
val result: Future[Done] = source
  .via(multiplyFlow)
  .runWith(printSink)

// 結果の処理
result.onComplete {
  case Success(_) => println("Stream completed successfully")
  case Failure(ex) => println(s"Stream failed: ${ex.getMessage}")
}

フィルタリングと変換

import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

// 複数の処理を組み合わせたストリーム
val complexStream = Source(1 to 100)
  .filter(_ % 2 == 0)  // 偶数のみ
  .map(_ * 3)          // 3倍
  .take(10)            // 最初の10個
  .fold(0)(_ + _)      // 合計を計算
  .toMat(Sink.head)(Keep.right)  // 結果を取得

val sumFuture: Future[Int] = complexStream.run()

sumFuture.foreach(sum => println(s"Sum: $sum"))

背圧制御の例

import akka.stream.{Attributes, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}

// 高速なSource
val fastSource = Source(1 to 1000000)

// 遅いSink
val slowSink = Sink.foreach[Int] { x =>
  Thread.sleep(10)  // 処理に10ms
  println(s"Processed: $x")
}

// バッファリングとオーバーフロー戦略
val bufferedStream = fastSource
  .buffer(100, OverflowStrategy.backpressure)
  .async  // 非同期境界を作成
  .runWith(slowSink)

// 背圧制御により、メモリ使用量が制御される

並列処理

import akka.stream.scaladsl.{Flow, Sink, Source}

// CPUバウンドなタスクの並列処理
val cpuIntensiveFlow = Flow[Int].mapAsync(4) { number =>
  Future {
    // 重い計算処理のシミュレーション
    Thread.sleep(100)
    number * number
  }
}

val parallelStream = Source(1 to 20)
  .via(cpuIntensiveFlow)
  .runWith(Sink.foreach(println))

エラーハンドリング

import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

// エラーが発生する可能性のある処理
val riskyFlow = Flow[String].map { str =>
  if (str.isEmpty) throw new IllegalArgumentException("Empty string")
  str.toUpperCase
}

// エラー処理戦略
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume
  case _ => Supervision.Stop
}

val errorHandlingStream = Source(List("hello", "", "world", ""))
  .via(riskyFlow)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.foreach(println))

Kafkaとの統合(Alpakka)

import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Kafkaコンシューマーの設定
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("my-group")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// Kafkaプロデューサーの設定
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

// メッセージ処理フロー
val messageProcessingFlow = Flow[ConsumerRecord[String, String]]
  .map { record =>
    val processedValue = record.value().toUpperCase
    new ProducerRecord[String, String]("output-topic", record.key(), processedValue)
  }

// Kafkaストリーミングパイプライン
val kafkaStream = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("input-topic"))
  .via(messageProcessingFlow)
  .runWith(Producer.plainSink(producerSettings))

HTTPストリーミング

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}

// HTTPクライアントのフロー
val httpClientFlow: Flow[HttpRequest, HttpResponse, Any] = 
  Http().outgoingConnection("api.example.com", 443, useHttps = true)

// バッチHTTPリクエスト処理
val urlsToFetch = Source(1 to 100)
  .map(i => HttpRequest(uri = s"/api/data/$i"))
  .via(httpClientFlow)
  .mapAsync(4) { response =>
    response.entity.toStrict(5.seconds).map(_.data.utf8String)
  }
  .runWith(Sink.foreach(println))

WebSocketストリーミング

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

// WebSocketメッセージのフロー
val webSocketFlow: Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(text) => 
      // メッセージを処理して返信
      TextMessage(s"Echo: $text")
  }

// WebSocketクライアント
val webSocketRequest = WebSocketRequest(uri = "ws://localhost:8080/websocket")
val (upgradeResponse, closed) = Http().singleWebSocketRequest(webSocketRequest, webSocketFlow)

// 接続の確認
upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    println("WebSocket connection established")
  } else {
    println(s"Connection failed: ${upgrade.response.status}")
  }
}

カスタムGraphStage

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// カスタムフィルタステージ
class CustomFilter[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("CustomFilter.in")
  val out = Outlet[T]("CustomFilter.out")
  
  override val shape = FlowShape.of(in, out)
  
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          if (predicate(element)) {
            push(out, element)
          } else {
            pull(in)
          }
        }
      })
      
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
  }
}

// カスタムステージの使用
val customFilterFlow = Flow.fromGraph(new CustomFilter[Int](_ % 2 == 0))

Source(1 to 10)
  .via(customFilterFlow)
  .runWith(Sink.foreach(println))

最新のトレンド(2025年)

  • Akka 2.8系: パフォーマンス向上とJava 21対応
  • Alpakka 4.0: 新しいコネクタとCloud Native対応
  • GraphQL統合: リアルタイムGraphQLサブスクリプション
  • Kubernetes連携: クラスター発見とヘルスチェック
  • メトリクス統合: Prometheus、Grafana対応

まとめ

Akka Streamsは2025年において、リアクティブストリーミング処理の実証済みソリューションとして確立されています。背圧制御による安全な処理、宣言的な記述、高いパフォーマンスにより、リアルタイムデータ処理やイベントドリブンアーキテクチャで重要な役割を果たしています。