HTTPX

PythonのモダンHTTPクライアント。RequestsライクAPIに加えて、async/await完全サポート、HTTP/2対応、WebSocket接続機能を提供。同期・非同期両方のAPIを統一インターフェースで利用可能な次世代HTTPライブラリ。

HTTPクライアントPython非同期HTTP/2モダン

GitHub概要

encode/httpx

A next generation HTTP client for Python. 🦋

スター14,326
ウォッチ115
フォーク927
作成日:2019年4月4日
言語:Python
ライセンス:BSD 3-Clause "New" or "Revised" License

トピックス

asynciohttppythontrio

スター履歴

encode/httpx Star History
データ取得日時: 2025/7/18 01:38

ライブラリ

HTTPX

概要

HTTPXは「次世代PythonHTTPクライアント」として開発された、requestsライブラリの現代的な後継者です。同期・非同期の両方のAPIを統一インターフェースで提供し、HTTP/1.1とHTTP/2の完全サポート、優れたストリーミング機能、包括的な認証システムを実装。requestsライブラリとの高い互換性を保ちながら、現代のWebアプリケーション要件に対応する高性能HTTP通信ライブラリとして、Pythonエコシステムでの採用が急速に拡大しています。

詳細

HTTPX 2025年版は非同期プログラミングの普及とHTTP/2の標準化を背景に、Python HTTP通信の新しいスタンダードとして確立されています。FastAPI、Starlette等のASGI フレームワークとの統合で特に威力を発揮し、高性能非同期Webアプリケーション開発を支援。requestsと同等のシンプルなAPIながら、async/await対応、HTTP/2サポート、ストリーミング処理、カスタマイズ可能なTransport層など、モダンなHTTP通信要件を満たす豊富な機能を提供します。

主な特徴

  • 統一API: 同期・非同期の両方で一貫したインターフェース
  • HTTP/2完全サポート: マルチプレクシングと性能向上
  • requests互換性: 既存コードの移行が容易
  • ストリーミング処理: 大容量データの効率的な処理
  • 柔軟なTransport: カスタマイズ可能な通信層
  • 包括的認証: Basic、Digest、NetRC、カスタム認証対応

メリット・デメリット

メリット

  • requests互換APIによる既存コードからの容易な移行
  • 同期・非同期の統一インターフェースで学習コストを削減
  • HTTP/2サポートによる高速化とコネクション効率向上
  • 非同期処理により大量のHTTPリクエストを効率的に処理
  • モダンなPython開発パターン(type hints、async/await)への対応
  • FastAPI等の最新フレームワークとの優れた統合性

デメリット

  • requestsほどの普及率とエコシステム成熟度に到達していない
  • HTTP/2の恩恵を受けるためにはサーバー側対応が必要
  • 非同期プログラミングの理解が必要で初学者には複雑
  • 一部のrequests拡張ライブラリが未対応
  • ドキュメントと学習リソースがrequestsより限定的
  • パフォーマンス向上は用途により効果に差がある

参考ページ

書き方の例

インストールと基本セットアップ

# HTTPXのインストール
pip install httpx

# HTTP/2サポート付きインストール(推奨)
pip install 'httpx[http2]'

# 開発版・最新機能付きインストール
pip install 'httpx[all]'

# 依存関係確認
python -c "import httpx; print(httpx.__version__)"

基本的なリクエスト(GET/POST/PUT/DELETE)

import httpx

# 基本的なGETリクエスト(同期)
response = httpx.get('https://api.example.com/users')
print(response.status_code)  # 200
print(response.headers['content-type'])  # application/json
print(response.text)  # レスポンステキスト
data = response.json()  # JSONとして解析
print(data)

# URLパラメータ付きGETリクエスト
params = {'page': 1, 'limit': 10, 'sort': 'created_at'}
response = httpx.get('https://api.example.com/users', params=params)
print(response.url)  # https://api.example.com/users?page=1&limit=10&sort=created_at

