aiohttp
Python向けの非同期HTTPクライアント・サーバーライブラリ。asyncio基盤で高性能な非同期通信を実現。HTTPクライアント機能に加えて、Webサーバー、WebSocket、ミドルウェアサポートも提供する包括的な非同期Webフレームワーク。
GitHub概要
aio-libs/aiohttp
Asynchronous HTTP client/server framework for asyncio and Python
スター16,048
ウォッチ216
フォーク2,137
作成日:2013年10月1日
言語:Python
ライセンス:Other
トピックス
aiohttpasyncasynciohacktoberfesthttphttp-clienthttp-serverpython
スター履歴
データ取得日時: 2025/10/22 04:10
ライブラリ
aiohttp
概要
aoihttpは「Python向けの非同期HTTPクライアント・サーバーライブラリ」として開発された、asyncio ベースの包括的Webフレームワークです。HTTPクライアント機能に加えて、HTTPサーバー、WebSocket、ミドルウェア、ルーティングを統合提供。Python 3.8以降での非同期プログラミングに最適化され、async/await構文を完全サポート。高性能なI/O処理とスケーラブルなWebアプリケーション開発を実現し、FastAPIやStarletteと並ぶPython非同期Webエコシステムの主要コンポーネントとして確立されています。
詳細
aiohttp 2025年版はPython非同期プログラミングの成熟と共に進化を続け、エンタープライズレベルの非同期Webアプリケーション開発で広く採用されています。単なるHTTPクライアントを超えて、Webサーバー、WebSocket、ストリーミング、ミドルウェアシステムを統合した包括的なソリューション。asyncioとの深い統合により、数千の同時接続を効率的に処理し、マイクロサービスアーキテクチャやリアルタイムアプリケーション開発に威力を発揮。豊富なエコシステムと拡張性により、小規模プロトタイプから大規模本番環境まで対応します。
主な特徴
- クライアント・サーバー統合: HTTPクライアントとサーバーの両機能を提供
- WebSocket完全対応: リアルタイム通信とストリーミング処理
- 非同期ファーストデザイン: asyncio完全統合による高性能I/O処理
- ミドルウェアシステム: 柔軟なリクエスト処理パイプライン
- セッション管理: 高度なクッキー・認証・接続プーリング
- ストリーミング処理: 大容量データの効率的な処理機能
メリット・デメリット
メリット
- HTTPクライアントとサーバー機能の統合による一貫した開発体験
- 非同期処理による高いスループットと同時接続処理能力
- WebSocketサポートによるリアルタイムアプリケーション開発対応
- 豊富なミドルウェアエコシステムと拡張機能
- 成熟したセッション管理と認証システム
- microservices アーキテクチャとの優れた親和性
デメリット
- 非同期プログラミングの理解が必要で学習コストが高い
- requests のようなシンプルな同期APIと比較して複雑
- デバッグとエラーハンドリングが同期処理より困難
- 小規模な単発リクエストではオーバーヘッドが大きい
- Python 3.8以降が必須で古いバージョン対応不可
- 初期設定とセットアップがrequestsより複雑
参考ページ
書き方の例
インストールと基本セットアップ
# aiohttpのインストール
pip install aiohttp
# セキュリティ強化版(推奨)
pip install aiohttp[speedups]
# 開発環境向け追加ツール
pip install aiohttp[speedups,dev]
# 依存関係確認
python -c "import aiohttp; print(aiohttp.__version__)"
基本的なリクエスト(GET/POST/PUT/DELETE)
import asyncio
import aiohttp
# 基本的なGETリクエスト
async def basic_get_request():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/users') as response:
print(f"ステータス: {response.status}")
print(f"ヘッダー: {response.headers}")
# JSON レスポンスの取得
data = await response.json()
print(f"ユーザーデータ: {data}")
# テキストレスポンスの取得
# text = await response.text()
# print(f"テキスト: {text}")
# パラメータ付きGETリクエスト
async def get_with_params():
async with aiohttp.ClientSession() as session:
params = {
'page': 1,
'limit': 10,
'sort': 'created_at',
'filter': 'active'
}
async with session.get('https://api.example.com/users', params=params) as response:
if response.status == 200:
data = await response.json()
print(f"取得データ: {len(data)}件")
return data
else:
print(f"エラー: {response.status}")
return None
# POSTリクエスト(JSON送信)
async def post_json_request():
user_data = {
'name': '田中太郎',
'email': '[email protected]',
'age': 30,
'department': '開発部'
}
headers = {
'Authorization': 'Bearer your-jwt-token',
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.post(
'https://api.example.com/users',
json=user_data,
headers=headers
) as response:
if response.status == 201:
created_user = await response.json()
print(f"ユーザー作成完了: ID={created_user['id']}")
return created_user
else:
error_text = await response.text()
print(f"作成失敗: {response.status} - {error_text}")
return None
# POSTリクエスト(フォームデータ送信)
async def post_form_request():
form_data = aiohttp.FormData()
form_data.add_field('username', 'testuser')
form_data.add_field('password', 'secret123')
form_data.add_field('remember_me', 'true')
async with aiohttp.ClientSession() as session:
async with session.post('https://api.example.com/login', data=form_data) as response:
if response.status == 200:
result = await response.json()
print(f"ログイン成功: {result}")
return result
else:
print(f"ログイン失敗: {response.status}")
return None
# PUTリクエスト(データ更新)
async def put_request():
update_data = {
'name': '田中次郎',
'email': '[email protected]',
'age': 31
}
headers = {'Authorization': 'Bearer your-jwt-token'}
async with aiohttp.ClientSession() as session:
async with session.put(
'https://api.example.com/users/123',
json=update_data,
headers=headers
) as response:
if response.status == 200:
updated_user = await response.json()
print(f"ユーザー更新完了: {updated_user}")
return updated_user
else:
error_text = await response.text()
print(f"更新失敗: {response.status} - {error_text}")
return None
# DELETEリクエスト
async def delete_request():
headers = {'Authorization': 'Bearer your-jwt-token'}
async with aiohttp.ClientSession() as session:
async with session.delete('https://api.example.com/users/123', headers=headers) as response:
if response.status == 204:
print("ユーザー削除完了")
return True
else:
error_text = await response.text()
print(f"削除失敗: {response.status} - {error_text}")
return False
# レスポンス詳細情報の取得
async def detailed_response_info():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/info') as response:
print(f"ステータスコード: {response.status}")
print(f"リーズン: {response.reason}")
print(f"HTTPバージョン: {response.version}")
print(f"URL: {response.url}")
print(f"履歴: {response.history}")
print(f"ヘッダー: {dict(response.headers)}")
print(f"Content-Type: {response.content_type}")
print(f"Charset: {response.charset}")
# 実行例
async def main():
await basic_get_request()
await get_with_params()
await post_json_request()
await post_form_request()
await put_request()
await delete_request()
await detailed_response_info()
# asyncio.run(main())
高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)
import aiohttp
import asyncio
import ssl
# カスタムヘッダーとセッション設定
async def custom_session_example():
headers = {
'User-Agent': 'MyApp/1.0 (aiohttp Python)',
'Accept': 'application/json',
'Accept-Language': 'ja-JP,en-US;q=0.9',
'X-API-Version': 'v2',
'X-Request-ID': 'req-12345'
}
timeout = aiohttp.ClientTimeout(total=30, connect=10)
async with aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
) as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return data
# Basic認証の設定
async def basic_auth_example():
auth = aiohttp.BasicAuth('username', 'password')
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/private', auth=auth) as response:
if response.status == 200:
data = await response.json()
print(f"認証成功: {data}")
return data
else:
print(f"認証失敗: {response.status}")
return None
# タイムアウト詳細設定
async def timeout_configuration():
# 詳細なタイムアウト設定
timeout = aiohttp.ClientTimeout(
total=30, # 全体タイムアウト(30秒)
connect=5, # 接続タイムアウト(5秒)
sock_read=10, # ソケット読み込みタイムアウト(10秒)
sock_connect=5 # ソケット接続タイムアウト(5秒)
)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get('https://api.example.com/slow-endpoint') as response:
data = await response.json()
return data
except asyncio.TimeoutError:
print("リクエストがタイムアウトしました")
return None
# SSL証明書とHTTPS設定
async def ssl_configuration():
# SSL証明書検証の無効化(開発時のみ)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://self-signed.example.com/api') as response:
data = await response.json()
return data
# クライアント証明書認証
async def client_certificate_auth():
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.crt', '/path/to/client.key')
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://secure-api.example.com/data') as response:
data = await response.json()
return data
# プロキシ設定
async def proxy_configuration():
# HTTPプロキシ設定
proxy = 'http://proxy.example.com:8080'
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data', proxy=proxy) as response:
data = await response.json()
return data
# 認証付きプロキシ
async def authenticated_proxy():
proxy = 'http://user:[email protected]:8080'
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data', proxy=proxy) as response:
data = await response.json()
return data
# Cookie管理
async def cookie_management():
# Cookie Jarでセッション管理
jar = aiohttp.CookieJar(unsafe=True) # unsafe=Trueで非HTTPSクッキーも受け入れ
async with aiohttp.ClientSession(cookie_jar=jar) as session:
# ログインしてセッションCookieを取得
login_data = {'username': 'user', 'password': 'pass'}
async with session.post('https://api.example.com/login', json=login_data) as response:
if response.status == 200:
print("ログイン成功、Cookieが保存されました")
# 後続のリクエストでCookieが自動的に送信される
async with session.get('https://api.example.com/protected') as response:
data = await response.json()
return data
# カスタムコネクタ設定
async def custom_connector():
connector = aiohttp.TCPConnector(
limit=100, # 総接続数制限
limit_per_host=30, # ホスト毎接続数制限
ttl_dns_cache=300, # DNS キャッシュTTL(秒)
use_dns_cache=True, # DNS キャッシュ使用
keepalive_timeout=30, # Keep-Alive タイムアウト
enable_cleanup_closed=True # 閉じた接続のクリーンアップ
)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return data
# リトライ機能付きリクエスト
async def request_with_retry(url, max_retries=3, backoff_factor=1):
async with aiohttp.ClientSession() as session:
for attempt in range(max_retries + 1):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
elif response.status in [500, 502, 503, 504]:
if attempt < max_retries:
wait_time = backoff_factor * (2 ** attempt)
print(f"サーバーエラー {response.status}. {wait_time}秒後にリトライ...")
await asyncio.sleep(wait_time)
continue
# リトライしないエラー
response.raise_for_status()
except aiohttp.ClientError as e:
if attempt < max_retries:
wait_time = backoff_factor * (2 ** attempt)
print(f"接続エラー: {e}. {wait_time}秒後にリトライ...")
await asyncio.sleep(wait_time)
else:
raise
return None
エラーハンドリングとリトライ機能
import aiohttp
import asyncio
import logging
from typing import Optional, Dict, Any
# 包括的なエラーハンドリング
async def comprehensive_error_handling():
async with aiohttp.ClientSession() as session:
try:
async with session.get('https://api.example.com/users', timeout=10) as response:
# HTTPステータスエラーを明示的にチェック
response.raise_for_status()
data = await response.json()
return data
except aiohttp.ClientConnectionError as e:
print(f"接続エラー: {e}")
print("ネットワーク接続やDNS設定を確認してください")
except aiohttp.ClientTimeout as e:
print(f"タイムアウトエラー: {e}")
print("リクエストがタイムアウトしました")
except aiohttp.ClientResponseError as e:
print(f"HTTPレスポンスエラー: {e}")
print(f"ステータスコード: {e.status}")
print(f"レスポンステキスト: {await e.response.text()}")
except aiohttp.ClientPayloadError as e:
print(f"ペイロードエラー: {e}")
print("レスポンスの読み込み中にエラーが発生しました")
except aiohttp.ClientError as e:
print(f"クライアントエラー: {e}")
print("予期しないクライアントエラーが発生しました")
except asyncio.TimeoutError:
print("非同期処理がタイムアウトしました")
except Exception as e:
print(f"予期しないエラー: {e}")
return None
# 高度なリトライクラス
class RetryableSession:
def __init__(self, max_retries=3, backoff_factor=1.0, retry_statuses=None):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_statuses = retry_statuses or [500, 502, 503, 504, 429]
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def request_with_retry(self, method: str, url: str, **kwargs) -> Optional[aiohttp.ClientResponse]:
for attempt in range(self.max_retries + 1):
try:
async with self.session.request(method, url, **kwargs) as response:
if response.status not in self.retry_statuses:
return response
if attempt < self.max_retries:
wait_time = self.backoff_factor * (2 ** attempt)
print(f"ステータス {response.status} でリトライ {attempt + 1}/{self.max_retries}. {wait_time}秒待機...")
await asyncio.sleep(wait_time)
continue
else:
return response
except (aiohttp.ClientConnectionError, aiohttp.ClientTimeout) as e:
if attempt < self.max_retries:
wait_time = self.backoff_factor * (2 ** attempt)
print(f"接続エラーでリトライ {attempt + 1}/{self.max_retries}: {e}. {wait_time}秒待機...")
await asyncio.sleep(wait_time)
else:
raise
return None
# リトライセッションの使用例
async def retry_session_example():
async with RetryableSession(max_retries=3, backoff_factor=1.5) as retry_session:
response = await retry_session.request_with_retry('GET', 'https://api.example.com/unstable')
if response:
data = await response.json()
return data
return None
# エラー別の詳細処理
async def detailed_error_processing():
async with aiohttp.ClientSession() as session:
try:
async with session.get('https://api.example.com/data') as response:
if response.status == 200:
return await response.json()
elif response.status == 401:
print("認証エラー: トークンを確認してください")
return {'error': 'authentication_required'}
elif response.status == 403:
print("権限エラー: アクセス権限がありません")
return {'error': 'permission_denied'}
elif response.status == 404:
print("見つかりません: リソースが存在しません")
return {'error': 'not_found'}
elif response.status == 429:
# Rate limiting の処理
retry_after = response.headers.get('Retry-After')
if retry_after:
print(f"レート制限: {retry_after}秒後に再試行してください")
await asyncio.sleep(int(retry_after))
# 再試行ロジックをここに追加
return {'error': 'rate_limited'}
elif response.status >= 500:
print("サーバーエラー: サーバー側の問題です")
return {'error': 'server_error'}
else:
print(f"予期しないステータス: {response.status}")
return {'error': 'unexpected_status'}
except Exception as e:
print(f"処理中にエラーが発生: {e}")
return {'error': 'processing_failed'}
# レスポンス検証とエラーハンドリング
async def response_validation():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
# Content-Type チェック
content_type = response.headers.get('Content-Type', '')
if 'application/json' not in content_type:
print(f"予期しないContent-Type: {content_type}")
return None
try:
data = await response.json()
# データ構造の検証
if not isinstance(data, dict):
print("レスポンスがJSON オブジェクトではありません")
return None
# 必須フィールドの確認
required_fields = ['id', 'name', 'status']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
print(f"必須フィールドが不足: {missing_fields}")
return None
return data
except aiohttp.ContentTypeError as e:
print(f"JSON パースエラー: {e}")
text_content = await response.text()
print(f"レスポンス内容: {text_content[:200]}...")
return None
# サーキットブレーカーパターンの実装
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
# サーキットブレーカー使用例
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def protected_api_call():
async def api_request():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/unreliable') as response:
return await response.json()
try:
return await circuit_breaker.call(api_request)
except Exception as e:
print(f"サーキットブレーカーによる保護: {e}")
return None
並行処理と非同期リクエスト
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
import time
# 複数URLの並列取得
async def fetch_multiple_urls(urls: List[str]) -> List[Dict[str, Any]]:
results = []
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = asyncio.create_task(fetch_single_url(session, url))
tasks.append(task)
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(completed_tasks):
if isinstance(result, Exception):
results.append({
'url': urls[i],
'success': False,
'error': str(result),
'status_code': None
})
else:
results.append(result)
return results
async def fetch_single_url(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
try:
start_time = time.time()
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
end_time = time.time()
content_length = len(await response.read())
return {
'url': url,
'status_code': response.status,
'success': response.status == 200,
'content_length': content_length,
'response_time': end_time - start_time,
'headers': dict(response.headers)
}
except Exception as e:
return {
'url': url,
'success': False,
'error': str(e),
'status_code': None
}
# セマフォによる同時接続数制御
async def controlled_concurrent_requests(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_semaphore(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
async with semaphore:
return await fetch_single_url(session, url)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_semaphore(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# ページネーション対応の全データ取得
async def fetch_all_paginated_data(base_url: str, headers: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
all_data = []
page = 1
has_more = True
session_headers = headers or {}
async with aiohttp.ClientSession(headers=session_headers) as session:
while has_more:
try:
params = {'page': page, 'per_page': 100}
async with session.get(base_url, params=params) as response:
if response.status == 200:
page_data = await response.json()
if isinstance(page_data, dict) and 'items' in page_data:
items = page_data['items']
if not items:
break
all_data.extend(items)
has_more = page_data.get('has_more', False)
elif isinstance(page_data, list):
if not page_data:
break
all_data.extend(page_data)
has_more = len(page_data) == 100 # Full page suggests more data
else:
break
print(f"ページ {page} 完了: {len(page_data if isinstance(page_data, list) else items)}件")
page += 1
# API負荷軽減
await asyncio.sleep(0.1)
else:
print(f"ページ {page} でエラー: {response.status}")
break
except Exception as e:
print(f"ページ {page} で例外: {e}")
break
print(f"合計 {len(all_data)} 件のデータを取得")
return all_data
# 段階的なデータ処理(レスポンス依存)
async def dependent_requests_pipeline():
async with aiohttp.ClientSession() as session:
# Step 1: ユーザー情報取得
async with session.get('https://api.example.com/user/profile') as response:
if response.status != 200:
return {'error': 'Failed to get user profile'}
user_profile = await response.json()
user_id = user_profile['id']
# Step 2: ユーザーの投稿一覧を並列取得
posts_tasks = []
for category in ['tech', 'personal', 'work']:
task = session.get(f'https://api.example.com/users/{user_id}/posts/{category}')
posts_tasks.append(task)
posts_responses = await asyncio.gather(*posts_tasks)
# Step 3: 各カテゴリの投稿データを処理
all_posts = {}
for i, response in enumerate(posts_responses):
category = ['tech', 'personal', 'work'][i]
async with response:
if response.status == 200:
posts_data = await response.json()
all_posts[category] = posts_data
else:
all_posts[category] = []
return {
'user_profile': user_profile,
'posts': all_posts
}
# WebSocket クライアント機能
async def websocket_client_example():
session = aiohttp.ClientSession()
try:
async with session.ws_connect('wss://api.example.com/websocket') as ws:
print("WebSocket接続が確立されました")
# 初期メッセージ送信
await ws.send_str(json.dumps({
'type': 'subscribe',
'channel': 'updates'
}))
# メッセージ受信ループ
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
print(f"受信: {data}")
# 特定メッセージに対する応答
if data.get('type') == 'ping':
await ws.send_str(json.dumps({'type': 'pong'}))
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocketエラー: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
print("WebSocket接続が閉じられました")
break
except Exception as e:
print(f"WebSocket接続エラー: {e}")
finally:
await session.close()
# ストリーミングダウンロード
async def streaming_download(url: str, file_path: str, chunk_size: int = 8192):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
total_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
with open(file_path, 'wb') as file:
async for chunk in response.content.iter_chunked(chunk_size):
file.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
print(f"\nダウンロード完了: {file_path}")
return True
else:
print(f"ダウンロード失敗: {response.status}")
return False
# バッチ処理によるリクエスト最適化
async def batch_process_requests(items: List[str], batch_size: int = 10):
all_results = []
async with aiohttp.ClientSession() as session:
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
print(f"バッチ {i//batch_size + 1} 処理中: {len(batch)}項目")
# バッチ内の並列処理
tasks = []
for item in batch:
task = fetch_single_url(session, f'https://api.example.com/items/{item}')
tasks.append(task)
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# 結果の処理
for result in batch_results:
if isinstance(result, Exception):
all_results.append({'success': False, 'error': str(result)})
else:
all_results.append(result)
# バッチ間の待機
if i + batch_size < len(items):
await asyncio.sleep(0.5)
return all_results
# 使用例
async def main_parallel_example():
urls = [
'https://api.example.com/users',
'https://api.example.com/posts',
'https://api.example.com/comments',
'https://api.example.com/categories'
]
print("並列リクエスト実行中...")
results = await fetch_multiple_urls(urls)
successful_requests = [r for r in results if r['success']]
print(f"成功: {len(successful_requests)}/{len(urls)}")
print("\n同時接続制御テスト...")
controlled_results = await controlled_concurrent_requests(urls, max_concurrent=2)
print("\nページネーションテスト...")
paginated_data = await fetch_all_paginated_data('https://api.example.com/posts')
print("\n依存リクエストパイプライン...")
pipeline_result = await dependent_requests_pipeline()
# asyncio.run(main_parallel_example())
フレームワーク統合と実用例
import aiohttp
import asyncio
import json
from typing import Optional, Dict, Any, List
from pathlib import Path
import mimetypes
# FastAPI統合用の高度なAPIクライアント
class AsyncAPIClient:
def __init__(self, base_url: str, token: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.token = token
self.session = None
async def __aenter__(self):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'AsyncAPIClient/1.0 (aiohttp)'
}
if self.token:
headers['Authorization'] = f'Bearer {self.token}'
timeout = aiohttp.ClientTimeout(total=30, connect=5)
connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
self.session = aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=connector
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
async def post(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
kwargs = {}
if json_data:
kwargs['json'] = json_data
if data:
kwargs['data'] = data
async with self.session.post(url, **kwargs) as response:
response.raise_for_status()
return await response.json()
async def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.put(url, json=data) as response:
response.raise_for_status()
return await response.json()
async def delete(self, endpoint: str) -> bool:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.delete(url) as response:
response.raise_for_status()
return response.status == 204
# 使用例
async def api_client_example():
async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
# ユーザー一覧取得
users = await client.get('users', params={'page': 1, 'limit': 50})
print(f"取得ユーザー数: {len(users)}")
# 新しいユーザー作成
new_user = await client.post('users', json_data={
'name': '田中太郎',
'email': '[email protected]',
'department': '開発部'
})
print(f"作成されたユーザー: {new_user['id']}")
# ユーザー情報更新
updated_user = await client.put(f'users/{new_user["id"]}', data={
'name': '田中次郎',
'department': '技術部'
})
print(f"更新完了: {updated_user['name']}")
# マルチパートファイルアップロード
async def upload_file_multipart(file_path: Path, upload_url: str, additional_fields: Optional[Dict[str, str]] = None):
"""マルチパート形式でのファイルアップロード"""
# ファイルのMIMEタイプを自動判定
mime_type, _ = mimetypes.guess_type(str(file_path))
if not mime_type:
mime_type = 'application/octet-stream'
data = aiohttp.FormData()
data.add_field('file',
file_path.open('rb'),
filename=file_path.name,
content_type=mime_type)
# 追加フィールドの設定
if additional_fields:
for key, value in additional_fields.items():
data.add_field(key, value)
headers = {'Authorization': 'Bearer your-upload-token'}
async with aiohttp.ClientSession() as session:
try:
async with session.post(upload_url, data=data, headers=headers) as response:
response.raise_for_status()
result = await response.json()
print(f"アップロード成功: {result}")
return result
except aiohttp.ClientError as e:
print(f"アップロードエラー: {e}")
raise
finally:
# ファイルハンドルのクリーンアップ
for field in data._fields:
if hasattr(field[2], 'close'):
field[2].close()
# プログレス付きファイルアップロード
async def upload_with_progress(file_path: Path, upload_url: str):
"""進捗表示付きファイルアップロード"""
file_size = file_path.stat().st_size
uploaded = 0
class ProgressReader:
def __init__(self, file_obj):
self.file_obj = file_obj
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.file_obj.close()
def read(self, size=-1):
nonlocal uploaded
chunk = self.file_obj.read(size)
if chunk:
uploaded += len(chunk)
progress = (uploaded / file_size) * 100
print(f"アップロード進捗: {progress:.1f}%", end='\r')
return chunk
async with ProgressReader(file_path.open('rb')) as progress_file:
data = aiohttp.FormData()
data.add_field('file', progress_file, filename=file_path.name)
async with aiohttp.ClientSession() as session:
async with session.post(upload_url, data=data) as response:
response.raise_for_status()
print(f"\nアップロード完了: {file_path.name}")
return await response.json()
# 認証トークン自動更新機能
class TokenManager:
def __init__(self, client_id: str, client_secret: str, token_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.access_token = None
self.refresh_token = None
self.token_expires_at = None
async def get_valid_token(self) -> str:
if self.access_token and self.token_expires_at:
import time
if time.time() < self.token_expires_at:
return self.access_token
await self._refresh_token()
return self.access_token
async def _refresh_token(self):
auth_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'client_credentials'
}
async with aiohttp.ClientSession() as session:
async with session.post(self.token_url, data=auth_data) as response:
response.raise_for_status()
token_data = await response.json()
self.access_token = token_data['access_token']
self.refresh_token = token_data.get('refresh_token')
import time
expires_in = token_data.get('expires_in', 3600)
self.token_expires_at = time.time() + expires_in - 60 # 60秒のバッファ
# 認証付きHTTPクライアント
class AuthenticatedClient:
def __init__(self, base_url: str, token_manager: TokenManager):
self.base_url = base_url.rstrip('/')
self.token_manager = token_manager
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def _make_request(self, method: str, endpoint: str, **kwargs):
token = await self.token_manager.get_valid_token()
headers = kwargs.pop('headers', {})
headers['Authorization'] = f'Bearer {token}'
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.request(method, url, headers=headers, **kwargs) as response:
if response.status == 401:
# トークンを強制更新して再試行
await self.token_manager._refresh_token()
token = await self.token_manager.get_valid_token()
headers['Authorization'] = f'Bearer {token}'
async with self.session.request(method, url, headers=headers, **kwargs) as retry_response:
retry_response.raise_for_status()
return await retry_response.json()
else:
response.raise_for_status()
return await response.json()
# サーバーサイドイベント(SSE)クライアント
async def sse_client(url: str):
"""Server-Sent Events クライアント"""
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={'Accept': 'text/event-stream'}) as response:
if response.status == 200:
print("SSE接続が確立されました")
async for line in response.content:
line = line.decode('utf-8').strip()
if line.startswith('data: '):
event_data = line[6:] # 'data: ' プレフィックスを削除
try:
data = json.loads(event_data)
print(f"イベント受信: {data}")
# イベントタイプ別の処理
event_type = data.get('type')
if event_type == 'user_update':
await handle_user_update(data)
elif event_type == 'system_notification':
await handle_system_notification(data)
except json.JSONDecodeError:
print(f"テキストイベント: {event_data}")
elif line.startswith('event: '):
event_type = line[7:]
print(f"イベントタイプ: {event_type}")
else:
print(f"SSE接続失敗: {response.status}")
async def handle_user_update(data):
print(f"ユーザー更新: {data}")
async def handle_system_notification(data):
print(f"システム通知: {data}")
# FastAPI サーバーとの統合例
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
# バックグラウンドタスクでHTTPリクエスト実行
async def background_api_call(user_id: int):
async with AsyncAPIClient('https://external-api.example.com') as client:
user_data = await client.get(f'users/{user_id}')
# データ処理ロジック
processed_data = process_user_data(user_data)
# 結果を別のAPIに送信
await client.post('analytics/user-activity', json_data=processed_data)
@app.post("/trigger-analysis/{user_id}")
async def trigger_user_analysis(user_id: int, background_tasks: BackgroundTasks):
background_tasks.add_task(background_api_call, user_id)
return {"message": "Analysis started in background"}
def process_user_data(user_data):
# データ処理ロジック
return {
"user_id": user_data["id"],
"activity_score": calculate_activity_score(user_data),
"processed_at": datetime.now().isoformat()
}
def calculate_activity_score(user_data):
# アクティビティスコア計算
return user_data.get("login_count", 0) * 10
# 使用例
async def integration_examples():
# API クライアント例
await api_client_example()
# ファイルアップロード例
file_path = Path("/path/to/document.pdf")
if file_path.exists():
await upload_file_multipart(
file_path,
'https://api.example.com/upload',
additional_fields={'category': 'documents', 'public': 'false'}
)
# SSE クライアント例
# await sse_client('https://api.example.com/events')
# asyncio.run(integration_examples())