aiohttp

Python向けの非同期HTTPクライアント・サーバーライブラリ。asyncio基盤で高性能な非同期通信を実現。HTTPクライアント機能に加えて、Webサーバー、WebSocket、ミドルウェアサポートも提供する包括的な非同期Webフレームワーク。

HTTPクライアントPython非同期サーバーWebSocket高性能

GitHub概要

aio-libs/aiohttp

Asynchronous HTTP client/server framework for asyncio and Python

スター16,048
ウォッチ216
フォーク2,137
作成日:2013年10月1日
言語:Python
ライセンス:Other

トピックス

aiohttpasyncasynciohacktoberfesthttphttp-clienthttp-serverpython

スター履歴

aio-libs/aiohttp Star History
データ取得日時: 2025/10/22 04:10

ライブラリ

aiohttp

概要

aoihttpは「Python向けの非同期HTTPクライアント・サーバーライブラリ」として開発された、asyncio ベースの包括的Webフレームワークです。HTTPクライアント機能に加えて、HTTPサーバー、WebSocket、ミドルウェア、ルーティングを統合提供。Python 3.8以降での非同期プログラミングに最適化され、async/await構文を完全サポート。高性能なI/O処理とスケーラブルなWebアプリケーション開発を実現し、FastAPIやStarletteと並ぶPython非同期Webエコシステムの主要コンポーネントとして確立されています。

詳細

aiohttp 2025年版はPython非同期プログラミングの成熟と共に進化を続け、エンタープライズレベルの非同期Webアプリケーション開発で広く採用されています。単なるHTTPクライアントを超えて、Webサーバー、WebSocket、ストリーミング、ミドルウェアシステムを統合した包括的なソリューション。asyncioとの深い統合により、数千の同時接続を効率的に処理し、マイクロサービスアーキテクチャやリアルタイムアプリケーション開発に威力を発揮。豊富なエコシステムと拡張性により、小規模プロトタイプから大規模本番環境まで対応します。

主な特徴

  • クライアント・サーバー統合: HTTPクライアントとサーバーの両機能を提供
  • WebSocket完全対応: リアルタイム通信とストリーミング処理
  • 非同期ファーストデザイン: asyncio完全統合による高性能I/O処理
  • ミドルウェアシステム: 柔軟なリクエスト処理パイプライン
  • セッション管理: 高度なクッキー・認証・接続プーリング
  • ストリーミング処理: 大容量データの効率的な処理機能

メリット・デメリット

メリット

  • HTTPクライアントとサーバー機能の統合による一貫した開発体験
  • 非同期処理による高いスループットと同時接続処理能力
  • WebSocketサポートによるリアルタイムアプリケーション開発対応
  • 豊富なミドルウェアエコシステムと拡張機能
  • 成熟したセッション管理と認証システム
  • microservices アーキテクチャとの優れた親和性

デメリット

  • 非同期プログラミングの理解が必要で学習コストが高い
  • requests のようなシンプルな同期APIと比較して複雑
  • デバッグとエラーハンドリングが同期処理より困難
  • 小規模な単発リクエストではオーバーヘッドが大きい
  • Python 3.8以降が必須で古いバージョン対応不可
  • 初期設定とセットアップがrequestsより複雑

参考ページ

書き方の例

インストールと基本セットアップ

# aiohttpのインストール
pip install aiohttp

# セキュリティ強化版(推奨)
pip install aiohttp[speedups]

# 開発環境向け追加ツール
pip install aiohttp[speedups,dev]

# 依存関係確認
python -c "import aiohttp; print(aiohttp.__version__)"

基本的なリクエスト(GET/POST/PUT/DELETE)

import asyncio
import aiohttp

# 基本的なGETリクエスト
async def basic_get_request():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/users') as response:
            print(f"ステータス: {response.status}")
            print(f"ヘッダー: {response.headers}")
            
            # JSON レスポンスの取得
            data = await response.json()
            print(f"ユーザーデータ: {data}")
            
            # テキストレスポンスの取得
            # text = await response.text()
            # print(f"テキスト: {text}")