# POSTリクエスト(JSON送信)
user_data = {
    'name': '田中太郎',
    'email': '[email protected]',
    'age': 30
}

response = httpx.post(
    'https://api.example.com/users',
    json=user_data,  # 自動的にContent-Type: application/jsonが設定される
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 201:
    created_user = response.json()
    print(f"ユーザー作成完了: ID={created_user['id']}")
else:
    print(f"エラー: {response.status_code} - {response.text}")

# POSTリクエスト(フォームデータ送信)
form_data = {'username': 'testuser', 'password': 'secret123'}
response = httpx.post('https://api.example.com/login', data=form_data)

# PUTリクエスト(データ更新)
updated_data = {'name': '田中次郎', 'email': '[email protected]'}
response = httpx.put(
    'https://api.example.com/users/123',
    json=updated_data,
    headers={'Authorization': 'Bearer your-token'}
)

# DELETEリクエスト
response = httpx.delete(
    'https://api.example.com/users/123',
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 204:
    print("ユーザー削除完了")

# レスポンス属性の詳細確認
print(f"ステータスコード: {response.status_code}")
print(f"リーズン: {response.reason_phrase}")
print(f"ヘッダー: {response.headers}")
print(f"エンコーディング: {response.encoding}")
print(f"リクエストURL: {response.url}")
print(f"HTTPバージョン: {response.http_version}")

# 非同期リクエストの基本例
import asyncio

async def async_request_example():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/users')
        data = response.json()
        print(f"非同期取得データ: {len(data)}件")
        return data

# 実行例
# asyncio.run(async_request_example())

高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)

import httpx
import ssl

# カスタムヘッダーの設定
headers = {
    'User-Agent': 'MyApp/1.0 (HTTPX Python)',
    'Accept': 'application/json',
    'Accept-Language': 'ja-JP,en-US',
    'X-API-Version': 'v2',
    'X-Request-ID': 'req-12345'
}

response = httpx.get('https://api.example.com/data', headers=headers)

# Basic認証
auth = httpx.BasicAuth('username', 'password')
response = httpx.get('https://api.example.com/private', auth=auth)

# または認証タプル(requests互換)
response = httpx.get('https://api.example.com/private', auth=('username', 'password'))

# Digest認証
digest_auth = httpx.DigestAuth('username', 'password')
response = httpx.get('https://api.example.com/digest', auth=digest_auth)

# NetRC認証(.netrcファイル使用)
netrc_auth = httpx.NetRCAuth()
response = httpx.get('https://api.example.com/netrc', auth=netrc_auth)

# Bearer Token認証
headers = {'Authorization': 'Bearer your-jwt-token'}
response = httpx.get('https://api.example.com/protected', headers=headers)

# タイムアウト設定
try:
    # 接続タイムアウト5秒、読み込みタイムアウト10秒
    timeout = httpx.Timeout(5.0, read=10.0)
    response = httpx.get('https://api.example.com/slow', timeout=timeout)
    
    # シンプルなタイムアウト(全体で15秒)
    response = httpx.get('https://api.example.com/data', timeout=15.0)
    
    # タイムアウト無効化
    response = httpx.get('https://api.example.com/unlimited', timeout=None)
    
except httpx.TimeoutException:
    print("リクエストがタイムアウトしました")

# SSL設定とクライアント証明書
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.pem')

response = httpx.get(
    'https://secure-api.example.com/data',
    verify=ssl_context  # カスタムSSLコンテキスト
)

# SSL証明書検証の無効化(開発時のみ)
response = httpx.get('https://self-signed.example.com/', verify=False)

# プロキシ設定
proxies = {
    'http://': 'http://proxy.example.com:8080',
    'https://': 'http://proxy.example.com:8080'
}

response = httpx.get('https://api.example.com/data', proxies=proxies)

# 認証付きプロキシ
proxies = {
    'http://': 'http://user:[email protected]:8080',
    'https://': 'http://user:[email protected]:8080'
}

# Cookie設定
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = httpx.get('https://api.example.com/user-data', cookies=cookies)

# リダイレクト制御
response = httpx.get(
    'https://api.example.com/redirect',
    follow_redirects=False  # リダイレクトを無効化
)

# HTTP/2有効化
with httpx.Client(http2=True) as client:
    response = client.get('https://api.example.com/http2-endpoint')
    print(f"HTTPバージョン: {response.http_version}")

# 詳細なタイムアウト設定
timeout = httpx.Timeout(
    connect=5.0,  # 接続タイムアウト
    read=10.0,    # 読み込みタイムアウト
    write=5.0,    # 書き込みタイムアウト
    pool=15.0     # プールタイムアウト
)

with httpx.Client(timeout=timeout) as client:
    response = client.get('https://api.example.com/data')

エラーハンドリングとリトライ機能

import httpx
import asyncio
import time
from typing import Optional

# 包括的なエラーハンドリング
def safe_request(url: str, **kwargs) -> Optional[httpx.Response]:
    try:
        response = httpx.get(url, **kwargs)
        
        # HTTPステータスコードのチェック
        response.raise_for_status()  # 4xx/5xxでHTTPStatusErrorを発生
        
        return response
        
    except httpx.ConnectError as e:
        print(f"接続エラー: {e}")
        print("ネットワーク接続を確認してください")
    except httpx.TimeoutException as e:
        print(f"タイムアウトエラー: {e}")
        print("リクエストがタイムアウトしました")
    except httpx.HTTPStatusError as e:
        print(f"HTTPエラー: {e}")
        print(f"ステータスコード: {e.response.status_code}")
        print(f"レスポンス: {e.response.text}")
    except httpx.TooManyRedirects as e:
        print(f"リダイレクトエラー: {e}")
        print("リダイレクトが多すぎます")
    except httpx.RequestError as e:
        print(f"リクエストエラー: {e}")
        print("予期しないエラーが発生しました")
    
    return None

# 使用例
response = safe_request('https://api.example.com/data', timeout=10.0)
if response:
    data = response.json()
    print(data)

# 手動リトライ実装(同期版)
def request_with_retry(url: str, max_retries: int = 3, backoff_factor: float = 1.0, **kwargs) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = httpx.get(url, **kwargs)
            response.raise_for_status()
            return response
            
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"最大試行回数に達しました: {e}")
                raise
            
            wait_time = backoff_factor * (2 ** attempt)
            print(f"試行 {attempt + 1} 失敗. {wait_time}秒後に再試行...")
            time.sleep(wait_time)

