Akka Streams

A library for reactive stream processing. It keeps stream processing safe through backpressure control, lets you build complex data pipelines declaratively, and manages asynchronous and concurrent processing efficiently.

Tags: Scala, reactive, streaming, backpressure, asynchronous, concurrent processing, flow control

Framework


Overview

Akka Streams is a library for reactive stream processing. It keeps processing safe through backpressure control, lets you construct complex data pipelines declaratively, and manages asynchronous and concurrent execution efficiently.

Details

Akka Streams is a reactive stream processing library released in 2015. It implements the Reactive Streams specification, whose core mechanism is backpressure between data producers and consumers: a downstream stage signals how many elements it can accept, and upstream stages never emit more than that. When consumers are slower than producers, flow control therefore kicks in automatically, so elements are not buffered without bound and the stream does not run into an OutOfMemoryError.

Pipelines are built by combining three basic components: Source (emits elements), Flow (transforms them), and Sink (consumes them). Linear pipelines are written by chaining operators such as map and filter, while the Graph DSL declaratively describes non-linear topologies with fan-out and fan-in junctions such as Broadcast and Zip. Stages can run asynchronously and in parallel for high throughput, and Alpakka connectors integrate with external systems such as Kafka, Cassandra, Elasticsearch, AWS services, and HTTP.
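To make the Source/Flow/Sink and Graph DSL description concrete, here is a minimal sketch of a non-linear graph: a Broadcast fans each number out to two Flows, and a Zip fans the two results back in as pairs. It assumes Akka 2.6 or later (for example the akka-stream 2.8.0 dependency from Basic Usage) and a Scala script with top-level statements, such as one run by Scala CLI; the names `pairsSource` and `GraphDslSketch` are illustrative, and `Await.result` is used only to inspect the result.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("GraphDslSketch")

// Fan-out/fan-in graph exposed as a Source with a single open outlet
val pairsSource: Source[(Int, Int), NotUsed] = Source.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val broadcast = builder.add(Broadcast[Int](2)) // fan-out: every element goes to both outputs
  val zip       = builder.add(Zip[Int, Int]())   // fan-in: pairs one element from each input

  Source(1 to 5) ~> broadcast
  broadcast.out(0) ~> Flow[Int].map(_ * 10) ~> zip.in0
  broadcast.out(1) ~> Flow[Int].map(_ + 1)  ~> zip.in1

  SourceShape(zip.out)
})

// Await is for demonstration only; real code would compose the Future
val pairs = Await.result(pairsSource.runWith(Sink.seq), 5.seconds)
println(pairs) // (10,2), (20,3), (30,4), (40,5), (50,6)
system.terminate()
```

Wrapping the graph in `Source.fromGraph` with a `SourceShape` leaves exactly one open outlet, so the composite behaves like any other Source and can be combined with the linear DSL.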

Pros and Cons

Pros

  • Backpressure Control: Safe stream processing through automatic flow control
  • Declarative Description: Intuitive pipeline definition with Graph DSL
  • High Performance: High throughput through asynchronous and parallel processing
  • Type Safety: Safe processing through Scala's type system
  • Rich Integration: External system integration via Alpakka
  • Scalability: Distributed processing across an Akka Cluster, for example by passing streams between nodes with StreamRefs

Cons

  • Learning Curve: Requires understanding reactive programming concepts
  • Debugging Complexity: Difficult troubleshooting of asynchronous processing
  • Memory Usage: Memory consumption due to internal buffering
  • Error Handling: Complexity in error propagation

Main Use Cases

  • Real-time data processing
  • IoT data streaming
  • Log processing pipelines
  • ETL processing
  • Event-driven architecture
  • Microservice communication
  • WebSocket communication

Basic Usage

Adding Dependencies

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.0"

Basic Stream Processing

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Create ActorSystem (since Akka 2.6 it also provides the Materializer implicitly)
implicit val system: ActorSystem = ActorSystem("StreamSystem")
// ExecutionContext for Future callbacks such as onComplete
import system.dispatcher

// Basic Source: emits 1 to 10
val source: Source[Int, NotUsed] = Source(1 to 10)

// Basic Flow: doubles each element
val multiplyFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// Basic Sink: prints each element; its Future[Done] completes when the stream finishes
val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

// Build and run stream
val result: Future[Done] = source
  .via(multiplyFlow)
  .runWith(printSink)

// Handle result
result.onComplete {
  case Success(_) => println("Stream completed successfully")
  case Failure(ex) => println(s"Stream failed: ${ex.getMessage}")
}
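Because a pipeline is only a blueprint until it runs, the same graph can be materialized more than once, and `Keep` decides which materialized values you get back. The following sketch assumes Akka 2.6 or later and a Scala script with top-level statements; it uses `Source.maybe` and `Sink.headOption` purely for illustration, and the names `graph` and `BlueprintSketch` are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("BlueprintSketch")

// A RunnableGraph is only a description; nothing executes until run() is called.
// Keep.both retains the materialized values of both ends as a tuple:
// the Promise from Source.maybe (to feed the stream) and the Future from Sink.headOption (to read it).
val graph: RunnableGraph[(Promise[Option[Int]], Future[Option[Int]])] =
  Source.maybe[Int].map(_ * 2).toMat(Sink.headOption)(Keep.both)

// Each run() materializes an independent stream with fresh materialized values
val (promise1, result1) = graph.run()
val (promise2, result2) = graph.run()

promise1.success(Some(21)) // emit one element, then complete
promise2.success(None)     // complete without emitting anything

val first  = Await.result(result1, 3.seconds) // Some(42)
val second = Await.result(result2, 3.seconds) // None
println(s"$first $second")
system.terminate()
```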

Filtering and Transformation

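import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future

// Assumes the implicit ActorSystem `system` from the basic example
import system.dispatcher  // ExecutionContext for Future.foreach

// Stream with multiple operations
val complexStream = Source(1 to 100)
  .filter(_ % 2 == 0)  // Even numbers only
  .map(_ * 3)          // Multiply by 3
  .take(10)            // First 10 elements
  .fold(0)(_ + _)      // Calculate sum
  .toMat(Sink.head)(Keep.right)  // Keep the Sink's Future as the materialized value

val sumFuture: Future[Int] = complexStream.run()

sumFuture.foreach(sum => println(s"Sum: $sum"))  // Sum: 330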
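Besides `fold`, which emits only the final accumulator, operators such as `scan` and `grouped` expose intermediate results and batches. A small sketch, assuming Akka 2.6 or later and a Scala script with top-level statements; the value names are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("TransformSketch")

// scan emits every intermediate accumulator value, starting with the zero element
val runningTotals = Await.result(Source(1 to 5).scan(0)(_ + _).runWith(Sink.seq), 3.seconds)

// grouped batches elements into chunks of up to 2; the last chunk may be smaller
val batches = Await.result(Source(1 to 5).grouped(2).runWith(Sink.seq), 3.seconds)

println(runningTotals) // 0, 1, 3, 6, 10, 15
println(batches)       // (1, 2), (3, 4), (5)
system.terminate()
```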

Backpressure Control Example

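import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

// Assumes the implicit ActorSystem `system` from the basic example

// Fast Source
val fastSource = Source(1 to 1000000)

// Slow Sink (Thread.sleep blocks a thread; acceptable only for a demo)
val slowSink = Sink.foreach[Int] { x =>
  Thread.sleep(10)  // 10ms processing time
  println(s"Processed: $x")
}

// Buffer up to 100 elements; when the buffer is full, backpressure the source instead of dropping.
// Other strategies (dropHead, dropTail, dropBuffer, dropNew, fail) drop elements or fail instead.
val bufferedStream = fastSource
  .buffer(100, OverflowStrategy.backpressure)
  .async  // Asynchronous boundary: upstream stages and the sink run concurrently
  .runWith(slowSink)

// The source only emits when downstream signals demand, so memory stays bounded
// (the 100-element buffer plus small internal buffers) regardless of the source's size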
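`Thread.sleep` blocks a thread, so it is only a stand-in for slow processing. When the goal is to limit the rate, the built-in `throttle` operator waits on a timer instead and relies on backpressure to hold upstream back. A minimal sketch, assuming Akka 2.6 or later and a Scala script with top-level statements; the rate of one element per 20 ms is arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ThrottleSketch")

// throttle(1, 20.millis) lets at most one element through every 20 ms.
// No thread is blocked: upstream is simply backpressured until the next element is allowed.
val received = Await.result(
  Source(1 to 10).throttle(1, 20.millis).runWith(Sink.seq),
  5.seconds
)
println(received) // all ten elements, in order, delivered at the throttled rate
system.terminate()
```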

Parallel Processing

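import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future

// Assumes the implicit ActorSystem `system` from the basic example; its dispatcher
// runs the Futures here (use a dedicated ExecutionContext for real blocking work)
import system.dispatcher

// Runs up to 4 Futures concurrently; mapAsync still emits results in input order
val cpuIntensiveFlow = Flow[Int].mapAsync(4) { number =>
  Future {
    // Simulate heavy computation
    Thread.sleep(100)
    number * number
  }
}

val parallelStream = Source(1 to 20)
  .via(cpuIntensiveFlow)
  .runWith(Sink.foreach(println))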
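`mapAsync` runs up to `parallelism` Futures at once but still emits results in input order, while `mapAsyncUnordered` emits each result as soon as it completes. The sketch below contrasts the two, assuming Akka 2.6 or later and a Scala script with top-level statements; `slowSquare` is a hypothetical helper that uses `akka.pattern.after` to simulate latency.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("OrderingSketch")
import system.dispatcher

// Hypothetical async call: smaller inputs take longer (50 ms for 1, 10 ms for 5),
// so the Futures tend to complete in reverse order of submission
def slowSquare(n: Int): Future[Int] =
  after((60 - n * 10).millis, system.scheduler)(Future.successful(n * n))

// mapAsync emits results in input order, holding back results that finish early
val ordered = Await.result(Source(1 to 5).mapAsync(4)(slowSquare).runWith(Sink.seq), 5.seconds)

// mapAsyncUnordered emits each result as soon as its Future completes
val unordered = Await.result(Source(1 to 5).mapAsyncUnordered(4)(slowSquare).runWith(Sink.seq), 5.seconds)

println(ordered)   // always 1, 4, 9, 16, 25
println(unordered) // same values; order depends on completion timing
system.terminate()
```

The unordered variant can yield higher throughput when latencies vary, at the cost of losing the input order.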

Error Handling

import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

// Flow that may cause errors
val riskyFlow = Flow[String].map { str =>
  if (str.isEmpty) throw new IllegalArgumentException("Empty string")
  str.toUpperCase
}

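// Error handling strategy: Resume drops the failing element and continues; Stop fails the stream
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume
  case _ => Supervision.Stop
}

// Assumes the implicit ActorSystem `system` from the basic example
val errorHandlingStream = Source(List("hello", "", "world", ""))
  .via(riskyFlow)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.foreach(println))  // Prints HELLO and WORLD; empty strings are skipped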
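Supervision is one option; another is to recover at the stream level. `recover` turns an upstream failure into one final element and then completes the stream, so elements after the failure are not processed. A minimal sketch, assuming Akka 2.6 or later and a Scala script with top-level statements; the input list is illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("RecoverSketch")

val results = Await.result(
  Source(List(1, 2, 0, 4))
    .map(10 / _)                                   // 10 / 0 throws ArithmeticException
    .recover { case _: ArithmeticException => -1 } // emit a fallback element, then complete
    .runWith(Sink.seq),
  3.seconds
)
println(results) // 10, 5, -1 (the trailing 4 is never processed)
system.terminate()
```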

Kafka Integration (Alpakka)

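// Requires the Alpakka Kafka connector ("com.typesafe.akka" %% "akka-stream-kafka") and the implicit ActorSystem `system`
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}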
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Kafka consumer settings
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("my-group")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// Kafka producer settings
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

// Message processing flow
val messageProcessingFlow = Flow[ConsumerRecord[String, String]]
  .map { record =>
    val processedValue = record.value().toUpperCase
    new ProducerRecord[String, String]("output-topic", record.key(), processedValue)
  }

// Kafka streaming pipeline
val kafkaStream = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("input-topic"))
  .via(messageProcessingFlow)
  .runWith(Producer.plainSink(producerSettings))

HTTP Streaming

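import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._

// Requires akka-http; assumes the implicit ActorSystem `system` from the basic example
import system.dispatcher

// HTTP client flow over a single HTTPS connection (port 443 by default)
val httpClientFlow: Flow[HttpRequest, HttpResponse, Any] =
  Http().outgoingConnectionHttps("api.example.com")

// Batch HTTP request processing
val urlsToFetch = Source(1 to 100)
  .map(i => HttpRequest(uri = s"/api/data/$i"))
  .via(httpClientFlow)
  .mapAsync(4) { response =>
    // Read the whole entity; always consume or discard it so the connection is not stalled
    response.entity.toStrict(5.seconds).map(_.data.utf8String)
  }
  .runWith(Sink.foreach(println))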

WebSocket Streaming

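import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.Flow

// Requires akka-http; assumes the implicit ActorSystem `system` from the basic example
import system.dispatcher

// WebSocket message flow: replies to each strict text message from the server
// (streamed and binary messages are ignored here; production code should consume them too)
val webSocketFlow: Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(text) =>
      // Process message and reply
      TextMessage(s"Echo: $text")
  }

// WebSocket client; the second tuple element is the flow's materialized value (unused here)
val webSocketRequest = WebSocketRequest(uri = "ws://localhost:8080/websocket")
val (upgradeResponse, _) = Http().singleWebSocketRequest(webSocketRequest, webSocketFlow)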

// Verify connection
upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    println("WebSocket connection established")
  } else {
    println(s"Connection failed: ${upgrade.response.status}")
  }
}

Custom GraphStage

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Custom filter stage
class CustomFilter[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("CustomFilter.in")
  val out = Outlet[T]("CustomFilter.out")
  
  override val shape = FlowShape.of(in, out)
  
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          if (predicate(element)) {
            push(out, element)
          } else {
            // Element filtered out: pull again so the outstanding downstream demand is still served
            pull(in)
          }
        }
      })
      
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
  }
}

// Using custom stage (assumes the implicit ActorSystem `system` from the basic example)
val customFilterFlow = Flow.fromGraph(new CustomFilter[Int](_ % 2 == 0))

Source(1 to 10)
  .via(customFilterFlow)
  .runWith(Sink.foreach(println))  // Prints 2, 4, 6, 8, 10
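Stages that need state keep it inside the `GraphStageLogic`, which is created anew for every materialization, so the state is never shared between stream runs. The sketch below is a hypothetical `DropRepeated` stage, written assuming Akka 2.6 or later and a Scala script with top-level statements, that drops an element when it equals the previous one. It also shows the common pattern of implementing both handlers in one object and registering them with `setHandlers`.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: drops an element when it equals the previously emitted one
final class DropRepeated[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("DropRepeated.in")
  val out: Outlet[T] = Outlet[T]("DropRepeated.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Mutable state lives inside the logic, so each materialization gets its own copy
      private var last: Option[T] = None

      override def onPush(): Unit = {
        val element = grab(in)
        if (last.contains(element)) pull(in) // duplicate: request the next element instead
        else {
          last = Some(element)
          push(out, element)
        }
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

implicit val system: ActorSystem = ActorSystem("StageSketch")

val deduped = Await.result(
  Source(List(1, 1, 2, 2, 2, 3, 1, 1))
    .via(Flow.fromGraph(new DropRepeated[Int]))
    .runWith(Sink.seq),
  3.seconds
)
println(deduped) // 1, 2, 3, 1
system.terminate()
```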

Latest Trends (2025)

  • Akka 2.8 Series and later: Performance improvements, with Java 21 support in newer releases
  • Alpakka 4.0: New connectors and Cloud Native support
  • GraphQL Integration: Real-time GraphQL subscriptions
  • Kubernetes Integration: Cluster discovery and health checks
  • Metrics Integration: Prometheus and Grafana support

Summary

As of 2025, Akka Streams is an established, proven solution for reactive stream processing. Its backpressure-based safety, declarative pipeline definitions, and high performance make it a strong fit for real-time data processing and event-driven architectures.