Akka Streams
リアクティブストリーミング処理のためのライブラリ。背圧制御(バックプレッシャー)により安全なストリーム処理を実現。複雑なデータパイプラインを宣言的に構築し、非同期・並行処理を効率的に管理。
フレームワーク
Akka Streams
概要
Akka Streamsは、リアクティブストリーミング処理のためのライブラリです。背圧制御(バックプレッシャー)により安全なストリーム処理を実現し、複雑なデータパイプラインを宣言的に構築できます。
詳細
Akka Streams(アッカストリームス)は、2015年にリリースされたリアクティブストリーミング処理ライブラリです。Reactive Streamsの仕様を実装し、データの生産者と消費者の間で適切な背圧制御を行うことで、メモリ効率的で安全なストリーム処理を実現します。最大の特徴は背圧制御機能で、データの消費速度が生産速度を下回った場合に自動的にフロー制御を行い、OutOfMemoryErrorを防ぎます。Graph DSLを使用してストリーム処理フローを宣言的に定義でき、複雑なデータパイプラインも直感的に構築。Source、Flow、Sinkの3つの基本構成要素を組み合わせることで、柔軟なストリーミング処理を実現します。非同期・並列処理をサポートし、高いスループットを達成。Alpakkaコネクタにより、Kafka、Cassandra、Elasticsearch、AWS、HTTP等の外部システムとの統合が容易です。
メリット・デメリット
メリット
- 背圧制御: 自動的なフロー制御により安全なストリーム処理
- 宣言的な記述: Graph DSLによる直感的なパイプライン定義
- 高性能: 非同期・並列処理による高スループット
- 型安全性: Scalaの型システムによる安全な処理
- 豊富な統合: Alpakkaによる外部システム連携
- スケーラビリティ: 分散処理とクラスター対応
デメリット
- 学習コスト: リアクティブプログラミングの概念習得が必要
- デバッグの複雑性: 非同期処理のトラブルシューティング困難
- メモリ使用量: 内部バッファリングによるメモリ消費
- エラーハンドリング: エラー伝播の複雑性
主な使用事例
- リアルタイムデータ処理
- IoTデータストリーミング
- ログ処理パイプライン
- ETL処理
- イベントドリブンアーキテクチャ
- マイクロサービス間の通信
- WebSocket通信
基本的な使い方
依存関係の追加
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.0"
基本的なストリーム処理
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.NotUsed
import scala.concurrent.Future
// ActorSystemの作成
implicit val system: ActorSystem = ActorSystem("StreamSystem")
// 基本的なSource
val source: Source[Int, NotUsed] = Source(1 to 10)
// 基本的なFlow
val multiplyFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
// 基本的なSink
val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
// ストリームの構築と実行
val result: Future[Done] = source
.via(multiplyFlow)
.runWith(printSink)
// 結果の処理
result.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed: ${ex.getMessage}")
}
フィルタリングと変換
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
// 複数の処理を組み合わせたストリーム
val complexStream = Source(1 to 100)
.filter(_ % 2 == 0) // 偶数のみ
.map(_ * 3) // 3倍
.take(10) // 最初の10個
.fold(0)(_ + _) // 合計を計算
.toMat(Sink.head)(Keep.right) // 結果を取得
val sumFuture: Future[Int] = complexStream.run()
sumFuture.foreach(sum => println(s"Sum: $sum"))
背圧制御の例
import akka.stream.{Attributes, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}
// 高速なSource
val fastSource = Source(1 to 1000000)
// 遅いSink
val slowSink = Sink.foreach[Int] { x =>
Thread.sleep(10) // 処理に10ms
println(s"Processed: $x")
}
// バッファリングとオーバーフロー戦略
val bufferedStream = fastSource
.buffer(100, OverflowStrategy.backpressure)
.async // 非同期境界を作成
.runWith(slowSink)
// 背圧制御により、メモリ使用量が制御される
並列処理
import akka.stream.scaladsl.{Flow, Sink, Source}
// CPUバウンドなタスクの並列処理
val cpuIntensiveFlow = Flow[Int].mapAsync(4) { number =>
Future {
// 重い計算処理のシミュレーション
Thread.sleep(100)
number * number
}
}
val parallelStream = Source(1 to 20)
.via(cpuIntensiveFlow)
.runWith(Sink.foreach(println))
エラーハンドリング
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
// エラーが発生する可能性のある処理
val riskyFlow = Flow[String].map { str =>
if (str.isEmpty) throw new IllegalArgumentException("Empty string")
str.toUpperCase
}
// エラー処理戦略
val decider: Supervision.Decider = {
case _: IllegalArgumentException => Supervision.Resume
case _ => Supervision.Stop
}
val errorHandlingStream = Source(List("hello", "", "world", ""))
.via(riskyFlow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.foreach(println))
Kafkaとの統合(Alpakka)
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
// Kafkaコンシューマーの設定
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("my-group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// Kafkaプロデューサーの設定
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
// メッセージ処理フロー
val messageProcessingFlow = Flow[ConsumerRecord[String, String]]
.map { record =>
val processedValue = record.value().toUpperCase
new ProducerRecord[String, String]("output-topic", record.key(), processedValue)
}
// Kafkaストリーミングパイプライン
val kafkaStream = Consumer
.plainSource(consumerSettings, Subscriptions.topics("input-topic"))
.via(messageProcessingFlow)
.runWith(Producer.plainSink(producerSettings))
HTTPストリーミング
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}
// HTTPクライアントのフロー
val httpClientFlow: Flow[HttpRequest, HttpResponse, Any] =
Http().outgoingConnection("api.example.com", 443, useHttps = true)
// バッチHTTPリクエスト処理
val urlsToFetch = Source(1 to 100)
.map(i => HttpRequest(uri = s"/api/data/$i"))
.via(httpClientFlow)
.mapAsync(4) { response =>
response.entity.toStrict(5.seconds).map(_.data.utf8String)
}
.runWith(Sink.foreach(println))
WebSocketストリーミング
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
// WebSocketメッセージのフロー
val webSocketFlow: Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(text) =>
// メッセージを処理して返信
TextMessage(s"Echo: $text")
}
// WebSocketクライアント
val webSocketRequest = WebSocketRequest(uri = "ws://localhost:8080/websocket")
val (upgradeResponse, closed) = Http().singleWebSocketRequest(webSocketRequest, webSocketFlow)
// 接続の確認
upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
println("WebSocket connection established")
} else {
println(s"Connection failed: ${upgrade.response.status}")
}
}
カスタムGraphStage
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
// カスタムフィルタステージ
class CustomFilter[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CustomFilter.in")
val out = Outlet[T]("CustomFilter.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in)
if (predicate(element)) {
push(out, element)
} else {
pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
}
// カスタムステージの使用
val customFilterFlow = Flow.fromGraph(new CustomFilter[Int](_ % 2 == 0))
Source(1 to 10)
.via(customFilterFlow)
.runWith(Sink.foreach(println))
最新のトレンド(2025年)
- Akka 2.8系: パフォーマンス向上とJava 21対応
- Alpakka 4.0: 新しいコネクタとCloud Native対応
- GraphQL統合: リアルタイムGraphQLサブスクリプション
- Kubernetes連携: クラスター発見とヘルスチェック
- メトリクス統合: Prometheus、Grafana対応
まとめ
Akka Streamsは2025年において、リアクティブストリーミング処理の実証済みソリューションとして確立されています。背圧制御による安全な処理、宣言的な記述、高いパフォーマンスにより、リアルタイムデータ処理やイベントドリブンアーキテクチャで重要な役割を果たしています。