# 非同期リトライ実装
async def async_request_with_retry(
    client: httpx.AsyncClient, 
    url: str, 
    max_retries: int = 3, 
    backoff_factor: float = 1.0, 
    **kwargs
) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = await client.get(url, **kwargs)
            response.raise_for_status()
            return response
            
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"最大試行回数に達しました: {e}")
                raise
            
            wait_time = backoff_factor * (2 ** attempt)
            print(f"試行 {attempt + 1} 失敗. {wait_time}秒後に再試行...")
            await asyncio.sleep(wait_time)

# 使用例
try:
    response = request_with_retry(
        'https://api.example.com/unstable',
        max_retries=3,
        backoff_factor=1.0,
        timeout=10.0
    )
    print("リクエスト成功:", response.status_code)
except httpx.RequestError as e:
    print("最終的に失敗:", e)

# カスタム認証とエラーハンドリング
class CustomAuthWithRetry(httpx.Auth):
    def __init__(self, token: str):
        self.token = token
    
    def auth_flow(self, request):
        # 最初にトークンを設定
        request.headers['X-Authentication'] = self.token
        response = yield request
        
        # 401エラーの場合、新しいトークンで再試行
        if response.status_code == 401:
            # 新しいトークンを取得(実際の実装では外部API呼び出し)
            new_token = self.refresh_token()
            request.headers['X-Authentication'] = new_token
            yield request
    
    def refresh_token(self) -> str:
        # トークンリフレッシュのロジック
        return "new-refreshed-token"