# パラメータ付きGETリクエスト
async def get_with_params():
    async with aiohttp.ClientSession() as session:
        params = {
            'page': 1,
            'limit': 10,
            'sort': 'created_at',
            'filter': 'active'
        }
        
        async with session.get('https://api.example.com/users', params=params) as response:
            if response.status == 200:
                data = await response.json()
                print(f"取得データ: {len(data)}件")
                return data
            else:
                print(f"エラー: {response.status}")
                return None

# POSTリクエスト(JSON送信)
async def post_json_request():
    user_data = {
        'name': '田中太郎',
        'email': '[email protected]',
        'age': 30,
        'department': '開発部'
    }
    
    headers = {
        'Authorization': 'Bearer your-jwt-token',
        'Content-Type': 'application/json'
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.example.com/users',
            json=user_data,
            headers=headers
        ) as response:
            if response.status == 201:
                created_user = await response.json()
                print(f"ユーザー作成完了: ID={created_user['id']}")
                return created_user
            else:
                error_text = await response.text()
                print(f"作成失敗: {response.status} - {error_text}")
                return None

# POSTリクエスト(フォームデータ送信)
async def post_form_request():
    form_data = aiohttp.FormData()
    form_data.add_field('username', 'testuser')
    form_data.add_field('password', 'secret123')
    form_data.add_field('remember_me', 'true')
    
    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/login', data=form_data) as response:
            if response.status == 200:
                result = await response.json()
                print(f"ログイン成功: {result}")
                return result
            else:
                print(f"ログイン失敗: {response.status}")
                return None

# PUTリクエスト(データ更新)
async def put_request():
    update_data = {
        'name': '田中次郎',
        'email': '[email protected]',
        'age': 31
    }
    
    headers = {'Authorization': 'Bearer your-jwt-token'}
    
    async with aiohttp.ClientSession() as session:
        async with session.put(
            'https://api.example.com/users/123',
            json=update_data,
            headers=headers
        ) as response:
            if response.status == 200:
                updated_user = await response.json()
                print(f"ユーザー更新完了: {updated_user}")
                return updated_user
            else:
                error_text = await response.text()
                print(f"更新失敗: {response.status} - {error_text}")
                return None

# DELETEリクエスト
async def delete_request():
    headers = {'Authorization': 'Bearer your-jwt-token'}
    
    async with aiohttp.ClientSession() as session:
        async with session.delete('https://api.example.com/users/123', headers=headers) as response:
            if response.status == 204:
                print("ユーザー削除完了")
                return True
            else:
                error_text = await response.text()
                print(f"削除失敗: {response.status} - {error_text}")
                return False

# レスポンス詳細情報の取得
async def detailed_response_info():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/info') as response:
            print(f"ステータスコード: {response.status}")
            print(f"リーズン: {response.reason}")
            print(f"HTTPバージョン: {response.version}")
            print(f"URL: {response.url}")
            print(f"履歴: {response.history}")
            print(f"ヘッダー: {dict(response.headers)}")
            print(f"Content-Type: {response.content_type}")
            print(f"Charset: {response.charset}")

# 実行例
async def main():
    await basic_get_request()
    await get_with_params()
    await post_json_request()
    await post_form_request()
    await put_request()
    await delete_request()
    await detailed_response_info()

# asyncio.run(main())

高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)

import aiohttp
import asyncio
import ssl

# カスタムヘッダーとセッション設定
async def custom_session_example():
    headers = {
        'User-Agent': 'MyApp/1.0 (aiohttp Python)',
        'Accept': 'application/json',
        'Accept-Language': 'ja-JP,en-US;q=0.9',
        'X-API-Version': 'v2',
        'X-Request-ID': 'req-12345'
    }
    
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    
    async with aiohttp.ClientSession(
        headers=headers,
        timeout=timeout,
        connector=aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
    ) as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
            return data

