HTTPX
PythonのモダンHTTPクライアント。RequestsライクAPIに加えて、async/await完全サポート、HTTP/2対応、WebSocket接続機能を提供。同期・非同期両方のAPIを統一インターフェースで利用可能な次世代HTTPライブラリ。
GitHub概要
encode/httpx
A next generation HTTP client for Python. 🦋
スター14,326
ウォッチ115
フォーク927
作成日:2019年4月4日
言語:Python
ライセンス:BSD 3-Clause "New" or "Revised" License
トピックス
asynciohttppythontrio
スター履歴
データ取得日時: 2025/7/18 01:38
ライブラリ
HTTPX
概要
HTTPXは「次世代PythonHTTPクライアント」として開発された、requestsライブラリの現代的な後継者です。同期・非同期の両方のAPIを統一インターフェースで提供し、HTTP/1.1とHTTP/2の完全サポート、優れたストリーミング機能、包括的な認証システムを実装。requestsライブラリとの高い互換性を保ちながら、現代のWebアプリケーション要件に対応する高性能HTTP通信ライブラリとして、Pythonエコシステムでの採用が急速に拡大しています。
詳細
HTTPX 2025年版は非同期プログラミングの普及とHTTP/2の標準化を背景に、Python HTTP通信の新しいスタンダードとして確立されています。FastAPI、Starlette等のASGI フレームワークとの統合で特に威力を発揮し、高性能非同期Webアプリケーション開発を支援。requestsと同等のシンプルなAPIながら、async/await対応、HTTP/2サポート、ストリーミング処理、カスタマイズ可能なTransport層など、モダンなHTTP通信要件を満たす豊富な機能を提供します。
主な特徴
- 統一API: 同期・非同期の両方で一貫したインターフェース
- HTTP/2完全サポート: マルチプレクシングと性能向上
- requests互換性: 既存コードの移行が容易
- ストリーミング処理: 大容量データの効率的な処理
- 柔軟なTransport: カスタマイズ可能な通信層
- 包括的認証: Basic、Digest、NetRC、カスタム認証対応
メリット・デメリット
メリット
- requests互換APIによる既存コードからの容易な移行
- 同期・非同期の統一インターフェースで学習コストを削減
- HTTP/2サポートによる高速化とコネクション効率向上
- 非同期処理により大量のHTTPリクエストを効率的に処理
- モダンなPython開発パターン(type hints、async/await)への対応
- FastAPI等の最新フレームワークとの優れた統合性
デメリット
- requestsほどの普及率とエコシステム成熟度に到達していない
- HTTP/2の恩恵を受けるためにはサーバー側対応が必要
- 非同期プログラミングの理解が必要で初学者には複雑
- 一部のrequests拡張ライブラリが未対応
- ドキュメントと学習リソースがrequestsより限定的
- パフォーマンス向上は用途により効果に差がある
参考ページ
書き方の例
インストールと基本セットアップ
# HTTPXのインストール
pip install httpx
# HTTP/2サポート付きインストール(推奨)
pip install 'httpx[http2]'
# 開発版・最新機能付きインストール
pip install 'httpx[all]'
# 依存関係確認
python -c "import httpx; print(httpx.__version__)"
基本的なリクエスト(GET/POST/PUT/DELETE)
import httpx
# 基本的なGETリクエスト(同期)
response = httpx.get('https://api.example.com/users')
print(response.status_code) # 200
print(response.headers['content-type']) # application/json
print(response.text) # レスポンステキスト
data = response.json() # JSONとして解析
print(data)
# URLパラメータ付きGETリクエスト
params = {'page': 1, 'limit': 10, 'sort': 'created_at'}
response = httpx.get('https://api.example.com/users', params=params)
print(response.url) # https://api.example.com/users?page=1&limit=10&sort=created_at
# POSTリクエスト(JSON送信)
user_data = {
'name': '田中太郎',
'email': '[email protected]',
'age': 30
}
response = httpx.post(
'https://api.example.com/users',
json=user_data, # 自動的にContent-Type: application/jsonが設定される
headers={'Authorization': 'Bearer your-token'}
)
if response.status_code == 201:
created_user = response.json()
print(f"ユーザー作成完了: ID={created_user['id']}")
else:
print(f"エラー: {response.status_code} - {response.text}")
# POSTリクエスト(フォームデータ送信)
form_data = {'username': 'testuser', 'password': 'secret123'}
response = httpx.post('https://api.example.com/login', data=form_data)
# PUTリクエスト(データ更新)
updated_data = {'name': '田中次郎', 'email': '[email protected]'}
response = httpx.put(
'https://api.example.com/users/123',
json=updated_data,
headers={'Authorization': 'Bearer your-token'}
)
# DELETEリクエスト
response = httpx.delete(
'https://api.example.com/users/123',
headers={'Authorization': 'Bearer your-token'}
)
if response.status_code == 204:
print("ユーザー削除完了")
# レスポンス属性の詳細確認
print(f"ステータスコード: {response.status_code}")
print(f"リーズン: {response.reason_phrase}")
print(f"ヘッダー: {response.headers}")
print(f"エンコーディング: {response.encoding}")
print(f"リクエストURL: {response.url}")
print(f"HTTPバージョン: {response.http_version}")
# 非同期リクエストの基本例
import asyncio
async def async_request_example():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/users')
data = response.json()
print(f"非同期取得データ: {len(data)}件")
return data
# 実行例
# asyncio.run(async_request_example())
高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)
import httpx
import ssl
# カスタムヘッダーの設定
headers = {
'User-Agent': 'MyApp/1.0 (HTTPX Python)',
'Accept': 'application/json',
'Accept-Language': 'ja-JP,en-US',
'X-API-Version': 'v2',
'X-Request-ID': 'req-12345'
}
response = httpx.get('https://api.example.com/data', headers=headers)
# Basic認証
auth = httpx.BasicAuth('username', 'password')
response = httpx.get('https://api.example.com/private', auth=auth)
# または認証タプル(requests互換)
response = httpx.get('https://api.example.com/private', auth=('username', 'password'))
# Digest認証
digest_auth = httpx.DigestAuth('username', 'password')
response = httpx.get('https://api.example.com/digest', auth=digest_auth)
# NetRC認証(.netrcファイル使用)
netrc_auth = httpx.NetRCAuth()
response = httpx.get('https://api.example.com/netrc', auth=netrc_auth)
# Bearer Token認証
headers = {'Authorization': 'Bearer your-jwt-token'}
response = httpx.get('https://api.example.com/protected', headers=headers)
# タイムアウト設定
try:
# 接続タイムアウト5秒、読み込みタイムアウト10秒
timeout = httpx.Timeout(5.0, read=10.0)
response = httpx.get('https://api.example.com/slow', timeout=timeout)
# シンプルなタイムアウト(全体で15秒)
response = httpx.get('https://api.example.com/data', timeout=15.0)
# タイムアウト無効化
response = httpx.get('https://api.example.com/unlimited', timeout=None)
except httpx.TimeoutException:
print("リクエストがタイムアウトしました")
# SSL設定とクライアント証明書
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.pem')
response = httpx.get(
'https://secure-api.example.com/data',
verify=ssl_context # カスタムSSLコンテキスト
)
# SSL証明書検証の無効化(開発時のみ)
response = httpx.get('https://self-signed.example.com/', verify=False)
# プロキシ設定
proxies = {
'http://': 'http://proxy.example.com:8080',
'https://': 'http://proxy.example.com:8080'
}
response = httpx.get('https://api.example.com/data', proxies=proxies)
# 認証付きプロキシ
proxies = {
'http://': 'http://user:[email protected]:8080',
'https://': 'http://user:[email protected]:8080'
}
# Cookie設定
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = httpx.get('https://api.example.com/user-data', cookies=cookies)
# リダイレクト制御
response = httpx.get(
'https://api.example.com/redirect',
follow_redirects=False # リダイレクトを無効化
)
# HTTP/2有効化
with httpx.Client(http2=True) as client:
response = client.get('https://api.example.com/http2-endpoint')
print(f"HTTPバージョン: {response.http_version}")
# 詳細なタイムアウト設定
timeout = httpx.Timeout(
connect=5.0, # 接続タイムアウト
read=10.0, # 読み込みタイムアウト
write=5.0, # 書き込みタイムアウト
pool=15.0 # プールタイムアウト
)
with httpx.Client(timeout=timeout) as client:
response = client.get('https://api.example.com/data')
エラーハンドリングとリトライ機能
import httpx
import asyncio
import time
from typing import Optional
# 包括的なエラーハンドリング
def safe_request(url: str, **kwargs) -> Optional[httpx.Response]:
try:
response = httpx.get(url, **kwargs)
# HTTPステータスコードのチェック
response.raise_for_status() # 4xx/5xxでHTTPStatusErrorを発生
return response
except httpx.ConnectError as e:
print(f"接続エラー: {e}")
print("ネットワーク接続を確認してください")
except httpx.TimeoutException as e:
print(f"タイムアウトエラー: {e}")
print("リクエストがタイムアウトしました")
except httpx.HTTPStatusError as e:
print(f"HTTPエラー: {e}")
print(f"ステータスコード: {e.response.status_code}")
print(f"レスポンス: {e.response.text}")
except httpx.TooManyRedirects as e:
print(f"リダイレクトエラー: {e}")
print("リダイレクトが多すぎます")
except httpx.RequestError as e:
print(f"リクエストエラー: {e}")
print("予期しないエラーが発生しました")
return None
# 使用例
response = safe_request('https://api.example.com/data', timeout=10.0)
if response:
data = response.json()
print(data)
# 手動リトライ実装(同期版)
def request_with_retry(url: str, max_retries: int = 3, backoff_factor: float = 1.0, **kwargs) -> httpx.Response:
for attempt in range(max_retries + 1):
try:
response = httpx.get(url, **kwargs)
response.raise_for_status()
return response
except httpx.RequestError as e:
if attempt == max_retries:
print(f"最大試行回数に達しました: {e}")
raise
wait_time = backoff_factor * (2 ** attempt)
print(f"試行 {attempt + 1} 失敗. {wait_time}秒後に再試行...")
time.sleep(wait_time)
# 非同期リトライ実装
async def async_request_with_retry(
client: httpx.AsyncClient,
url: str,
max_retries: int = 3,
backoff_factor: float = 1.0,
**kwargs
) -> httpx.Response:
for attempt in range(max_retries + 1):
try:
response = await client.get(url, **kwargs)
response.raise_for_status()
return response
except httpx.RequestError as e:
if attempt == max_retries:
print(f"最大試行回数に達しました: {e}")
raise
wait_time = backoff_factor * (2 ** attempt)
print(f"試行 {attempt + 1} 失敗. {wait_time}秒後に再試行...")
await asyncio.sleep(wait_time)
# 使用例
try:
response = request_with_retry(
'https://api.example.com/unstable',
max_retries=3,
backoff_factor=1.0,
timeout=10.0
)
print("リクエスト成功:", response.status_code)
except httpx.RequestError as e:
print("最終的に失敗:", e)
# カスタム認証とエラーハンドリング
class CustomAuthWithRetry(httpx.Auth):
def __init__(self, token: str):
self.token = token
def auth_flow(self, request):
# 最初にトークンを設定
request.headers['X-Authentication'] = self.token
response = yield request
# 401エラーの場合、新しいトークンで再試行
if response.status_code == 401:
# 新しいトークンを取得(実際の実装では外部API呼び出し)
new_token = self.refresh_token()
request.headers['X-Authentication'] = new_token
yield request
def refresh_token(self) -> str:
# トークンリフレッシュのロジック
return "new-refreshed-token"
# ステータスコード別処理
response = httpx.get('https://api.example.com/status-check')
if response.status_code == 200:
print("正常: ", response.json())
elif response.status_code == 401:
print("認証エラー: トークンを確認してください")
elif response.status_code == 403:
print("権限エラー: アクセス権限がありません")
elif response.status_code == 404:
print("見つかりません: リソースが存在しません")
elif response.status_code == 429:
print("レート制限: しばらく待ってから再試行してください")
elif response.status_code >= 500:
print("サーバーエラー: サーバー側の問題です")
else:
print(f"予期しないステータス: {response.status_code}")
# HTTPステータスエラーの詳細処理
try:
response = httpx.get('https://api.example.com/may-fail')
response.raise_for_status()
except httpx.HTTPStatusError as exc:
print(f"エラーレスポンス {exc.response.status_code} 取得中: {exc.request.url!r}")
# レスポンス内容を確認
error_details = exc.response.json() if exc.response.headers.get('content-type') == 'application/json' else exc.response.text
print(f"エラー詳細: {error_details}")
並行処理と非同期リクエスト
import httpx
import asyncio
from typing import List, Dict, Any
# 非同期クライアントを使用した並列リクエスト
async def fetch_multiple_urls_async(urls: List[str]) -> List[Dict[str, Any]]:
async with httpx.AsyncClient() as client:
tasks = []
for url in urls:
task = asyncio.create_task(fetch_url_async(client, url))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 結果の処理
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
'url': urls[i],
'success': False,
'error': str(result)
})
else:
processed_results.append(result)
return processed_results
async def fetch_url_async(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
try:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return {
'url': url,
'status_code': response.status_code,
'content_length': len(response.content),
'success': True,
'data': response.json() if 'application/json' in response.headers.get('content-type', '') else None
}
except httpx.RequestError as e:
return {
'url': url,
'success': False,
'error': str(e)
}
# 使用例
urls = [
'https://api.example.com/users',
'https://api.example.com/posts',
'https://api.example.com/comments',
'https://api.example.com/categories'
]
# async環境での実行
# results = await fetch_multiple_urls_async(urls)
# successful_results = [r for r in results if r['success']]
# print(f"成功: {len(successful_results)}/{len(urls)}")
# セマフォを使用した同時接続数制御
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
async with semaphore:
return await fetch_url_async(client, url)
async with httpx.AsyncClient() as client:
tasks = [fetch_with_limit(client, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r if not isinstance(r, Exception) else {'url': 'unknown', 'success': False, 'error': str(r)} for r in results]
# ページネーション対応の非同期データ取得
async def fetch_all_pages_async(base_url: str, headers: Dict[str, str] = None, max_pages: int = None) -> List[Dict[str, Any]]:
all_data = []
page = 1
async with httpx.AsyncClient() as client:
if headers:
client.headers.update(headers)
while True:
try:
params = {'page': page, 'per_page': 100}
response = await client.get(base_url, params=params, timeout=10.0)
response.raise_for_status()
data = response.json()
if not data or (isinstance(data, list) and len(data) == 0):
break # データがない場合は終了
if isinstance(data, dict) and 'items' in data:
items = data['items']
all_data.extend(items)
# 次のページがない場合は終了
if not data.get('has_more', True) or len(items) == 0:
break
else:
all_data.extend(data)
print(f"ページ {page} 取得完了: {len(data if isinstance(data, list) else data.get('items', []))}件")
page += 1
# 最大ページ制限チェック
if max_pages and page > max_pages:
break
# API負荷軽減のための待機
await asyncio.sleep(0.1)
except httpx.RequestError as e:
print(f"ページ {page} でエラー: {e}")
break
print(f"総取得件数: {len(all_data)}件")
return all_data
# ストリーミングレスポンスの処理
async def stream_large_response(url: str) -> None:
async with httpx.AsyncClient() as client:
async with client.stream('GET', url) as response:
response.raise_for_status()
# ヘッダー情報の確認
content_length = response.headers.get('content-length')
if content_length:
print(f"コンテンツサイズ: {int(content_length):,} bytes")
# チャンクごとの処理
downloaded = 0
async for chunk in response.aiter_bytes(chunk_size=8192):
downloaded += len(chunk)
print(f"ダウンロード済み: {downloaded:,} bytes", end='\r')
# ここで実際のチャンク処理(ファイル書き込み等)を行う
print(f"\nダウンロード完了: {downloaded:,} bytes")
# バッチ処理による効率的なリクエスト
async def process_items_in_batches(items: List[str], batch_size: int = 10) -> List[Dict[str, Any]]:
all_results = []
async with httpx.AsyncClient() as client:
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
print(f"バッチ {i//batch_size + 1} 処理中: {len(batch)}件")
# バッチ内の並列処理
tasks = [fetch_url_async(client, f'https://api.example.com/items/{item}') for item in batch]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# エラーハンドリング
for result in batch_results:
if isinstance(result, Exception):
all_results.append({'success': False, 'error': str(result)})
else:
all_results.append(result)
# バッチ間の待機
if i + batch_size < len(items):
await asyncio.sleep(1.0)
return all_results
# WebSocketライクなストリーミング処理
async def handle_streaming_events(url: str) -> None:
async with httpx.AsyncClient() as client:
async with client.stream('GET', url) as response:
async for line in response.aiter_lines():
if line.startswith('data: '):
# Server-Sent Events形式のデータ処理
event_data = line[6:] # 'data: 'プレフィックスを削除
try:
import json
data = json.loads(event_data)
print(f"イベント受信: {data}")
# ここでイベントデータの処理を行う
except json.JSONDecodeError:
print(f"テキストイベント: {event_data}")
フレームワーク統合と実用例
import httpx
import asyncio
from typing import Optional, Dict, Any
import json
from pathlib import Path
# FastAPI統合用の非同期HTTPクライアント
class AsyncAPIClient:
def __init__(self, base_url: str, token: Optional[str] = None, http2: bool = True):
self.base_url = base_url.rstrip('/')
self.token = token
self.http2 = http2
async def __aenter__(self):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'AsyncAPIClient/1.0'
}
if self.token:
headers['Authorization'] = f'Bearer {self.token}'
self.client = httpx.AsyncClient(
headers=headers,
timeout=httpx.Timeout(30.0),
http2=self.http2,
follow_redirects=True
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.aclose()
async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return await self._make_request('GET', url, **kwargs)
async def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return await self._make_request('POST', url, json=data, **kwargs)
async def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return await self._make_request('PUT', url, json=data, **kwargs)
async def delete(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
return await self._make_request('DELETE', url, **kwargs)
async def _make_request(self, method: str, url: str, **kwargs) -> Optional[Dict[str, Any]]:
try:
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
if response.content:
return response.json()
return None
except httpx.RequestError as e:
print(f"API Error: {method} {url} - {e}")
raise
# 使用例
async def api_client_example():
async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
# ユーザー一覧取得
users = await client.get('users', params={'page': 1, 'limit': 50})
# 新しいユーザー作成
new_user = await client.post('users', data={
'name': '田中太郎',
'email': '[email protected]'
})
# ユーザー更新
updated_user = await client.put(f'users/{new_user["id"]}', data={
'name': '田中次郎'
})
# カスタム認証クラス(トークン自動更新)
class AutoRefreshAuth(httpx.Auth):
def __init__(self, client_id: str, client_secret: str, token_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.access_token = None
self.refresh_token = None
def auth_flow(self, request):
# 初回または無効なトークンの場合、新しいトークンを取得
if not self.access_token:
self._get_new_token()
request.headers['Authorization'] = f'Bearer {self.access_token}'
response = yield request
# 401エラーの場合、トークンをリフレッシュして再試行
if response.status_code == 401:
self._refresh_access_token()
request.headers['Authorization'] = f'Bearer {self.access_token}'
yield request
def _get_new_token(self):
# クライアント認証でトークン取得
auth_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'client_credentials'
}
response = httpx.post(self.token_url, data=auth_data)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data['access_token']
self.refresh_token = token_data.get('refresh_token')
def _refresh_access_token(self):
# リフレッシュトークンでアクセストークンを更新
if self.refresh_token:
refresh_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
}
response = httpx.post(self.token_url, data=refresh_data)
if response.status_code == 200:
token_data = response.json()
self.access_token = token_data['access_token']
return
# リフレッシュ失敗時は新しいトークンを取得
self._get_new_token()
# ファイルアップロード(マルチパート)
async def upload_file_async(file_path: Path, upload_url: str, additional_fields: Dict[str, str] = None) -> Dict[str, Any]:
"""非同期ファイルアップロード"""
async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
files = {'file': (file_path.name, file_path.open('rb'), 'application/octet-stream')}
data = additional_fields or {}
try:
response = await client.post(
upload_url,
files=files,
data=data,
headers={'Authorization': 'Bearer your-token'}
)
response.raise_for_status()
return response.json()
finally:
files['file'][1].close()
# 大容量ファイルのストリーミングダウンロード
async def download_large_file_async(url: str, local_path: Path, chunk_size: int = 8192) -> None:
"""大容量ファイルの非同期ストリーミングダウンロード"""
async with httpx.AsyncClient(timeout=httpx.Timeout(None)) as client:
async with client.stream('GET', url) as response:
response.raise_for_status()
# ファイルサイズの取得
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with local_path.open('wb') as f:
async for chunk in response.aiter_bytes(chunk_size=chunk_size):
f.write(chunk)
downloaded += len(chunk)
# 進捗表示
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
print(f"\nダウンロード完了: {local_path}")
# プロキシサーバー統合(FastAPI)
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.api_route("/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_requests(request: Request, path: str):
"""外部APIへのプロキシエンドポイント"""
target_url = f"https://external-api.example.com/{path}"
# リクエストヘッダーの転送(一部除外)
excluded_headers = {'host', 'content-length', 'transfer-encoding'}
headers = {k: v for k, v in request.headers.items() if k.lower() not in excluded_headers}
async with httpx.AsyncClient() as client:
# リクエストボディの取得
body = await request.body()
try:
response = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=body,
params=request.query_params,
timeout=30.0
)
# レスポンスヘッダーの転送(一部除外)
excluded_response_headers = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}
response_headers = {k: v for k, v in response.headers.items() if k.lower() not in excluded_response_headers}
return Response(
content=response.content,
status_code=response.status_code,
headers=response_headers
)
except httpx.RequestError as e:
return Response(
content=json.dumps({'error': str(e)}),
status_code=502,
headers={'content-type': 'application/json'}
)
# WebSocketプロキシのようなストリーミング転送
@app.get("/stream-proxy/{path:path}")
async def stream_proxy(path: str):
"""ストリーミングレスポンスのプロキシ"""
target_url = f"https://streaming-api.example.com/{path}"
async def generate_stream():
async with httpx.AsyncClient() as client:
async with client.stream('GET', target_url) as response:
async for chunk in response.aiter_bytes():
yield chunk
return StreamingResponse(
generate_stream(),
media_type='application/octet-stream'
)
# 使用例
# async def main():
# await api_client_example()
# await upload_file_async(Path('/path/to/file.pdf'), 'https://api.example.com/upload')
# await download_large_file_async('https://api.example.com/large-file.zip', Path('/tmp/file.zip'))
# asyncio.run(main())