# ステータスコード別処理
response = httpx.get('https://api.example.com/status-check')

if response.status_code == 200:
    print("正常: ", response.json())
elif response.status_code == 401:
    print("認証エラー: トークンを確認してください")
elif response.status_code == 403:
    print("権限エラー: アクセス権限がありません")
elif response.status_code == 404:
    print("見つかりません: リソースが存在しません")
elif response.status_code == 429:
    print("レート制限: しばらく待ってから再試行してください")
elif response.status_code >= 500:
    print("サーバーエラー: サーバー側の問題です")
else:
    print(f"予期しないステータス: {response.status_code}")

# HTTPステータスエラーの詳細処理
try:
    response = httpx.get('https://api.example.com/may-fail')
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    print(f"エラーレスポンス {exc.response.status_code} 取得中: {exc.request.url!r}")
    # レスポンス内容を確認
    error_details = exc.response.json() if exc.response.headers.get('content-type') == 'application/json' else exc.response.text
    print(f"エラー詳細: {error_details}")

並行処理と非同期リクエスト

import httpx
import asyncio
from typing import List, Dict, Any

# 非同期クライアントを使用した並列リクエスト
async def fetch_multiple_urls_async(urls: List[str]) -> List[Dict[str, Any]]:
    async with httpx.AsyncClient() as client:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_url_async(client, url))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 結果の処理
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'url': urls[i],
                    'success': False,
                    'error': str(result)
                })
            else:
                processed_results.append(result)
        
        return processed_results

async def fetch_url_async(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
    try:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'success': True,
            'data': response.json() if 'application/json' in response.headers.get('content-type', '') else None
        }
    except httpx.RequestError as e:
        return {
            'url': url,
            'success': False,
            'error': str(e)
        }

# 使用例
urls = [
    'https://api.example.com/users',
    'https://api.example.com/posts',
    'https://api.example.com/comments',
    'https://api.example.com/categories'
]

# async環境での実行
# results = await fetch_multiple_urls_async(urls)
# successful_results = [r for r in results if r['success']]
# print(f"成功: {len(successful_results)}/{len(urls)}")

# セマフォを使用した同時接続数制御
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
        async with semaphore:
            return await fetch_url_async(client, url)
    
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return [r if not isinstance(r, Exception) else {'url': 'unknown', 'success': False, 'error': str(r)} for r in results]

# ページネーション対応の非同期データ取得
async def fetch_all_pages_async(base_url: str, headers: Dict[str, str] = None, max_pages: int = None) -> List[Dict[str, Any]]:
    all_data = []
    page = 1
    
    async with httpx.AsyncClient() as client:
        if headers:
            client.headers.update(headers)
        
        while True:
            try:
                params = {'page': page, 'per_page': 100}
                response = await client.get(base_url, params=params, timeout=10.0)
                response.raise_for_status()
                
                data = response.json()
                
                if not data or (isinstance(data, list) and len(data) == 0):
                    break  # データがない場合は終了
                
                if isinstance(data, dict) and 'items' in data:
                    items = data['items']
                    all_data.extend(items)
                    
                    # 次のページがない場合は終了
                    if not data.get('has_more', True) or len(items) == 0:
                        break
                else:
                    all_data.extend(data)
                
                print(f"ページ {page} 取得完了: {len(data if isinstance(data, list) else data.get('items', []))}件")
                page += 1
                
                # 最大ページ制限チェック
                if max_pages and page > max_pages:
                    break
                
                # API負荷軽減のための待機
                await asyncio.sleep(0.1)
                
            except httpx.RequestError as e:
                print(f"ページ {page} でエラー: {e}")
                break
    
    print(f"総取得件数: {len(all_data)}件")
    return all_data