# Basic認証の設定
async def basic_auth_example():
    auth = aiohttp.BasicAuth('username', 'password')
    
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/private', auth=auth) as response:
            if response.status == 200:
                data = await response.json()
                print(f"認証成功: {data}")
                return data
            else:
                print(f"認証失敗: {response.status}")
                return None

# タイムアウト詳細設定
async def timeout_configuration():
    # 詳細なタイムアウト設定
    timeout = aiohttp.ClientTimeout(
        total=30,      # 全体タイムアウト(30秒)
        connect=5,     # 接続タイムアウト(5秒)
        sock_read=10,  # ソケット読み込みタイムアウト(10秒)
        sock_connect=5 # ソケット接続タイムアウト(5秒)
    )
    
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://api.example.com/slow-endpoint') as response:
                data = await response.json()
                return data
        except asyncio.TimeoutError:
            print("リクエストがタイムアウトしました")
            return None

# SSL証明書とHTTPS設定
async def ssl_configuration():
    # SSL証明書検証の無効化(開発時のみ)
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://self-signed.example.com/api') as response:
            data = await response.json()
            return data

# クライアント証明書認証
async def client_certificate_auth():
    ssl_context = ssl.create_default_context()
    ssl_context.load_cert_chain('/path/to/client.crt', '/path/to/client.key')
    
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://secure-api.example.com/data') as response:
            data = await response.json()
            return data

# プロキシ設定
async def proxy_configuration():
    # HTTPプロキシ設定
    proxy = 'http://proxy.example.com:8080'
    
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data', proxy=proxy) as response:
            data = await response.json()
            return data

# 認証付きプロキシ
async def authenticated_proxy():
    proxy = 'http://user:[email protected]:8080'
    
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data', proxy=proxy) as response:
            data = await response.json()
            return data

# Cookie管理
async def cookie_management():
    # Cookie Jarでセッション管理
    jar = aiohttp.CookieJar(unsafe=True)  # unsafe=Trueで非HTTPSクッキーも受け入れ
    
    async with aiohttp.ClientSession(cookie_jar=jar) as session:
        # ログインしてセッションCookieを取得
        login_data = {'username': 'user', 'password': 'pass'}
        async with session.post('https://api.example.com/login', json=login_data) as response:
            if response.status == 200:
                print("ログイン成功、Cookieが保存されました")
        
        # 後続のリクエストでCookieが自動的に送信される
        async with session.get('https://api.example.com/protected') as response:
            data = await response.json()
            return data

