Akka HTTP
An HTTP server and client library for Scala and Java. It provides high-performance asynchronous HTTP communication built on the Akka actor model, with streaming support and an intuitive DSL-based API. It is a mature option for microservices, high-load systems, and real-time communication.
GitHub Overview
akka/akka-http
The Streaming-first HTTP server/module of Akka
Overview
Akka HTTP is "a high-performance HTTP client and server library for Scala/Java", an asynchronous streaming HTTP toolkit built on Akka Streams. Its "streaming-first" architecture keeps memory usage constant even for very large requests and responses, and backpressure lets it sustain high concurrency without overwhelming slower consumers. It is designed not as a web framework but as a general toolkit for providing and consuming HTTP-based services, and it targets enterprise-level reactive applications.
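To make the constant-memory claim concrete, here is a minimal sketch in plain Java (JDK only, not Akka HTTP's API) that consumes a body of arbitrary size through one fixed-size buffer. The class name `ChunkedBodyReader`, the method `countBytes`, and the 8 KiB chunk size are hypothetical choices for illustration. Akka HTTP applies the same idea by exposing entity bodies as streams of byte chunks rather than as fully loaded buffers.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ChunkedBodyReader {
    // Hypothetical chunk size; memory use is bounded by this, not by the body size.
    static final int CHUNK_SIZE = 8 * 1024;

    /** Reads the whole stream chunk by chunk and returns the total byte count. */
    static long countBytes(InputStream body) throws IOException {
        byte[] buffer = new byte[CHUNK_SIZE]; // the only buffer ever allocated
        long total = 0;
        int read;
        while ((read = body.read(buffer)) != -1) {
            total += read; // a real consumer would process the chunk here
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // A 1 MiB in-memory body stands in for a large HTTP entity.
        InputStream body = new ByteArrayInputStream(new byte[1024 * 1024]);
        System.out.println(countBytes(body) + " bytes processed");
    }
}
```

The buffer is reused for every chunk, so a 1 GiB body would need no more memory than a 1 KiB one.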
Details
As of 2025, Akka HTTP is a well-established high-performance HTTP library in the Scala/Java ecosystem. Built on the streaming capabilities of Akka Streams, it provides fully asynchronous, non-blocking HTTP communication and the features needed to build large-scale distributed systems. The current version, 10.7.1, targets modern microservice architectures with full Scala 3 support, Java 17+ compatibility, and JSON integration (spray-json, circe, and others). Its actor-model-based design provides high fault tolerance and scalability.
Key Features
- Streaming-First Design: Complete streaming processing of request/response bodies
- 3-Level Client API: Flexible control at Request/Host/Connection levels
- Fully Asynchronous Non-Blocking: High-performance processing based on Akka Streams
- Backpressure Support: Automatic load control and memory efficiency optimization
- Comprehensive JSON Integration: Rich choices including spray-json, circe, play-json
- Scala 3/Java 17 Support: Complete integration with latest language features
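The backpressure feature listed above can be illustrated with the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract that Akka Streams implements. This is a simplified, single-threaded sketch and not Akka's implementation: the names `BackpressureSketch`, `RangePublisher`, and `OneAtATimeSubscriber` are hypothetical, and invalid demand is silently ignored instead of being signalled as an error as the full specification requires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Emits the integers 1..count, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return; // simplification: invalid demand is ignored
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext; the loop below picks it up
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests one element at a time and stops asking after `limit` elements. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        private final int limit;
        private Flow.Subscription subscription;

        OneAtATimeSubscriber(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(Integer item) {
            received.add(item);
            if (received.size() < limit) subscription.request(1); // signal more demand
        }
        @Override public void onError(Throwable t) { throw new RuntimeException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        OneAtATimeSubscriber slow = new OneAtATimeSubscriber(3);
        new RangePublisher(5).subscribe(slow);
        // The publisher had 5 elements but could only deliver what was requested.
        System.out.println(slow.received + " completed=" + slow.completed);
    }
}
```

Because elements flow only in response to `request(n)`, a slow consumer bounds how much data is in flight, which is what keeps memory usage predictable under load.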
Pros and Cons
Pros
- Strong performance and concurrency, built on Akka Streams
- Constant memory usage when processing very large payloads, thanks to the streaming design
- Three API levels, so the implementation can match the use case
- High fault tolerance and distributed-system support through the actor model
- Usable from both Scala 3 and Java, with access to both ecosystems
- Enterprise track record and stable maintenance
Cons
- Relatively steep learning curve for the Akka/Scala ecosystem
- Commercial use requires attention to licensing since the move to the Business Source License (BSL) in 2022
- Overkill when only a lightweight HTTP client is needed
- Requires understanding of the actor model and streaming concepts
- Configuration and tuning can be complex, especially in large-scale environments
- Relatively large footprint due to many dependencies
Code Examples
Installation and Basic Setup
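// Dependency addition in build.sbt
// Note: recent Akka releases are published to Akka's own repository rather than
// Maven Central, so a resolver may be required; see the Akka documentation for the URL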
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.7.1",
"com.typesafe.akka" %% "akka-actor-typed" % "2.10.5",
"com.typesafe.akka" %% "akka-stream" % "2.10.5",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.7.1"
)
// Basic imports (Scala)
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
// ActorSystem initialization
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "SingleRequest")
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.executionContext
// Maven dependencies (pom.xml)
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http_2.13</artifactId>
<version>10.7.1</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_2.13</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.13</artifactId>
<version>2.10.5</version>
</dependency>
// Basic imports (Java)
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.headers.Authorization;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.Materializer;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
// ActorSystem initialization
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SingleRequest");
Basic HTTP Requests (GET/POST/PUT/DELETE)
import akka.http.scaladsl.client.RequestBuilding._
// Basic GET request
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "https://api.example.com/users"))
// Or using RequestBuilding import
val getResponse: Future[HttpResponse] =
Http().singleRequest(Get("https://api.example.com/users"))
// Response handling
responseFuture.onComplete {
case Success(response) =>
println(s"Status: ${response.status}")
println(s"Headers: ${response.headers}")
// Get response body as string
val entityFuture: Future[String] =
Unmarshal(response.entity).to[String]
entityFuture.foreach(body => println(s"Body: $body"))
case Failure(exception) =>
println(s"Request failed: ${exception.getMessage}")
}
// POST request (sending JSON)
import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
case class User(name: String, email: String, age: Int)
object JsonFormats extends DefaultJsonProtocol {
// Explicit type and User.apply keep this compiling on Scala 3 as well
implicit val userFormat: RootJsonFormat[User] = jsonFormat3(User.apply)
}
import JsonFormats._
val userData = User("John Doe", "john.doe@example.com", 30)
val postResponse: Future[HttpResponse] = Http().singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = "https://api.example.com/users",
entity = HttpEntity(
ContentTypes.`application/json`,
userData.toJson.compactPrint
),
headers = List(
headers.Authorization(headers.OAuth2BearerToken("your-token"))
)
)
)
// PUT request (data update)
val updatedData = User("Jane Doe", "jane.doe@example.com", 31)
val putResponse: Future[HttpResponse] = Http().singleRequest(
Put("https://api.example.com/users/123")
.withEntity(HttpEntity(ContentTypes.`application/json`, updatedData.toJson.compactPrint))
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
)
// DELETE request
val deleteResponse: Future[HttpResponse] = Http().singleRequest(
Delete("https://api.example.com/users/123")
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
)
// Request with query parameters
import akka.http.scaladsl.model.Uri.Query
val queryParams = Query("page" -> "1", "limit" -> "10", "sort" -> "created_at")
val uriWithParams = Uri("https://api.example.com/users").withQuery(queryParams)
val getWithParams: Future[HttpResponse] = Http().singleRequest(Get(uriWithParams))
// Basic GET request (Java)
CompletionStage<HttpResponse> responseFuture =
Http.get(system).singleRequest(HttpRequest.create("https://api.example.com/users"));
// Response handling
responseFuture.thenAccept(response -> {
System.out.println("Status: " + response.status());
System.out.println("Headers: " + response.getHeaders());
// Get response body as string
CompletionStage<String> entityFuture =
Unmarshaller.entityToString().unmarshal(response.entity(), system);
entityFuture.thenAccept(body -> System.out.println("Body: " + body));
});
// POST request (sending JSON)
String jsonData = "{\"name\":\"John Doe\",\"email\":\"john.doe@example.com\",\"age\":30}";
HttpRequest postRequest = HttpRequest.POST("https://api.example.com/users")
.withEntity(ContentTypes.APPLICATION_JSON, jsonData)
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> postResponse = Http.get(system).singleRequest(postRequest);
// PUT request (data update)
String updatedJsonData = "{\"name\":\"Jane Doe\",\"email\":\"jane.doe@example.com\",\"age\":31}";
HttpRequest putRequest = HttpRequest.PUT("https://api.example.com/users/123")
.withEntity(ContentTypes.APPLICATION_JSON, updatedJsonData)
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> putResponse = Http.get(system).singleRequest(putRequest);
// DELETE request
HttpRequest deleteRequest = HttpRequest.DELETE("https://api.example.com/users/123")
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> deleteResponse = Http.get(system).singleRequest(deleteRequest);
Advanced Configuration and Customization (Headers, Authentication, Timeout, etc.)
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import scala.concurrent.duration._
// Custom header configuration
val customHeaders = List(
headers.`User-Agent`("MyApp/1.0 (Akka HTTP)"),
headers.Accept(MediaTypes.`application/json`),
headers.`Accept-Language`(headers.Language("en-US"), headers.Language("ja-JP")),
headers.RawHeader("X-API-Version", "v2"),
headers.RawHeader("X-Request-ID", "req-12345")
)
val requestWithHeaders = Get("https://api.example.com/data").withHeaders(customHeaders)
// Basic authentication
import akka.http.scaladsl.model.headers.BasicHttpCredentials
val basicAuth = headers.Authorization(BasicHttpCredentials("username", "password"))
val authenticatedRequest = Get("https://api.example.com/private").withHeaders(basicAuth)
// Bearer Token authentication
val bearerAuth = headers.Authorization(headers.OAuth2BearerToken("your-jwt-token"))
val tokenRequest = Get("https://api.example.com/protected").withHeaders(bearerAuth)
// Custom authentication header
val apiKeyAuth = headers.RawHeader("X-API-Key", "your-api-key")
val apiKeyRequest = Get("https://api.example.com/data").withHeaders(apiKeyAuth)
// Connection Pool settings with timeout configuration
val poolSettings = ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withConnectingTimeout(5.seconds)
.withIdleTimeout(10.seconds)
)
.withMaxConnections(32)
.withMaxRetries(3)
.withMaxOpenRequests(256)
// Advanced configuration with the Connection-Level and Host-Level APIs
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Try
// Connection-Level API: a single HTTPS connection exposed as a Flow
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnectionHttps("api.example.com", 443)
// Host-Level API: a cached HTTPS connection pool for one host
val hostConnectionPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
Http().cachedHostConnectionPoolHttps[Int]("api.example.com", 443, settings = poolSettings)
// SSL/TLS configuration
import akka.http.scaladsl.{ClientTransport, ConnectionContext, HttpsConnectionContext}
import javax.net.ssl.SSLContext
val httpsContext: HttpsConnectionContext = ConnectionContext.httpsClient((host, port) => {
val sslContext = SSLContext.getDefault
val engine = sslContext.createSSLEngine(host, port)
engine.setUseClientMode(true)
engine
})
val secureRequest = Http().singleRequest(
Get("https://secure-api.example.com/data"),
connectionContext = httpsContext
)
// Proxy configuration
val proxyTransport = ClientTransport.httpsProxy(
java.net.InetSocketAddress.createUnresolved("proxy.example.com", 8080)
)
val proxySettings = ConnectionPoolSettings(system).withTransport(proxyTransport)
// Cookie management
import akka.http.scaladsl.model.headers.Cookie
val cookieHeader = Cookie("session_id" -> "abc123", "user_pref" -> "dark_mode")
val requestWithCookies = Get("https://api.example.com/user-data").withHeaders(cookieHeader)
Error Handling and Retry Functionality
import akka.pattern.retry
import akka.http.scaladsl.model.StatusCodes
import scala.util.Random
// Comprehensive error handling
def safeRequest(request: HttpRequest): Future[HttpResponse] = {
  // Fails the future for an error status. The entity is discarded first so
  // the pooled connection is released instead of stalling.
  def fail(response: HttpResponse, error: Throwable): Future[HttpResponse] = {
    response.discardEntityBytes()
    Future.failed(error)
  }
  Http().singleRequest(request)
    .recoverWith {
      // Only transport-level failures are translated here; status errors are
      // raised below and are not re-wrapped
      case ex: akka.stream.StreamTcpException =>
        Future.failed(new RuntimeException(s"Connection error: ${ex.getMessage}", ex))
      case ex: java.util.concurrent.TimeoutException =>
        Future.failed(new RuntimeException(s"Timeout error: ${ex.getMessage}", ex))
      case ex =>
        Future.failed(new RuntimeException(s"Unexpected error: ${ex.getMessage}", ex))
    }
    .flatMap { response =>
      response.status match {
        case StatusCodes.OK | StatusCodes.Created | StatusCodes.Accepted =>
          Future.successful(response)
        case StatusCodes.Unauthorized =>
          fail(response, new SecurityException("Authentication error: Please check your token"))
        case StatusCodes.Forbidden =>
          fail(response, new SecurityException("Permission error: Access denied"))
        case StatusCodes.NotFound =>
          fail(response, new IllegalArgumentException("Not found: Resource does not exist"))
        case StatusCodes.TooManyRequests =>
          fail(response, new RuntimeException("Rate limit: Please wait before retrying"))
        case status if status.intValue >= 500 =>
          fail(response, new RuntimeException(s"Server error: ${status.value}"))
        case status =>
          fail(response, new RuntimeException(s"Unexpected status: ${status.value}"))
      }
    }
}
// Usage example
val request = Get("https://api.example.com/data")
safeRequest(request).onComplete {
case Success(response) =>
Unmarshal(response.entity).to[String].foreach(println)
case Failure(exception) =>
println(s"Request failed: ${exception.getMessage}")
}
// Request with retry functionality
def requestWithRetry(
  request: HttpRequest,
  maxRetries: Int = 3,
  backoffFactor: Int = 1
): Future[HttpResponse] = {
  // Schedules the next attempt after an exponentially growing delay
  def retryLater(attempt: Int, reason: String): Future[HttpResponse] = {
    val waitTime = backoffFactor * Math.pow(2, attempt).toInt
    println(s"Attempt ${attempt + 1} failed ($reason). Retrying in $waitTime seconds...")
    // akka.pattern.after takes the classic scheduler, not the typed one
    akka.pattern.after(waitTime.seconds, system.classicSystem.scheduler) {
      attemptRequest(attempt + 1)
    }
  }
  def attemptRequest(attempt: Int): Future[HttpResponse] = {
    // transformWith handles each attempt exactly once, so a failure from a
    // later attempt is not retried again by an earlier one
    Http().singleRequest(request).transformWith {
      case Success(response) if response.status.isSuccess =>
        Future.successful(response)
      case Success(response) =>
        response.discardEntityBytes() // release the connection before retrying or failing
        if (attempt < maxRetries && shouldRetry(response.status)) {
          retryLater(attempt, response.status.toString)
        } else {
          Future.failed(new RuntimeException(s"Request failed after ${attempt + 1} attempt(s): ${response.status}"))
        }
      case Failure(ex) if attempt < maxRetries =>
        retryLater(attempt, ex.getMessage)
      case Failure(ex) =>
        Future.failed(ex)
    }
  }
  attemptRequest(0)
}
def shouldRetry(status: StatusCode): Boolean = {
status == StatusCodes.TooManyRequests ||
status == StatusCodes.InternalServerError ||
status == StatusCodes.BadGateway ||
status == StatusCodes.ServiceUnavailable ||
status == StatusCodes.GatewayTimeout
}
// Usage example
val unstableRequest = Get("https://api.example.com/unstable")
requestWithRetry(unstableRequest, maxRetries = 3, backoffFactor = 1).onComplete {
case Success(response) =>
println(s"Request successful: ${response.status}")
case Failure(exception) =>
println(s"Finally failed: ${exception.getMessage}")
}
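The retry helper above waits `backoffFactor * 2^attempt` seconds before each retry. As a worked example of that arithmetic, the following JDK-only sketch computes the delay schedule; the class name `BackoffSchedule` and its methods are hypothetical and exist only to show the numbers.

```java
import java.util.Arrays;

final class BackoffSchedule {
    /** Delay in seconds before the retry that follows the 0-based attempt `attempt`. */
    static long waitSeconds(int backoffFactor, int attempt) {
        return backoffFactor * (1L << attempt); // backoffFactor * 2^attempt
    }

    /** Delays for every retry permitted by maxRetries. */
    static long[] schedule(int backoffFactor, int maxRetries) {
        long[] delays = new long[maxRetries];
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            delays[attempt] = waitSeconds(backoffFactor, attempt);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the usage example: maxRetries = 3, backoffFactor = 1
        System.out.println(Arrays.toString(schedule(1, 3))); // prints [1, 2, 4]
    }
}
```

With `maxRetries = 3` and `backoffFactor = 1`, the request is sent at most four times and the total waiting time is 1 + 2 + 4 = 7 seconds.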
// Circuit breaker pattern
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
val circuitBreaker = new CircuitBreaker(
scheduler = system.classicSystem.scheduler, // CircuitBreaker takes the classic scheduler
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
)
def protectedRequest(request: HttpRequest): Future[HttpResponse] = {
circuitBreaker.withCircuitBreaker(Http().singleRequest(request))
}
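For readers unfamiliar with the pattern, a circuit breaker moves between closed, open, and half-open states. The following JDK-only sketch shows those transitions under simplifying assumptions: it is not Akka's `CircuitBreaker`, it has no call timeout, and it lets any number of trial calls through while half-open. The class `CircuitBreakerSketch` and its methods are hypothetical names; the clock is injected so the behaviour can be checked without waiting.

```java
import java.util.function.LongSupplier;

final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final LongSupplier clock; // injectable time source, e.g. System::currentTimeMillis
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0;

    CircuitBreakerSketch(int maxFailures, long resetTimeoutMillis, LongSupplier clock) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.clock = clock;
    }

    /** Returns true if a call may proceed; OPEN becomes HALF_OPEN once the reset timeout has elapsed. */
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= resetTimeoutMillis) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    /** A successful call closes the breaker and clears the failure count. */
    synchronized void recordSuccess() {
        failures = 0;
        state = State.CLOSED;
    }

    /** A failure while half-open, or the maxFailures-th failure in a row, opens the breaker. */
    synchronized void recordFailure() {
        failures++;
        if (state == State.HALF_OPEN || failures >= maxFailures) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() { return state; }
}
```

A caller would check `allowRequest()` before sending, then report the outcome with `recordSuccess()` or `recordFailure()`; while the breaker is open, calls fail fast instead of adding load to a struggling service.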
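// Bulkhead pattern (dedicated thread pool)
// Note: singleRequest is itself asynchronous, so this pool only runs the code that
// submits the request; the network I/O still runs on Akka's dispatchers and connection pool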
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val httpClientExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(10, (r: Runnable) => {
val t = new Thread(r, "http-client-thread")
t.setDaemon(true)
t
})
)
def isolatedRequest(request: HttpRequest): Future[HttpResponse] = {
Future(Http().singleRequest(request))(httpClientExecutionContext).flatten
}
Concurrent Processing and Streaming
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.NotUsed
// Parallel fetching of multiple URLs
val urls = List(
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments",
"https://api.example.com/categories"
)
// Parallel processing with mapAsync (up to 4 requests in flight, results kept in order)
val parallelRequests: Source[HttpResponse, NotUsed] = Source(urls)
.mapAsync(parallelism = 4) { url =>
Http().singleRequest(Get(url))
}
// Response processing
val processedResults: Future[Seq[String]] = parallelRequests
.mapAsync(1) { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity).to[String]
} else {
response.discardEntityBytes() // always consume or discard the entity to free the connection
Future.successful(s"Error: ${response.status}")
}
}
.runWith(Sink.seq)
processedResults.foreach { results =>
results.zipWithIndex.foreach { case (result, index) =>
println(s"URL ${index + 1}: ${result.take(100)}...")
}
}
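// Efficient parallel processing with Host-Level API
// (the URLs are HTTPS, so the pool is created with the Https variant)
val poolClientFlow = Http().cachedHostConnectionPoolHttps[Int]("api.example.com")
val requests: Source[(HttpRequest, Int), NotUsed] = Source(urls.zipWithIndex)
.map { case (url, index) =>
// The pool already knows the host, so only the relative URI is sent
(HttpRequest(uri = Uri(url).toRelative), index)
}
// via keeps the materialized value of the source, which is NotUsed
val responses: Source[(util.Try[HttpResponse], Int), NotUsed] =
requests.via(poolClientFlow)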
responses.runForeach {
case (util.Success(response), index) =>
println(s"Request $index: ${response.status}")
response.discardEntityBytes() // Discard for memory efficiency
case (util.Failure(exception), index) =>
println(s"Request $index failed: ${exception.getMessage}")
}
// Streaming upload
import akka.stream.IOResult
import akka.stream.scaladsl.FileIO
import akka.util.ByteString
import java.nio.file.Paths
def streamingUpload(filePath: String, uploadUrl: String): Future[HttpResponse] = {
val fileSource: Source[ByteString, Future[IOResult]] =
FileIO.fromPath(Paths.get(filePath))
val request = Post(uploadUrl)
.withEntity(HttpEntity(ContentTypes.`application/octet-stream`, fileSource))
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
Http().singleRequest(request)
}
// Streaming download
def streamingDownload(downloadUrl: String, targetPath: String): Future[IOResult] = {
val request = Get(downloadUrl)
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
Http().singleRequest(request).flatMap { response =>
if (response.status.isSuccess) {
val fileSink: Sink[ByteString, Future[IOResult]] =
FileIO.toPath(Paths.get(targetPath))
response.entity.dataBytes.runWith(fileSink)
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"Download failed: ${response.status}"))
}
}
}
// Usage example
streamingDownload("https://api.example.com/files/large-dataset.zip", "/tmp/dataset.zip")
.onComplete {
case Success(ioResult) =>
println(s"Download completed: ${ioResult.count} bytes")
case Failure(exception) =>
println(s"Download failed: ${exception.getMessage}")
}
// Pagination-aware automatic fetching
def fetchAllPages(baseUrl: String, authToken: String): Source[JsValue, NotUsed] = {
def fetchPage(page: Int): Future[Option[(Seq[JsValue], Int)]] = {
val url = s"$baseUrl?page=$page&per_page=100"
val request = Get(url).withHeaders(headers.Authorization(headers.OAuth2BearerToken(authToken)))
Http().singleRequest(request).flatMap { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity).to[JsValue].map { json =>
val items = json.asJsObject.fields.get("items").map(_.convertTo[Seq[JsValue]]).getOrElse(Seq.empty)
val hasMore = json.asJsObject.fields.get("has_more").exists(_.convertTo[Boolean])
Some((items, if (hasMore) page + 1 else -1))
}
} else {
response.discardEntityBytes()
Future.successful(None)
}
}
}
Source.unfoldAsync(1) { page =>
if (page == -1) {
Future.successful(None)
} else {
fetchPage(page).map {
case Some((items, nextPage)) => Some((nextPage, items))
case None => None
}
}
}.flatMapConcat(Source(_))
}
// Usage example
fetchAllPages("https://api.example.com/posts", "your-token")
.take(1000) // Maximum 1000 items
.runForeach { item =>
println(s"Fetched item: ${item.compactPrint}")
}
Framework Integration and Practical Examples
// play-json integration (the JSON library from Play Framework)
import play.api.libs.json._
case class ApiResponse[T](data: T, status: String, message: String)
case class User(id: Long, name: String, email: String)
object JsonImplicits {
implicit val userFormat: Format[User] = Json.format[User]
implicit def apiResponseFormat[T: Format]: Format[ApiResponse[T]] = Json.format[ApiResponse[T]]
}
class UserApiClient(baseUrl: String, authToken: String)(implicit system: ActorSystem[_]) {
private implicit val ec: ExecutionContext = system.executionContext
private def request(path: String, method: HttpMethod = HttpMethods.GET, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
val uri = s"$baseUrl$path"
val req = HttpRequest(method, uri, entity = entity)
.withHeaders(headers.Authorization(headers.OAuth2BearerToken(authToken)))
Http().singleRequest(req)
}
def getUser(id: Long): Future[Option[User]] = {
request(s"/users/$id").flatMap { response =>
if (response.status == StatusCodes.OK) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => Some(apiResponse.data)
case JsError(_) => None
}
}
} else if (response.status == StatusCodes.NotFound) {
response.discardEntityBytes()
Future.successful(None)
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"API Error: ${response.status}"))
}
}
}
def createUser(user: User): Future[User] = {
val json = Json.toJson(user).toString()
val entity = HttpEntity(ContentTypes.`application/json`, json)
request("/users", HttpMethods.POST, entity).flatMap { response =>
if (response.status == StatusCodes.Created) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => apiResponse.data
case JsError(errors) => throw new RuntimeException(s"JSON Parse Error: $errors")
}
}
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"User creation failed: ${response.status}"))
}
}
}
def updateUser(id: Long, user: User): Future[User] = {
val json = Json.toJson(user.copy(id = id)).toString()
val entity = HttpEntity(ContentTypes.`application/json`, json)
request(s"/users/$id", HttpMethods.PUT, entity).flatMap { response =>
if (response.status == StatusCodes.OK) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => apiResponse.data
case JsError(errors) => throw new RuntimeException(s"JSON Parse Error: $errors")
}
}
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"User update failed: ${response.status}"))
}
}
}
def deleteUser(id: Long): Future[Boolean] = {
request(s"/users/$id", HttpMethods.DELETE).map { response =>
val success = response.status == StatusCodes.NoContent
response.discardEntityBytes()
success
}
}
}
// Usage example
val apiClient = new UserApiClient("https://api.example.com/v1", "your-jwt-token")
for {
  // Create user
  newUser <- apiClient.createUser(User(0, "John Doe", "john.doe@example.com"))
  _ = println(s"Created user: $newUser")
  // Get user
  retrievedUser <- apiClient.getUser(newUser.id)
  _ = println(s"Retrieved user: $retrievedUser")
  // Update user
  updatedUser <- apiClient.updateUser(newUser.id, newUser.copy(name = "Jane Doe"))
  _ = println(s"Updated user: $updatedUser")
  // Delete user
  deleted <- apiClient.deleteUser(newUser.id)
  _ = println(s"Deletion successful: $deleted")
} yield ()
// Actor integration example
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
object HttpClientActor {
  sealed trait Command
  case class FetchUser(id: Long, replyTo: ActorRef[UserResponse]) extends Command
  case class CreateUser(user: User, replyTo: ActorRef[UserResponse]) extends Command
  // Internal message: carries the finished HTTP result back into the actor
  private case class WrappedResponse(response: UserResponse, replyTo: ActorRef[UserResponse]) extends Command

  sealed trait UserResponse
  case class UserFound(user: User) extends UserResponse
  case class UserCreated(user: User) extends UserResponse
  case class UserNotFound(id: Long) extends UserResponse
  case class ApiError(message: String) extends UserResponse

  def apply(apiClient: UserApiClient): Behavior[Command] = Behaviors.receive { (context, message) =>
    message match {
      case FetchUser(id, replyTo) =>
        // pipeToSelf must produce a Command, so the result is wrapped with its reply address
        context.pipeToSelf(apiClient.getUser(id)) {
          case Success(Some(user)) => WrappedResponse(UserFound(user), replyTo)
          case Success(None) => WrappedResponse(UserNotFound(id), replyTo)
          case Failure(ex) => WrappedResponse(ApiError(ex.getMessage), replyTo)
        }
        Behaviors.same
      case CreateUser(user, replyTo) =>
        context.pipeToSelf(apiClient.createUser(user)) {
          case Success(createdUser) => WrappedResponse(UserCreated(createdUser), replyTo)
          case Failure(ex) => WrappedResponse(ApiError(ex.getMessage), replyTo)
        }
        Behaviors.same
      case WrappedResponse(response, replyTo) =>
        // Forward the result to the original requester
        replyTo ! response
        Behaviors.same
    }
  }
}