Akka HTTP
Scala/Java向けのHTTPサーバー・クライアントライブラリ。Akkaアクターモデル基盤の高性能非同期HTTP通信、ストリーミング対応、DSLベースの直感的API設計。マイクロサービス、高負荷システム、リアルタイム通信に最適化された成熟したソリューション。
GitHub概要
akka/akka-http
The Streaming-first HTTP server/module of Akka
トピックス
スター履歴
ライブラリ
Akka HTTP
概要
Akka HTTPは「Scala/Java向けの高性能なHTTPクライアント・サーバーライブラリ」として開発された、Akka Streamベースの非同期ストリーミングHTTPツールキットです。「ストリーミング・ファースト」のアーキテクチャにより、巨大なリクエストやレスポンスでも一定メモリ使用量を実現し、バックプレッシャー機能で高い並行性とパフォーマンスを提供。Web フレームワークではなく、HTTP ベースのサービスを提供・消費するためのより汎用的なツールキットとして設計され、エンタープライズレベルのリアクティブアプリケーション構築に最適化されています。
詳細
Akka HTTP 2025年版はScala/Javaエコシステムにおける高性能HTTPライブラリの決定版として地位を確立しています。Akka Streamの強力なストリーミング機能をベースに、完全非同期・ノンブロッキングなHTTP通信を実現し、大規模分散システムの構築に必要な機能を包括的に提供。現在のバージョン10.7.1では、Scala 3完全対応、Java 17以降のサポート、JSON統合(spray-json、circe等)の強化により、現代的なマイクロサービスアーキテクチャに最適化されています。アクターモデルベースの設計により、高い障害耐性と拡張性を実現します。
主な特徴
- ストリーミング・ファースト設計: リクエスト・レスポンス本体の完全ストリーミング処理
- 3レベルのクライアントAPI: Request/Host/Connection レベルでの柔軟な制御
- 完全非同期・ノンブロッキング: Akka Streamベースの高性能処理
- バックプレッシャー対応: 自動的な負荷制御とメモリ効率の最適化
- 包括的JSON統合: spray-json、circe、play-json等の豊富な選択肢
- Scala 3/Java 17対応: 最新言語機能との完全統合
メリット・デメリット
メリット
- Akka Streamベースによる卓越したパフォーマンスと並行性処理能力
- ストリーミング設計により一定メモリ使用量で巨大データを効率処理
- 3つのAPIレベルによる用途に応じた柔軟な実装選択肢
- アクターモデルによる高い障害耐性と分散システム対応
- Scala 3/Java両対応で豊富な言語機能とエコシステム活用
- 企業レベルの実績と安定したメンテナンス体制
デメリット
- Akka/Scala エコシステムの学習コストが比較的高い
- 2022年からのBSLライセンス移行による商用利用時の考慮事項
- 軽量なHTTPクライアントが必要な場合には過剰な機能性
- アクターモデルやストリーミング概念の理解が必要
- 設定とチューニングの複雑性(特に大規模環境)
- 依存関係が多くライブラリサイズが比較的大きい
参考ページ
書き方の例
インストールと基本セットアップ
// build.sbt での依存関係追加
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.7.1",
"com.typesafe.akka" %% "akka-actor-typed" % "2.10.5",
"com.typesafe.akka" %% "akka-stream" % "2.10.5",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.7.1"
)
// 基本インポート(Scala)
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
// ActorSystemの初期化
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "SingleRequest")
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.executionContext
// Maven依存関係(pom.xml)
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http_2.13</artifactId>
<version>10.7.1</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_2.13</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.13</artifactId>
<version>2.10.5</version>
</dependency>
// 基本インポート(Java)
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.*;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.Materializer;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
// ActorSystemの初期化
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SingleRequest");
基本的なHTTPリクエスト(GET/POST/PUT/DELETE)
import akka.http.scaladsl.client.RequestBuilding._
// 基本的なGETリクエスト
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "https://api.example.com/users"))
// またはRequestBuilding import使用
val getResponse: Future[HttpResponse] =
Http().singleRequest(Get("https://api.example.com/users"))
// レスポンス処理
responseFuture.onComplete {
case Success(response) =>
println(s"ステータス: ${response.status}")
println(s"ヘッダー: ${response.headers}")
// レスポンス本体を文字列として取得
val entityFuture: Future[String] =
Unmarshal(response.entity).to[String]
entityFuture.foreach(body => println(s"本体: $body"))
case Failure(exception) =>
println(s"リクエスト失敗: ${exception.getMessage}")
}
// POSTリクエスト(JSON送信)
import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
case class User(name: String, email: String, age: Int)
object JsonFormats extends DefaultJsonProtocol {
implicit val userFormat = jsonFormat3(User)
}
import JsonFormats._
val userData = User("田中太郎", "[email protected]", 30)
val postResponse: Future[HttpResponse] = Http().singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = "https://api.example.com/users",
entity = HttpEntity(
ContentTypes.`application/json`,
userData.toJson.compactPrint
),
headers = List(
headers.Authorization(headers.OAuth2BearerToken("your-token"))
)
)
)
// PUTリクエスト(データ更新)
val updatedData = User("田中次郎", "[email protected]", 31)
val putResponse: Future[HttpResponse] = Http().singleRequest(
Put("https://api.example.com/users/123")
.withEntity(HttpEntity(ContentTypes.`application/json`, updatedData.toJson.compactPrint))
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
)
// DELETEリクエスト
val deleteResponse: Future[HttpResponse] = Http().singleRequest(
Delete("https://api.example.com/users/123")
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
)
// クエリパラメータ付きリクエスト
import akka.http.scaladsl.model.Uri.Query
val queryParams = Query("page" -> "1", "limit" -> "10", "sort" -> "created_at")
val uriWithParams = Uri("https://api.example.com/users").withQuery(queryParams)
val getWithParams: Future[HttpResponse] = Http().singleRequest(Get(uriWithParams))
// 基本的なGETリクエスト(Java)
CompletionStage<HttpResponse> responseFuture =
Http.get(system).singleRequest(HttpRequest.create("https://api.example.com/users"));
// レスポンス処理
responseFuture.thenAccept(response -> {
System.out.println("ステータス: " + response.status());
System.out.println("ヘッダー: " + response.getHeaders());
// レスポンス本体を文字列として取得
CompletionStage<String> entityFuture =
Unmarshaller.entityToString().unmarshal(response.entity(), system);
entityFuture.thenAccept(body -> System.out.println("本体: " + body));
});
// POSTリクエスト(JSON送信)
String jsonData = "{\"name\":\"田中太郎\",\"email\":\"[email protected]\",\"age\":30}";
HttpRequest postRequest = HttpRequest.POST("https://api.example.com/users")
.withEntity(ContentTypes.APPLICATION_JSON, jsonData)
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> postResponse = Http.get(system).singleRequest(postRequest);
// PUTリクエスト(データ更新)
String updatedJsonData = "{\"name\":\"田中次郎\",\"email\":\"[email protected]\",\"age\":31}";
HttpRequest putRequest = HttpRequest.PUT("https://api.example.com/users/123")
.withEntity(ContentTypes.APPLICATION_JSON, updatedJsonData)
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> putResponse = Http.get(system).singleRequest(putRequest);
// DELETEリクエスト
HttpRequest deleteRequest = HttpRequest.DELETE("https://api.example.com/users/123")
.addHeader(Authorization.oauth2("your-token"));
CompletionStage<HttpResponse> deleteResponse = Http.get(system).singleRequest(deleteRequest);
高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)
import akka.http.scaladsl.settings.ConnectionPoolSettings
import scala.concurrent.duration._
// カスタムヘッダー設定
val customHeaders = List(
headers.`User-Agent`("MyApp/1.0 (Akka HTTP)"),
headers.Accept(MediaRanges.`application/json`),
headers.`Accept-Language`(Language("ja-JP"), Language("en-US")),
headers.RawHeader("X-API-Version", "v2"),
headers.RawHeader("X-Request-ID", "req-12345")
)
val requestWithHeaders = Get("https://api.example.com/data").withHeaders(customHeaders)
// Basic認証
import akka.http.scaladsl.model.headers.BasicHttpCredentials
val basicAuth = headers.Authorization(BasicHttpCredentials("username", "password"))
val authenticatedRequest = Get("https://api.example.com/private").withHeaders(basicAuth)
// Bearer Token認証
val bearerAuth = headers.Authorization(headers.OAuth2BearerToken("your-jwt-token"))
val tokenRequest = Get("https://api.example.com/protected").withHeaders(bearerAuth)
// カスタム認証ヘッダー
val apiKeyAuth = headers.RawHeader("X-API-Key", "your-api-key")
val apiKeyRequest = Get("https://api.example.com/data").withHeaders(apiKeyAuth)
// タイムアウト設定付きのConnection Pool設定
val poolSettings = ConnectionPoolSettings(system)
.withConnectionSettings(
ClientConnectionSettings(system)
.withConnectingTimeout(5.seconds)
.withIdleTimeout(10.seconds)
)
.withMaxConnections(32)
.withMaxRetries(3)
.withMaxOpenRequests(256)
// Host-Level APIでの高度な設定
import akka.stream.scaladsl.{Flow, Sink, Source}
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection("api.example.com", 443, useHttps = true)
val hostConnectionPool: Flow[(HttpRequest, Int), (util.Try[HttpResponse], Int), Http.HostConnectionPool] =
Http().cachedHostConnectionPool[Int]("api.example.com", 443, useHttps = true, settings = poolSettings)
// SSL/TLS設定
import akka.http.scaladsl.{ClientTransport, ConnectionContext, HttpsConnectionContext}
import javax.net.ssl.SSLContext
val httpsContext: HttpsConnectionContext = ConnectionContext.httpsClient((host, port) => {
val sslContext = SSLContext.getDefault
val engine = sslContext.createSSLEngine(host, port)
engine.setUseClientMode(true)
engine
})
val secureRequest = Http().singleRequest(
Get("https://secure-api.example.com/data"),
connectionContext = httpsContext
)
// プロキシ設定
val proxyTransport = ClientTransport.httpsProxy(
java.net.InetSocketAddress.createUnresolved("proxy.example.com", 8080)
)
val proxySettings = ConnectionPoolSettings(system).withTransport(proxyTransport)
// Cookie管理
import akka.http.scaladsl.model.headers.Cookie
val cookieHeader = Cookie("session_id" -> "abc123", "user_pref" -> "dark_mode")
val requestWithCookies = Get("https://api.example.com/user-data").withHeaders(cookieHeader)
エラーハンドリングとリトライ機能
import akka.pattern.retry
import akka.http.scaladsl.model.StatusCodes
import scala.util.Random
// 包括的なエラーハンドリング
def safeRequest(request: HttpRequest): Future[HttpResponse] = {
Http().singleRequest(request).flatMap { response =>
response.status match {
case StatusCodes.OK | StatusCodes.Created | StatusCodes.Accepted =>
Future.successful(response)
case StatusCodes.Unauthorized =>
Future.failed(new SecurityException("認証エラー: トークンを確認してください"))
case StatusCodes.Forbidden =>
Future.failed(new SecurityException("権限エラー: アクセス権限がありません"))
case StatusCodes.NotFound =>
Future.failed(new IllegalArgumentException("見つかりません: リソースが存在しません"))
case StatusCodes.TooManyRequests =>
Future.failed(new RuntimeException("レート制限: しばらく待ってから再試行してください"))
case status if status.isServerError =>
Future.failed(new RuntimeException(s"サーバーエラー: ${status.value}"))
case status =>
Future.failed(new RuntimeException(s"予期しないステータス: ${status.value}"))
}
}.recoverWith {
case ex: akka.stream.StreamTcpException =>
Future.failed(new RuntimeException(s"接続エラー: ${ex.getMessage}"))
case ex: java.util.concurrent.TimeoutException =>
Future.failed(new RuntimeException(s"タイムアウトエラー: ${ex.getMessage}"))
case ex =>
Future.failed(new RuntimeException(s"予期しないエラー: ${ex.getMessage}"))
}
}
// 使用例
val request = Get("https://api.example.com/data")
safeRequest(request).onComplete {
case Success(response) =>
Unmarshal(response.entity).to[String].foreach(println)
case Failure(exception) =>
println(s"リクエスト失敗: ${exception.getMessage}")
}
// リトライ機能付きリクエスト
def requestWithRetry(
request: HttpRequest,
maxRetries: Int = 3,
backoffFactor: Int = 1
): Future[HttpResponse] = {
def attemptRequest(attempt: Int): Future[HttpResponse] = {
Http().singleRequest(request).flatMap { response =>
if (response.status.isSuccess) {
Future.successful(response)
} else if (attempt < maxRetries && shouldRetry(response.status)) {
val waitTime = backoffFactor * Math.pow(2, attempt).toInt
println(s"試行 ${attempt + 1} 失敗. ${waitTime}秒後に再試行...")
akka.pattern.after(waitTime.seconds, system.scheduler) {
attemptRequest(attempt + 1)
}
} else {
Future.failed(new RuntimeException(s"最大試行回数に達しました: ${response.status}"))
}
}.recoverWith {
case ex if attempt < maxRetries =>
val waitTime = backoffFactor * Math.pow(2, attempt).toInt
println(s"エラーで試行 ${attempt + 1} 失敗. ${waitTime}秒後に再試行...")
akka.pattern.after(waitTime.seconds, system.scheduler) {
attemptRequest(attempt + 1)
}
case ex =>
Future.failed(ex)
}
}
attemptRequest(0)
}
def shouldRetry(status: StatusCode): Boolean = {
status == StatusCodes.TooManyRequests ||
status == StatusCodes.InternalServerError ||
status == StatusCodes.BadGateway ||
status == StatusCodes.ServiceUnavailable ||
status == StatusCodes.GatewayTimeout
}
// 使用例
val unstableRequest = Get("https://api.example.com/unstable")
requestWithRetry(unstableRequest, maxRetries = 3, backoffFactor = 1).onComplete {
case Success(response) =>
println(s"リクエスト成功: ${response.status}")
case Failure(exception) =>
println(s"最終的に失敗: ${exception.getMessage}")
}
// サーキットブレーカーパターン
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
val circuitBreaker = new CircuitBreaker(
scheduler = system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
)
def protectedRequest(request: HttpRequest): Future[HttpResponse] = {
circuitBreaker.withCircuitBreaker(Http().singleRequest(request))
}
// バルクヘッド パターン(専用スレッドプール)
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val httpClientExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(10, (r: Runnable) => {
val t = new Thread(r, "http-client-thread")
t.setDaemon(true)
t
})
)
def isolatedRequest(request: HttpRequest): Future[HttpResponse] = {
Future(Http().singleRequest(request))(httpClientExecutionContext).flatten
}
並行処理とストリーミング処理
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.NotUsed
// 複数URLの並列取得
val urls = List(
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments",
"https://api.example.com/categories"
)
// Source.futureによる並列処理
val parallelRequests: Source[HttpResponse, NotUsed] = Source(urls)
.mapAsync(parallelism = 4) { url =>
Http().singleRequest(Get(url))
}
// レスポンス処理
val processedResults: Future[Seq[String]] = parallelRequests
.mapAsync(1) { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity).to[String]
} else {
Future.successful(s"Error: ${response.status}")
}
}
.runWith(Sink.seq)
processedResults.foreach { results =>
results.zipWithIndex.foreach { case (result, index) =>
println(s"URL ${index + 1}: ${result.take(100)}...")
}
}
// Host-Level APIでの効率的な並列処理
val poolClientFlow = Http().cachedHostConnectionPool[Int]("api.example.com")
val requests: Source[(HttpRequest, Int), NotUsed] = Source(urls.zipWithIndex)
.map { case (url, index) =>
(HttpRequest(uri = Uri(url).path), index)
}
val responses: Source[(util.Try[HttpResponse], Int), Http.HostConnectionPool] =
requests.via(poolClientFlow)
responses.runForeach {
case (util.Success(response), index) =>
println(s"リクエスト $index: ${response.status}")
response.discardEntityBytes() // メモリ効率のため破棄
case (util.Failure(exception), index) =>
println(s"リクエスト $index 失敗: ${exception.getMessage}")
}
// ストリーミングアップロード
import akka.util.ByteString
def streamingUpload(filePath: String, uploadUrl: String): Future[HttpResponse] = {
val fileSource: Source[ByteString, Future[IOResult]] =
FileIO.fromPath(Paths.get(filePath))
val request = Post(uploadUrl)
.withEntity(HttpEntity(ContentTypes.`application/octet-stream`, fileSource))
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
Http().singleRequest(request)
}
// ストリーミングダウンロード
def streamingDownload(downloadUrl: String, targetPath: String): Future[IOResult] = {
val request = Get(downloadUrl)
.withHeaders(headers.Authorization(headers.OAuth2BearerToken("your-token")))
Http().singleRequest(request).flatMap { response =>
if (response.status.isSuccess) {
val fileSink: Sink[ByteString, Future[IOResult]] =
FileIO.toPath(Paths.get(targetPath))
response.entity.dataBytes.runWith(fileSink)
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"ダウンロード失敗: ${response.status}"))
}
}
}
// 使用例
streamingDownload("https://api.example.com/files/large-dataset.zip", "/tmp/dataset.zip")
.onComplete {
case Success(ioResult) =>
println(s"ダウンロード完了: ${ioResult.count} bytes")
case Failure(exception) =>
println(s"ダウンロード失敗: ${exception.getMessage}")
}
// ページネーション対応の自動取得
def fetchAllPages(baseUrl: String, authToken: String): Source[JsValue, NotUsed] = {
def fetchPage(page: Int): Future[Option[(Seq[JsValue], Int)]] = {
val url = s"$baseUrl?page=$page&per_page=100"
val request = Get(url).withHeaders(headers.Authorization(headers.OAuth2BearerToken(authToken)))
Http().singleRequest(request).flatMap { response =>
if (response.status.isSuccess) {
Unmarshal(response.entity).to[JsValue].map { json =>
val items = json.asJsObject.fields.get("items").map(_.convertTo[Seq[JsValue]]).getOrElse(Seq.empty)
val hasMore = json.asJsObject.fields.get("has_more").exists(_.convertTo[Boolean])
Some((items, if (hasMore) page + 1 else -1))
}
} else {
response.discardEntityBytes()
Future.successful(None)
}
}
}
Source.unfoldAsync(1) { page =>
if (page == -1) {
Future.successful(None)
} else {
fetchPage(page).map {
case Some((items, nextPage)) => Some((nextPage, items))
case None => None
}
}
}.flatMapConcat(Source(_))
}
// 使用例
fetchAllPages("https://api.example.com/posts", "your-token")
.take(1000) // 最大1000件
.runForeach { item =>
println(s"取得アイテム: ${item.compactPrint}")
}
フレームワーク統合と実用例
// Play Frameworkとの統合
import play.api.libs.json._
case class ApiResponse[T](data: T, status: String, message: String)
case class User(id: Long, name: String, email: String)
object JsonImplicits {
implicit val userFormat: Format[User] = Json.format[User]
implicit def apiResponseFormat[T: Format]: Format[ApiResponse[T]] = Json.format[ApiResponse[T]]
}
class UserApiClient(baseUrl: String, authToken: String)(implicit system: ActorSystem[_]) {
private implicit val ec: ExecutionContext = system.executionContext
private def request(path: String, method: HttpMethod = HttpMethods.GET, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
val uri = s"$baseUrl$path"
val req = HttpRequest(method, uri, entity = entity)
.withHeaders(headers.Authorization(headers.OAuth2BearerToken(authToken)))
Http().singleRequest(req)
}
def getUser(id: Long): Future[Option[User]] = {
request(s"/users/$id").flatMap { response =>
if (response.status == StatusCodes.OK) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => Some(apiResponse.data)
case JsError(_) => None
}
}
} else if (response.status == StatusCodes.NotFound) {
response.discardEntityBytes()
Future.successful(None)
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"API Error: ${response.status}"))
}
}
}
def createUser(user: User): Future[User] = {
val json = Json.toJson(user).toString()
val entity = HttpEntity(ContentTypes.`application/json`, json)
request("/users", HttpMethods.POST, entity).flatMap { response =>
if (response.status == StatusCodes.Created) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => apiResponse.data
case JsError(errors) => throw new RuntimeException(s"JSON Parse Error: $errors")
}
}
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"User creation failed: ${response.status}"))
}
}
}
def updateUser(id: Long, user: User): Future[User] = {
val json = Json.toJson(user.copy(id = id)).toString()
val entity = HttpEntity(ContentTypes.`application/json`, json)
request(s"/users/$id", HttpMethods.PUT, entity).flatMap { response =>
if (response.status == StatusCodes.OK) {
Unmarshal(response.entity).to[String].map { body =>
Json.parse(body).validate[ApiResponse[User]] match {
case JsSuccess(apiResponse, _) => apiResponse.data
case JsError(errors) => throw new RuntimeException(s"JSON Parse Error: $errors")
}
}
} else {
response.discardEntityBytes()
Future.failed(new RuntimeException(s"User update failed: ${response.status}"))
}
}
}
def deleteUser(id: Long): Future[Boolean] = {
request(s"/users/$id", HttpMethods.DELETE).map { response =>
val success = response.status == StatusCodes.NoContent
response.discardEntityBytes()
success
}
}
}
// 使用例
val apiClient = new UserApiClient("https://api.example.com/v1", "your-jwt-token")
for {
// ユーザー作成
newUser <- apiClient.createUser(User(0, "田中太郎", "[email protected]"))
println(s"作成されたユーザー: $newUser")
// ユーザー取得
retrievedUser <- apiClient.getUser(newUser.id)
println(s"取得されたユーザー: $retrievedUser")
// ユーザー更新
updatedUser <- apiClient.updateUser(newUser.id, newUser.copy(name = "田中次郎"))
println(s"更新されたユーザー: $updatedUser")
// ユーザー削除
deleted <- apiClient.deleteUser(newUser.id)
println(s"削除成功: $deleted")
} yield ()
// アクター統合例
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
object HttpClientActor {
sealed trait Command
case class FetchUser(id: Long, replyTo: ActorRef[UserResponse]) extends Command
case class CreateUser(user: User, replyTo: ActorRef[UserResponse]) extends Command
sealed trait UserResponse
case class UserFound(user: User) extends UserResponse
case class UserCreated(user: User) extends UserResponse
case class UserNotFound(id: Long) extends UserResponse
case class ApiError(message: String) extends UserResponse
def apply(apiClient: UserApiClient): Behavior[Command] = Behaviors.setup { context =>
implicit val ec: ExecutionContext = context.executionContext
Behaviors.receiveMessage {
case FetchUser(id, replyTo) =>
context.pipeToSelf(apiClient.getUser(id)) {
case Success(Some(user)) => UserFound(user)
case Success(None) => UserNotFound(id)
case Failure(ex) => ApiError(ex.getMessage)
}
Behaviors.receiveMessage {
case response: UserResponse =>
replyTo ! response
Behaviors.same
case _ => Behaviors.unhandled
}
case CreateUser(user, replyTo) =>
context.pipeToSelf(apiClient.createUser(user)) {
case Success(createdUser) => UserCreated(createdUser)
case Failure(ex) => ApiError(ex.getMessage)
}
Behaviors.receiveMessage {
case response: UserResponse =>
replyTo ! response
Behaviors.same
case _ => Behaviors.unhandled
}
}
}
}