# カスタムコネクタ設定
async def custom_connector():
    connector = aiohttp.TCPConnector(
        limit=100,              # 総接続数制限
        limit_per_host=30,      # ホスト毎接続数制限
        ttl_dns_cache=300,      # DNS キャッシュTTL(秒)
        use_dns_cache=True,     # DNS キャッシュ使用
        keepalive_timeout=30,   # Keep-Alive タイムアウト
        enable_cleanup_closed=True  # 閉じた接続のクリーンアップ
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
            return data

# リトライ機能付きリクエスト
async def request_with_retry(url, max_retries=3, backoff_factor=1):
    async with aiohttp.ClientSession() as session:
        for attempt in range(max_retries + 1):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    elif response.status in [500, 502, 503, 504]:
                        if attempt < max_retries:
                            wait_time = backoff_factor * (2 ** attempt)
                            print(f"サーバーエラー {response.status}. {wait_time}秒後にリトライ...")
                            await asyncio.sleep(wait_time)
                            continue
                    
                    # リトライしないエラー
                    response.raise_for_status()
                    
            except aiohttp.ClientError as e:
                if attempt < max_retries:
                    wait_time = backoff_factor * (2 ** attempt)
                    print(f"接続エラー: {e}. {wait_time}秒後にリトライ...")
                    await asyncio.sleep(wait_time)
                else:
                    raise
        
        return None

エラーハンドリングとリトライ機能

import aiohttp
import asyncio
import logging
from typing import Optional, Dict, Any

# 包括的なエラーハンドリング
async def comprehensive_error_handling():
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get('https://api.example.com/users', timeout=10) as response:
                # HTTPステータスエラーを明示的にチェック
                response.raise_for_status()
                
                data = await response.json()
                return data
                
        except aiohttp.ClientConnectionError as e:
            print(f"接続エラー: {e}")
            print("ネットワーク接続やDNS設定を確認してください")
        except aiohttp.ClientTimeout as e:
            print(f"タイムアウトエラー: {e}")
            print("リクエストがタイムアウトしました")
        except aiohttp.ClientResponseError as e:
            print(f"HTTPレスポンスエラー: {e}")
            print(f"ステータスコード: {e.status}")
            print(f"レスポンステキスト: {await e.response.text()}")
        except aiohttp.ClientPayloadError as e:
            print(f"ペイロードエラー: {e}")
            print("レスポンスの読み込み中にエラーが発生しました")
        except aiohttp.ClientError as e:
            print(f"クライアントエラー: {e}")
            print("予期しないクライアントエラーが発生しました")
        except asyncio.TimeoutError:
            print("非同期処理がタイムアウトしました")
        except Exception as e:
            print(f"予期しないエラー: {e}")
        
        return None

# 高度なリトライクラス
class RetryableSession:
    def __init__(self, max_retries=3, backoff_factor=1.0, retry_statuses=None):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.retry_statuses = retry_statuses or [500, 502, 503, 504, 429]
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def request_with_retry(self, method: str, url: str, **kwargs) -> Optional[aiohttp.ClientResponse]:
        for attempt in range(self.max_retries + 1):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    if response.status not in self.retry_statuses:
                        return response
                    
                    if attempt < self.max_retries:
                        wait_time = self.backoff_factor * (2 ** attempt)
                        print(f"ステータス {response.status} でリトライ {attempt + 1}/{self.max_retries}. {wait_time}秒待機...")
                        await asyncio.sleep(wait_time)
                        continue
                    else:
                        return response
                        
            except (aiohttp.ClientConnectionError, aiohttp.ClientTimeout) as e:
                if attempt < self.max_retries:
                    wait_time = self.backoff_factor * (2 ** attempt)
                    print(f"接続エラーでリトライ {attempt + 1}/{self.max_retries}: {e}. {wait_time}秒待機...")
                    await asyncio.sleep(wait_time)
                else:
                    raise
        
        return None

# リトライセッションの使用例
async def retry_session_example():
    async with RetryableSession(max_retries=3, backoff_factor=1.5) as retry_session:
        response = await retry_session.request_with_retry('GET', 'https://api.example.com/unstable')
        if response:
            data = await response.json()
            return data
        return None

# エラー別の詳細処理
async def detailed_error_processing():
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get('https://api.example.com/data') as response:
                if response.status == 200:
                    return await response.json()
                elif response.status == 401:
                    print("認証エラー: トークンを確認してください")
                    return {'error': 'authentication_required'}
                elif response.status == 403:
                    print("権限エラー: アクセス権限がありません")
                    return {'error': 'permission_denied'}
                elif response.status == 404:
                    print("見つかりません: リソースが存在しません")
                    return {'error': 'not_found'}
                elif response.status == 429:
                    # Rate limiting の処理
                    retry_after = response.headers.get('Retry-After')
                    if retry_after:
                        print(f"レート制限: {retry_after}秒後に再試行してください")
                        await asyncio.sleep(int(retry_after))
                        # 再試行ロジックをここに追加
                    return {'error': 'rate_limited'}
                elif response.status >= 500:
                    print("サーバーエラー: サーバー側の問題です")
                    return {'error': 'server_error'}
                else:
                    print(f"予期しないステータス: {response.status}")
                    return {'error': 'unexpected_status'}
                    
        except Exception as e:
            print(f"処理中にエラーが発生: {e}")
            return {'error': 'processing_failed'}

# レスポンス検証とエラーハンドリング
async def response_validation():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            # Content-Type チェック
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' not in content_type:
                print(f"予期しないContent-Type: {content_type}")
                return None
            
            try:
                data = await response.json()
                
                # データ構造の検証
                if not isinstance(data, dict):
                    print("レスポンスがJSON オブジェクトではありません")
                    return None
                
                # 必須フィールドの確認
                required_fields = ['id', 'name', 'status']
                missing_fields = [field for field in required_fields if field not in data]
                if missing_fields:
                    print(f"必須フィールドが不足: {missing_fields}")
                    return None
                
                return data
                
            except aiohttp.ContentTypeError as e:
                print(f"JSON パースエラー: {e}")
                text_content = await response.text()
                print(f"レスポンス内容: {text_content[:200]}...")
                return None

# サーキットブレーカーパターンの実装
class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise
    
    def on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

# サーキットブレーカー使用例
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

async def protected_api_call():
    async def api_request():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com/unreliable') as response:
                return await response.json()
    
    try:
        return await circuit_breaker.call(api_request)
    except Exception as e:
        print(f"サーキットブレーカーによる保護: {e}")
        return None

並行処理と非同期リクエスト

import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
import time

# 複数URLの並列取得
async def fetch_multiple_urls(urls: List[str]) -> List[Dict[str, Any]]:
    results = []
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_single_url(session, url))
            tasks.append(task)
        
        completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(completed_tasks):
            if isinstance(result, Exception):
                results.append({
                    'url': urls[i],
                    'success': False,
                    'error': str(result),
                    'status_code': None
                })
            else:
                results.append(result)
    
    return results