# ストリーミングレスポンスの処理
async def stream_large_response(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            
            # ヘッダー情報の確認
            content_length = response.headers.get('content-length')
            if content_length:
                print(f"コンテンツサイズ: {int(content_length):,} bytes")
            
            # チャンクごとの処理
            downloaded = 0
            async for chunk in response.aiter_bytes(chunk_size=8192):
                downloaded += len(chunk)
                print(f"ダウンロード済み: {downloaded:,} bytes", end='\r')
                # ここで実際のチャンク処理(ファイル書き込み等)を行う
            
            print(f"\nダウンロード完了: {downloaded:,} bytes")

# バッチ処理による効率的なリクエスト
async def process_items_in_batches(items: List[str], batch_size: int = 10) -> List[Dict[str, Any]]:
    all_results = []
    
    async with httpx.AsyncClient() as client:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            print(f"バッチ {i//batch_size + 1} 処理中: {len(batch)}件")
            
            # バッチ内の並列処理
            tasks = [fetch_url_async(client, f'https://api.example.com/items/{item}') for item in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # エラーハンドリング
            for result in batch_results:
                if isinstance(result, Exception):
                    all_results.append({'success': False, 'error': str(result)})
                else:
                    all_results.append(result)
            
            # バッチ間の待機
            if i + batch_size < len(items):
                await asyncio.sleep(1.0)
    
    return all_results

# WebSocketライクなストリーミング処理
async def handle_streaming_events(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    # Server-Sent Events形式のデータ処理
                    event_data = line[6:]  # 'data: 'プレフィックスを削除
                    try:
                        import json
                        data = json.loads(event_data)
                        print(f"イベント受信: {data}")
                        # ここでイベントデータの処理を行う
                    except json.JSONDecodeError:
                        print(f"テキストイベント: {event_data}")

フレームワーク統合と実用例

import httpx
import asyncio
from typing import Optional, Dict, Any
import json
from pathlib import Path

# FastAPI統合用の非同期HTTPクライアント
class AsyncAPIClient:
    def __init__(self, base_url: str, token: Optional[str] = None, http2: bool = True):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.http2 = http2
    
    async def __aenter__(self):
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'AsyncAPIClient/1.0'
        }
        
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        
        self.client = httpx.AsyncClient(
            headers=headers,
            timeout=httpx.Timeout(30.0),
            http2=self.http2,
            follow_redirects=True
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()
    
    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('GET', url, **kwargs)
    
    async def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('POST', url, json=data, **kwargs)
    
    async def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('PUT', url, json=data, **kwargs)
    
    async def delete(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('DELETE', url, **kwargs)
    
    async def _make_request(self, method: str, url: str, **kwargs) -> Optional[Dict[str, Any]]:
        try:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            
            if response.content:
                return response.json()
            return None
            
        except httpx.RequestError as e:
            print(f"API Error: {method} {url} - {e}")
            raise

# 使用例
async def api_client_example():
    async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
        # ユーザー一覧取得
        users = await client.get('users', params={'page': 1, 'limit': 50})
        
        # 新しいユーザー作成
        new_user = await client.post('users', data={
            'name': '田中太郎',
            'email': '[email protected]'
        })
        
        # ユーザー更新
        updated_user = await client.put(f'users/{new_user["id"]}', data={
            'name': '田中次郎'
        })

# カスタム認証クラス(トークン自動更新)
class AutoRefreshAuth(httpx.Auth):
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.access_token = None
        self.refresh_token = None
    
    def auth_flow(self, request):
        # 初回または無効なトークンの場合、新しいトークンを取得
        if not self.access_token:
            self._get_new_token()
        
        request.headers['Authorization'] = f'Bearer {self.access_token}'
        response = yield request
        
        # 401エラーの場合、トークンをリフレッシュして再試行
        if response.status_code == 401:
            self._refresh_access_token()
            request.headers['Authorization'] = f'Bearer {self.access_token}'
            yield request
    
    def _get_new_token(self):
        # クライアント認証でトークン取得
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'client_credentials'
        }
        
        response = httpx.post(self.token_url, data=auth_data)
        response.raise_for_status()
        
        token_data = response.json()
        self.access_token = token_data['access_token']
        self.refresh_token = token_data.get('refresh_token')
    
    def _refresh_access_token(self):
        # リフレッシュトークンでアクセストークンを更新
        if self.refresh_token:
            refresh_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token
            }
            
            response = httpx.post(self.token_url, data=refresh_data)
            if response.status_code == 200:
                token_data = response.json()
                self.access_token = token_data['access_token']
                return
        
        # リフレッシュ失敗時は新しいトークンを取得
        self._get_new_token()

# ファイルアップロード(マルチパート)
async def upload_file_async(file_path: Path, upload_url: str, additional_fields: Dict[str, str] = None) -> Dict[str, Any]:
    """非同期ファイルアップロード"""
    
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        files = {'file': (file_path.name, file_path.open('rb'), 'application/octet-stream')}
        data = additional_fields or {}
        
        try:
            response = await client.post(
                upload_url,
                files=files,
                data=data,
                headers={'Authorization': 'Bearer your-token'}
            )
            response.raise_for_status()
            return response.json()
            
        finally:
            files['file'][1].close()

# 大容量ファイルのストリーミングダウンロード
async def download_large_file_async(url: str, local_path: Path, chunk_size: int = 8192) -> None:
    """大容量ファイルの非同期ストリーミングダウンロード"""
    
    async with httpx.AsyncClient(timeout=httpx.Timeout(None)) as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            
            # ファイルサイズの取得
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            
            with local_path.open('wb') as f:
                async for chunk in response.aiter_bytes(chunk_size=chunk_size):
                    f.write(chunk)
                    downloaded += len(chunk)
                    
                    # 進捗表示
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
            
            print(f"\nダウンロード完了: {local_path}")

# プロキシサーバー統合(FastAPI)
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.api_route("/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_requests(request: Request, path: str):
    """外部APIへのプロキシエンドポイント"""
    
    target_url = f"https://external-api.example.com/{path}"
    
    # リクエストヘッダーの転送(一部除外)
    excluded_headers = {'host', 'content-length', 'transfer-encoding'}
    headers = {k: v for k, v in request.headers.items() if k.lower() not in excluded_headers}
    
    async with httpx.AsyncClient() as client:
        # リクエストボディの取得
        body = await request.body()
        
        try:
            response = await client.request(
                method=request.method,
                url=target_url,
                headers=headers,
                content=body,
                params=request.query_params,
                timeout=30.0
            )
            
            # レスポンスヘッダーの転送(一部除外)
            excluded_response_headers = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}
            response_headers = {k: v for k, v in response.headers.items() if k.lower() not in excluded_response_headers}
            
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=response_headers
            )
            
        except httpx.RequestError as e:
            return Response(
                content=json.dumps({'error': str(e)}),
                status_code=502,
                headers={'content-type': 'application/json'}
            )

# WebSocketプロキシのようなストリーミング転送
@app.get("/stream-proxy/{path:path}")
async def stream_proxy(path: str):
    """ストリーミングレスポンスのプロキシ"""
    
    target_url = f"https://streaming-api.example.com/{path}"
    
    async def generate_stream():
        async with httpx.AsyncClient() as client:
            async with client.stream('GET', target_url) as response:
                async for chunk in response.aiter_bytes():
                    yield chunk
    
    return StreamingResponse(
        generate_stream(),
        media_type='application/octet-stream'
    )

# 使用例
# async def main():
#     await api_client_example()
#     await upload_file_async(Path('/path/to/file.pdf'), 'https://api.example.com/upload')
#     await download_large_file_async('https://api.example.com/large-file.zip', Path('/tmp/file.zip'))

# asyncio.run(main())