async def fetch_single_url(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    try:
        start_time = time.time()
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            end_time = time.time()
            
            content_length = len(await response.read())
            
            return {
                'url': url,
                'status_code': response.status,
                'success': response.status == 200,
                'content_length': content_length,
                'response_time': end_time - start_time,
                'headers': dict(response.headers)
            }
    except Exception as e:
        return {
            'url': url,
            'success': False,
            'error': str(e),
            'status_code': None
        }

# セマフォによる同時接続数制御
async def controlled_concurrent_requests(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_semaphore(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
        async with semaphore:
            return await fetch_single_url(session, url)
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_semaphore(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

# ページネーション対応の全データ取得
async def fetch_all_paginated_data(base_url: str, headers: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
    all_data = []
    page = 1
    has_more = True
    
    session_headers = headers or {}
    
    async with aiohttp.ClientSession(headers=session_headers) as session:
        while has_more:
            try:
                params = {'page': page, 'per_page': 100}
                async with session.get(base_url, params=params) as response:
                    if response.status == 200:
                        page_data = await response.json()
                        
                        if isinstance(page_data, dict) and 'items' in page_data:
                            items = page_data['items']
                            if not items:
                                break
                            all_data.extend(items)
                            has_more = page_data.get('has_more', False)
                        elif isinstance(page_data, list):
                            if not page_data:
                                break
                            all_data.extend(page_data)
                            has_more = len(page_data) == 100  # Full page suggests more data
                        else:
                            break
                        
                        print(f"ページ {page} 完了: {len(page_data if isinstance(page_data, list) else items)}件")
                        page += 1
                        
                        # API負荷軽減
                        await asyncio.sleep(0.1)
                        
                    else:
                        print(f"ページ {page} でエラー: {response.status}")
                        break
                        
            except Exception as e:
                print(f"ページ {page} で例外: {e}")
                break
    
    print(f"合計 {len(all_data)} 件のデータを取得")
    return all_data

# 段階的なデータ処理(レスポンス依存)
async def dependent_requests_pipeline():
    async with aiohttp.ClientSession() as session:
        # Step 1: ユーザー情報取得
        async with session.get('https://api.example.com/user/profile') as response:
            if response.status != 200:
                return {'error': 'Failed to get user profile'}
            
            user_profile = await response.json()
            user_id = user_profile['id']
        
        # Step 2: ユーザーの投稿一覧を並列取得
        posts_tasks = []
        for category in ['tech', 'personal', 'work']:
            task = session.get(f'https://api.example.com/users/{user_id}/posts/{category}')
            posts_tasks.append(task)
        
        posts_responses = await asyncio.gather(*posts_tasks)
        
        # Step 3: 各カテゴリの投稿データを処理
        all_posts = {}
        for i, response in enumerate(posts_responses):
            category = ['tech', 'personal', 'work'][i]
            async with response:
                if response.status == 200:
                    posts_data = await response.json()
                    all_posts[category] = posts_data
                else:
                    all_posts[category] = []
        
        return {
            'user_profile': user_profile,
            'posts': all_posts
        }

# WebSocket クライアント機能
async def websocket_client_example():
    session = aiohttp.ClientSession()
    
    try:
        async with session.ws_connect('wss://api.example.com/websocket') as ws:
            print("WebSocket接続が確立されました")
            
            # 初期メッセージ送信
            await ws.send_str(json.dumps({
                'type': 'subscribe',
                'channel': 'updates'
            }))
            
            # メッセージ受信ループ
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    print(f"受信: {data}")
                    
                    # 特定メッセージに対する応答
                    if data.get('type') == 'ping':
                        await ws.send_str(json.dumps({'type': 'pong'}))
                        
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocketエラー: {ws.exception()}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSE:
                    print("WebSocket接続が閉じられました")
                    break
                    
    except Exception as e:
        print(f"WebSocket接続エラー: {e}")
    finally:
        await session.close()

# ストリーミングダウンロード
async def streaming_download(url: str, file_path: str, chunk_size: int = 8192):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                total_size = int(response.headers.get('Content-Length', 0))
                downloaded = 0
                
                with open(file_path, 'wb') as file:
                    async for chunk in response.content.iter_chunked(chunk_size):
                        file.write(chunk)
                        downloaded += len(chunk)
                        
                        if total_size > 0:
                            progress = (downloaded / total_size) * 100
                            print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
                
                print(f"\nダウンロード完了: {file_path}")
                return True
            else:
                print(f"ダウンロード失敗: {response.status}")
                return False

# バッチ処理によるリクエスト最適化
async def batch_process_requests(items: List[str], batch_size: int = 10):
    all_results = []
    
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            print(f"バッチ {i//batch_size + 1} 処理中: {len(batch)}項目")
            
            # バッチ内の並列処理
            tasks = []
            for item in batch:
                task = fetch_single_url(session, f'https://api.example.com/items/{item}')
                tasks.append(task)
            
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 結果の処理
            for result in batch_results:
                if isinstance(result, Exception):
                    all_results.append({'success': False, 'error': str(result)})
                else:
                    all_results.append(result)
            
            # バッチ間の待機
            if i + batch_size < len(items):
                await asyncio.sleep(0.5)
    
    return all_results

# 使用例
async def main_parallel_example():
    urls = [
        'https://api.example.com/users',
        'https://api.example.com/posts',
        'https://api.example.com/comments',
        'https://api.example.com/categories'
    ]
    
    print("並列リクエスト実行中...")
    results = await fetch_multiple_urls(urls)
    
    successful_requests = [r for r in results if r['success']]
    print(f"成功: {len(successful_requests)}/{len(urls)}")
    
    print("\n同時接続制御テスト...")
    controlled_results = await controlled_concurrent_requests(urls, max_concurrent=2)
    
    print("\nページネーションテスト...")
    paginated_data = await fetch_all_paginated_data('https://api.example.com/posts')
    
    print("\n依存リクエストパイプライン...")
    pipeline_result = await dependent_requests_pipeline()

# asyncio.run(main_parallel_example())

フレームワーク統合と実用例

import aiohttp
import asyncio
import json
from typing import Optional, Dict, Any, List
from pathlib import Path
import mimetypes

# FastAPI統合用の高度なAPIクライアント
class AsyncAPIClient:
    def __init__(self, base_url: str, token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.session = None
    
    async def __aenter__(self):
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'AsyncAPIClient/1.0 (aiohttp)'
        }
        
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
        
        self.session = aiohttp.ClientSession(
            headers=headers,
            timeout=timeout,
            connector=connector
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self.session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()
    
    async def post(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        kwargs = {}
        if json_data:
            kwargs['json'] = json_data
        if data:
            kwargs['data'] = data
            
        async with self.session.post(url, **kwargs) as response:
            response.raise_for_status()
            return await response.json()
    
    async def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self.session.put(url, json=data) as response:
            response.raise_for_status()
            return await response.json()
    
    async def delete(self, endpoint: str) -> bool:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self.session.delete(url) as response:
            response.raise_for_status()
            return response.status == 204

# 使用例
async def api_client_example():
    async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
        # ユーザー一覧取得
        users = await client.get('users', params={'page': 1, 'limit': 50})
        print(f"取得ユーザー数: {len(users)}")
        
        # 新しいユーザー作成
        new_user = await client.post('users', json_data={
            'name': '田中太郎',
            'email': '[email protected]',
            'department': '開発部'
        })
        print(f"作成されたユーザー: {new_user['id']}")
        
        # ユーザー情報更新
        updated_user = await client.put(f'users/{new_user["id"]}', data={
            'name': '田中次郎',
            'department': '技術部'
        })
        print(f"更新完了: {updated_user['name']}")

# マルチパートファイルアップロード
async def upload_file_multipart(file_path: Path, upload_url: str, additional_fields: Optional[Dict[str, str]] = None):
    """マルチパート形式でのファイルアップロード"""
    
    # ファイルのMIMEタイプを自動判定
    mime_type, _ = mimetypes.guess_type(str(file_path))
    if not mime_type:
        mime_type = 'application/octet-stream'
    
    data = aiohttp.FormData()
    data.add_field('file', 
                   file_path.open('rb'), 
                   filename=file_path.name,
                   content_type=mime_type)
    
    # 追加フィールドの設定
    if additional_fields:
        for key, value in additional_fields.items():
            data.add_field(key, value)
    
    headers = {'Authorization': 'Bearer your-upload-token'}
    
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(upload_url, data=data, headers=headers) as response:
                response.raise_for_status()
                result = await response.json()
                print(f"アップロード成功: {result}")
                return result
        except aiohttp.ClientError as e:
            print(f"アップロードエラー: {e}")
            raise
        finally:
            # ファイルハンドルのクリーンアップ
            for field in data._fields:
                if hasattr(field[2], 'close'):
                    field[2].close()

# プログレス付きファイルアップロード
async def upload_with_progress(file_path: Path, upload_url: str):
    """進捗表示付きファイルアップロード"""
    
    file_size = file_path.stat().st_size
    uploaded = 0
    
    class ProgressReader:
        def __init__(self, file_obj):
            self.file_obj = file_obj
        
        async def __aenter__(self):
            return self
        
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.file_obj.close()
        
        def read(self, size=-1):
            nonlocal uploaded
            chunk = self.file_obj.read(size)
            if chunk:
                uploaded += len(chunk)
                progress = (uploaded / file_size) * 100
                print(f"アップロード進捗: {progress:.1f}%", end='\r')
            return chunk
    
    async with ProgressReader(file_path.open('rb')) as progress_file:
        data = aiohttp.FormData()
        data.add_field('file', progress_file, filename=file_path.name)
        
        async with aiohttp.ClientSession() as session:
            async with session.post(upload_url, data=data) as response:
                response.raise_for_status()
                print(f"\nアップロード完了: {file_path.name}")
                return await response.json()

# 認証トークン自動更新機能
class TokenManager:
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None
    
    async def get_valid_token(self) -> str:
        if self.access_token and self.token_expires_at:
            import time
            if time.time() < self.token_expires_at:
                return self.access_token
        
        await self._refresh_token()
        return self.access_token
    
    async def _refresh_token(self):
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'client_credentials'
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(self.token_url, data=auth_data) as response:
                response.raise_for_status()
                token_data = await response.json()
                
                self.access_token = token_data['access_token']
                self.refresh_token = token_data.get('refresh_token')
                
                import time
                expires_in = token_data.get('expires_in', 3600)
                self.token_expires_at = time.time() + expires_in - 60  # 60秒のバッファ

# 認証付きHTTPクライアント
class AuthenticatedClient:
    def __init__(self, base_url: str, token_manager: TokenManager):
        self.base_url = base_url.rstrip('/')
        self.token_manager = token_manager
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def _make_request(self, method: str, endpoint: str, **kwargs):
        token = await self.token_manager.get_valid_token()
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f'Bearer {token}'
        
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        async with self.session.request(method, url, headers=headers, **kwargs) as response:
            if response.status == 401:
                # トークンを強制更新して再試行
                await self.token_manager._refresh_token()
                token = await self.token_manager.get_valid_token()
                headers['Authorization'] = f'Bearer {token}'
                
                async with self.session.request(method, url, headers=headers, **kwargs) as retry_response:
                    retry_response.raise_for_status()
                    return await retry_response.json()
            else:
                response.raise_for_status()
                return await response.json()

# サーバーサイドイベント(SSE)クライアント
async def sse_client(url: str):
    """Server-Sent Events クライアント"""
    
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers={'Accept': 'text/event-stream'}) as response:
            if response.status == 200:
                print("SSE接続が確立されました")
                
                async for line in response.content:
                    line = line.decode('utf-8').strip()
                    
                    if line.startswith('data: '):
                        event_data = line[6:]  # 'data: ' プレフィックスを削除
                        try:
                            data = json.loads(event_data)
                            print(f"イベント受信: {data}")
                            
                            # イベントタイプ別の処理
                            event_type = data.get('type')
                            if event_type == 'user_update':
                                await handle_user_update(data)
                            elif event_type == 'system_notification':
                                await handle_system_notification(data)
                                
                        except json.JSONDecodeError:
                            print(f"テキストイベント: {event_data}")
                    
                    elif line.startswith('event: '):
                        event_type = line[7:]
                        print(f"イベントタイプ: {event_type}")
            else:
                print(f"SSE接続失敗: {response.status}")

async def handle_user_update(data):
    print(f"ユーザー更新: {data}")

async def handle_system_notification(data):
    print(f"システム通知: {data}")

# FastAPI サーバーとの統合例
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# バックグラウンドタスクでHTTPリクエスト実行
async def background_api_call(user_id: int):
    async with AsyncAPIClient('https://external-api.example.com') as client:
        user_data = await client.get(f'users/{user_id}')
        # データ処理ロジック
        processed_data = process_user_data(user_data)
        
        # 結果を別のAPIに送信
        await client.post('analytics/user-activity', json_data=processed_data)

@app.post("/trigger-analysis/{user_id}")
async def trigger_user_analysis(user_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(background_api_call, user_id)
    return {"message": "Analysis started in background"}

def process_user_data(user_data):
    # データ処理ロジック
    return {
        "user_id": user_data["id"],
        "activity_score": calculate_activity_score(user_data),
        "processed_at": datetime.now().isoformat()
    }

def calculate_activity_score(user_data):
    # アクティビティスコア計算
    return user_data.get("login_count", 0) * 10

# 使用例
async def integration_examples():
    # API クライアント例
    await api_client_example()
    
    # ファイルアップロード例
    file_path = Path("/path/to/document.pdf")
    if file_path.exists():
        await upload_file_multipart(
            file_path, 
            'https://api.example.com/upload',
            additional_fields={'category': 'documents', 'public': 'false'}
        )
    
    # SSE クライアント例
    # await sse_client('https://api.example.com/events')

# asyncio.run(integration_examples())