HTTPX

Modern HTTP client for Python. Provides a Requests-like API with full async/await support and HTTP/2 compatibility. A next-generation HTTP library offering both synchronous and asynchronous APIs through a unified interface.

HTTP Client, Python, Async, HTTP/2, Modern

GitHub Overview

encode/httpx

A next generation HTTP client for Python. 🦋

Stars: 14,326
Watchers: 115
Forks: 927
Created: April 4, 2019
Language: Python
License: BSD 3-Clause "New" or "Revised" License

Topics

asyncio, http, python, trio

Star History

Star history chart for encode/httpx (data as of 7/18/2025, 01:38 AM)

HTTPX

Overview

HTTPX is billed as "a next-generation HTTP client for Python" and serves as a modern successor to the requests library. It provides a unified interface for synchronous and asynchronous use, full support for HTTP/1.1 and HTTP/2, strong streaming capabilities, and a comprehensive authentication system. While remaining highly compatible with requests, it targets the needs of modern web applications as a high-performance HTTP library and is seeing rapid adoption within the Python ecosystem.
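
A minimal sketch of that unified interface (the URL is illustrative): the same call shape works against httpx.Client and httpx.AsyncClient, with only the async/await keywords added.

import asyncio
import httpx

# Synchronous usage
with httpx.Client() as client:
    response = client.get('https://www.example.com/')
    print(response.status_code)

# Asynchronous usage: identical call shape, just awaited
async def fetch():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://www.example.com/')
        print(response.status_code)

asyncio.run(fetch())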

Details

As of 2025, HTTPX has established itself as a new standard for HTTP communication in Python, driven by the spread of asynchronous programming and the standardization of HTTP/2. It integrates particularly well with ASGI frameworks such as FastAPI and Starlette, supporting high-performance asynchronous web application development. While keeping an API as simple as requests, it offers the rich functionality modern HTTP work requires, including async/await support, HTTP/2, streaming, and a customizable transport layer.
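
As a small sketch of that ASGI integration (the app and route here are assumptions for illustration), httpx can call a FastAPI or Starlette application in-process through httpx.ASGITransport, which is commonly used for testing without starting a server.

import asyncio
import httpx
from fastapi import FastAPI

app = FastAPI()

@app.get('/ping')
async def ping():
    return {'message': 'pong'}

async def call_app_in_process():
    # Route requests directly into the ASGI app; no network socket is opened
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url='http://testserver') as client:
        response = await client.get('/ping')
        print(response.status_code, response.json())  # 200 {'message': 'pong'}

asyncio.run(call_app_in_process())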

Key Features

  • Unified API: Consistent interface for both synchronous and asynchronous operations
  • HTTP/2 Support: Multiplexing and connection efficiency (enabled via the optional http2 extra)
  • Requests Compatibility: Easy migration from existing code
  • Streaming Processing: Efficient handling of large data volumes
  • Flexible Transport: Customizable communication layer (see the sketch after this list)
  • Comprehensive Authentication: Support for Basic, Digest, NetRC, and custom authentication
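
As a brief illustration of the customizable transport layer (the handler below is a hypothetical example), httpx.MockTransport swaps the network layer for an in-memory handler, which is a common way to stub out HTTP calls in tests:

import httpx

def handler(request: httpx.Request) -> httpx.Response:
    # Every request sent through this transport receives a canned JSON response
    return httpx.Response(200, json={'path': request.url.path})

transport = httpx.MockTransport(handler)

with httpx.Client(transport=transport) as client:
    response = client.get('https://api.example.com/users')
    print(response.json())  # {'path': '/users'}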

Pros and Cons

Pros

  • Easy migration from existing code thanks to the requests-compatible API
  • Lower learning curve thanks to the unified interface for sync and async operations
  • Faster transfers and more efficient connections through HTTP/2 support
  • Efficient handling of large volumes of HTTP requests via asynchronous processing
  • Support for modern Python development patterns (type hints, async/await)
  • Excellent integration with modern frameworks such as FastAPI
  • Native async/await support, in contrast to requests' synchronous-only approach

Cons

  • Adoption and ecosystem maturity still trail requests
  • HTTP/2 benefits require server-side support
  • Asynchronous usage requires an understanding of async programming, which can be complex for beginners
  • Some requests extension libraries have no HTTPX equivalent yet
  • Documentation and learning resources are more limited than for requests
  • Performance gains vary depending on the use case

Code Examples

Installation and Basic Setup

# Install HTTPX
pip install httpx

# Install with HTTP/2 support (recommended)
pip install 'httpx[http2]'

# Install with additional optional features (Brotli decoding, CLI, SOCKS proxy support)
pip install 'httpx[http2,brotli,cli,socks]'

# Check installation
python -c "import httpx; print(httpx.__version__)"

Basic Requests (GET/POST/PUT/DELETE)

import httpx

# Basic GET request (synchronous)
response = httpx.get('https://api.example.com/users')
print(response.status_code)  # 200
print(response.headers['content-type'])  # application/json
print(response.text)  # Response text
data = response.json()  # Parse as JSON
print(data)

# GET request with URL parameters
params = {'page': 1, 'limit': 10, 'sort': 'created_at'}
response = httpx.get('https://api.example.com/users', params=params)
print(response.url)  # https://api.example.com/users?page=1&limit=10&sort=created_at

# POST request (sending JSON)
user_data = {
    'name': 'John Doe',
    'email': 'john.doe@example.com',
    'age': 30
}

response = httpx.post(
    'https://api.example.com/users',
    json=user_data,  # Automatically sets Content-Type: application/json
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 201:
    created_user = response.json()
    print(f"User created: ID={created_user['id']}")
else:
    print(f"Error: {response.status_code} - {response.text}")

# POST request (sending form data)
form_data = {'username': 'testuser', 'password': 'secret123'}
response = httpx.post('https://api.example.com/login', data=form_data)

# PUT request (data update)
updated_data = {'name': 'Jane Doe', 'email': 'jane.doe@example.com'}
response = httpx.put(
    'https://api.example.com/users/123',
    json=updated_data,
    headers={'Authorization': 'Bearer your-token'}
)

# DELETE request
response = httpx.delete(
    'https://api.example.com/users/123',
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 204:
    print("User deleted successfully")

# Detailed response attribute inspection
print(f"Status code: {response.status_code}")
print(f"Reason phrase: {response.reason_phrase}")
print(f"Headers: {response.headers}")
print(f"Encoding: {response.encoding}")
print(f"Request URL: {response.url}")
print(f"HTTP version: {response.http_version}")

# Basic asynchronous request example
import asyncio

async def async_request_example():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/users')
        data = response.json()
        print(f"Async fetched data: {len(data)} items")
        return data

# Execution example
# asyncio.run(async_request_example())

Advanced Configuration and Customization (Headers, Authentication, Timeout, etc.)

import httpx
import ssl

# Custom header configuration
headers = {
    'User-Agent': 'MyApp/1.0 (HTTPX Python)',
    'Accept': 'application/json',
    'Accept-Language': 'en-US,ja-JP',
    'X-API-Version': 'v2',
    'X-Request-ID': 'req-12345'
}

response = httpx.get('https://api.example.com/data', headers=headers)

# Basic authentication
auth = httpx.BasicAuth('username', 'password')
response = httpx.get('https://api.example.com/private', auth=auth)

# Or authentication tuple (requests compatible)
response = httpx.get('https://api.example.com/private', auth=('username', 'password'))

# Digest authentication
digest_auth = httpx.DigestAuth('username', 'password')
response = httpx.get('https://api.example.com/digest', auth=digest_auth)

# NetRC authentication (using .netrc file)
netrc_auth = httpx.NetRCAuth()
response = httpx.get('https://api.example.com/netrc', auth=netrc_auth)

# Bearer Token authentication
headers = {'Authorization': 'Bearer your-jwt-token'}
response = httpx.get('https://api.example.com/protected', headers=headers)

# Timeout configuration
try:
    # 5-second default timeout (connect/write/pool), 10-second read timeout
    timeout = httpx.Timeout(5.0, read=10.0)
    response = httpx.get('https://api.example.com/slow', timeout=timeout)
    
    # Simple timeout (overall 15 seconds)
    response = httpx.get('https://api.example.com/data', timeout=15.0)
    
    # Disable timeout
    response = httpx.get('https://api.example.com/unlimited', timeout=None)
    
except httpx.TimeoutException:
    print("Request timed out")

# SSL configuration and client certificates
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.pem')

response = httpx.get(
    'https://secure-api.example.com/data',
    verify=ssl_context  # Custom SSL context
)

# Disable SSL certificate verification (development only)
response = httpx.get('https://self-signed.example.com/', verify=False)

# Proxy configuration (route all traffic through a single proxy)
with httpx.Client(proxy='http://proxy.example.com:8080') as client:
    response = client.get('https://api.example.com/data')

# Per-scheme proxy routing via transport mounts
mounts = {
    'http://': httpx.HTTPTransport(proxy='http://proxy.example.com:8080'),
    'https://': httpx.HTTPTransport(proxy='http://proxy.example.com:8080'),
}

with httpx.Client(mounts=mounts) as client:
    response = client.get('https://api.example.com/data')

# Authenticated proxy (credentials embedded in the proxy URL)
with httpx.Client(proxy='http://user:pass@proxy.example.com:8080') as client:
    response = client.get('https://api.example.com/data')

# Cookie configuration
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = httpx.get('https://api.example.com/user-data', cookies=cookies)

# Redirect control
response = httpx.get(
    'https://api.example.com/redirect',
    follow_redirects=False  # Do not follow redirects (also the httpx default behavior)
)

# Enable HTTP/2
with httpx.Client(http2=True) as client:
    response = client.get('https://api.example.com/http2-endpoint')
    print(f"HTTP version: {response.http_version}")

# Detailed timeout configuration
timeout = httpx.Timeout(
    connect=5.0,  # Connection timeout
    read=10.0,    # Read timeout
    write=5.0,    # Write timeout
    pool=15.0     # Pool timeout
)

with httpx.Client(timeout=timeout) as client:
    response = client.get('https://api.example.com/data')

Error Handling and Retry Functionality

import httpx
import asyncio
import time
from typing import Optional

# Comprehensive error handling
def safe_request(url: str, **kwargs) -> Optional[httpx.Response]:
    try:
        response = httpx.get(url, **kwargs)
        
        # HTTP status code check
        response.raise_for_status()  # Raises HTTPStatusError for 4xx/5xx
        
        return response
        
    except httpx.ConnectError as e:
        print(f"Connection error: {e}")
        print("Please check your network connection")
    except httpx.TimeoutException as e:
        print(f"Timeout error: {e}")
        print("Request timed out")
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e}")
        print(f"Status code: {e.response.status_code}")
        print(f"Response: {e.response.text}")
    except httpx.TooManyRedirects as e:
        print(f"Redirect error: {e}")
        print("Too many redirects")
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        print("Unexpected error occurred")
    
    return None

# Usage example
response = safe_request('https://api.example.com/data', timeout=10.0)
if response:
    data = response.json()
    print(data)

# Manual retry implementation (synchronous version)
def request_with_retry(url: str, max_retries: int = 3, backoff_factor: float = 1.0, **kwargs) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = httpx.get(url, **kwargs)
            response.raise_for_status()
            return response
            
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"Maximum attempts reached: {e}")
                raise
            
            wait_time = backoff_factor * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

# Asynchronous retry implementation
async def async_request_with_retry(
    client: httpx.AsyncClient, 
    url: str, 
    max_retries: int = 3, 
    backoff_factor: float = 1.0, 
    **kwargs
) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = await client.get(url, **kwargs)
            response.raise_for_status()
            return response
            
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"Maximum attempts reached: {e}")
                raise
            
            wait_time = backoff_factor * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)

# Usage example
try:
    response = request_with_retry(
        'https://api.example.com/unstable',
        max_retries=3,
        backoff_factor=1.0,
        timeout=10.0
    )
    print("Request successful:", response.status_code)
except httpx.RequestError as e:
    print("Finally failed:", e)

# Custom authentication with error handling
class CustomAuthWithRetry(httpx.Auth):
    def __init__(self, token: str):
        self.token = token
    
    def auth_flow(self, request):
        # First, set token
        request.headers['X-Authentication'] = self.token
        response = yield request
        
        # On 401 error, retry with new token
        if response.status_code == 401:
            # Get new token (in actual implementation, call external API)
            new_token = self.refresh_token()
            request.headers['X-Authentication'] = new_token
            yield request
    
    def refresh_token(self) -> str:
        # Token refresh logic
        return "new-refreshed-token"

# Status code-specific handling
response = httpx.get('https://api.example.com/status-check')

if response.status_code == 200:
    print("Success: ", response.json())
elif response.status_code == 401:
    print("Authentication error: Please check your token")
elif response.status_code == 403:
    print("Permission error: Access denied")
elif response.status_code == 404:
    print("Not found: Resource does not exist")
elif response.status_code == 429:
    print("Rate limit: Please wait before retrying")
elif response.status_code >= 500:
    print("Server error: Problem on server side")
else:
    print(f"Unexpected status: {response.status_code}")

# Detailed HTTP status error handling
try:
    response = httpx.get('https://api.example.com/may-fail')
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    print(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}")
    # Check response content
    error_details = exc.response.json() if 'application/json' in exc.response.headers.get('content-type', '') else exc.response.text
    print(f"Error details: {error_details}")
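
Besides the manual retry helpers above, httpx's built-in HTTPTransport accepts a retries argument that retries failed connection attempts (it does not re-send requests that already produced an HTTP error status); a brief sketch with an illustrative endpoint:

import httpx

# Retry establishing the connection up to 3 times on connect failures only
transport = httpx.HTTPTransport(retries=3)

with httpx.Client(transport=transport) as client:
    response = client.get('https://api.example.com/data', timeout=10.0)
    print(response.status_code)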

Concurrent Processing and Asynchronous Requests

import httpx
import asyncio
from typing import Any, Dict, List, Optional

# Parallel requests using asynchronous client
async def fetch_multiple_urls_async(urls: List[str]) -> List[Dict[str, Any]]:
    async with httpx.AsyncClient() as client:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_url_async(client, url))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'url': urls[i],
                    'success': False,
                    'error': str(result)
                })
            else:
                processed_results.append(result)
        
        return processed_results

async def fetch_url_async(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
    try:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'success': True,
            'data': response.json() if 'application/json' in response.headers.get('content-type', '') else None
        }
    except httpx.RequestError as e:
        return {
            'url': url,
            'success': False,
            'error': str(e)
        }

# Usage example
urls = [
    'https://api.example.com/users',
    'https://api.example.com/posts',
    'https://api.example.com/comments',
    'https://api.example.com/categories'
]

# Run in async environment
# results = await fetch_multiple_urls_async(urls)
# successful_results = [r for r in results if r['success']]
# print(f"Success: {len(successful_results)}/{len(urls)}")

# Concurrent connection control using semaphore
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
        async with semaphore:
            return await fetch_url_async(client, url)
    
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return [r if not isinstance(r, Exception) else {'url': 'unknown', 'success': False, 'error': str(r)} for r in results]

# Pagination-aware asynchronous data fetching
async def fetch_all_pages_async(base_url: str, headers: Optional[Dict[str, str]] = None, max_pages: Optional[int] = None) -> List[Dict[str, Any]]:
    all_data = []
    page = 1
    
    async with httpx.AsyncClient() as client:
        if headers:
            client.headers.update(headers)
        
        while True:
            try:
                params = {'page': page, 'per_page': 100}
                response = await client.get(base_url, params=params, timeout=10.0)
                response.raise_for_status()
                
                data = response.json()
                
                if not data or (isinstance(data, list) and len(data) == 0):
                    break  # End if no data
                
                if isinstance(data, dict) and 'items' in data:
                    items = data['items']
                    all_data.extend(items)
                    
                    # End if no next page
                    if not data.get('has_more', True) or len(items) == 0:
                        break
                else:
                    all_data.extend(data)
                
                print(f"Page {page} completed: {len(data if isinstance(data, list) else data.get('items', []))} items")
                page += 1
                
                # Check maximum page limit
                if max_pages and page > max_pages:
                    break
                
                # Wait to reduce API load
                await asyncio.sleep(0.1)
                
            except httpx.RequestError as e:
                print(f"Error on page {page}: {e}")
                break
    
    print(f"Total items fetched: {len(all_data)}")
    return all_data

# Streaming response processing
async def stream_large_response(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            
            # Check header information
            content_length = response.headers.get('content-length')
            if content_length:
                print(f"Content size: {int(content_length):,} bytes")
            
            # Process by chunks
            downloaded = 0
            async for chunk in response.aiter_bytes(chunk_size=8192):
                downloaded += len(chunk)
                print(f"Downloaded: {downloaded:,} bytes", end='\r')
                # Perform actual chunk processing (file writing, etc.) here
            
            print(f"\nDownload completed: {downloaded:,} bytes")

# Efficient batch processing
async def process_items_in_batches(items: List[str], batch_size: int = 10) -> List[Dict[str, Any]]:
    all_results = []
    
    async with httpx.AsyncClient() as client:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}: {len(batch)} items")
            
            # Parallel processing within batch
            tasks = [fetch_url_async(client, f'https://api.example.com/items/{item}') for item in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Error handling
            for result in batch_results:
                if isinstance(result, Exception):
                    all_results.append({'success': False, 'error': str(result)})
                else:
                    all_results.append(result)
            
            # Wait between batches
            if i + batch_size < len(items):
                await asyncio.sleep(1.0)
    
    return all_results

# Server-Sent Events (SSE)-style streaming processing
async def handle_streaming_events(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    # Server-Sent Events format data processing
                    event_data = line[6:]  # Remove 'data: ' prefix
                    try:
                        import json
                        data = json.loads(event_data)
                        print(f"Event received: {data}")
                        # Process event data here
                    except json.JSONDecodeError:
                        print(f"Text event: {event_data}")

Framework Integration and Practical Examples

import httpx
import asyncio
from typing import Optional, Dict, Any
import json
from pathlib import Path

# Asynchronous HTTP client for FastAPI integration
class AsyncAPIClient:
    def __init__(self, base_url: str, token: Optional[str] = None, http2: bool = True):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.http2 = http2
    
    async def __aenter__(self):
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'AsyncAPIClient/1.0'
        }
        
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        
        self.client = httpx.AsyncClient(
            headers=headers,
            timeout=httpx.Timeout(30.0),
            http2=self.http2,
            follow_redirects=True
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()
    
    async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('GET', url, **kwargs)
    
    async def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('POST', url, json=data, **kwargs)
    
    async def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('PUT', url, json=data, **kwargs)
    
    async def delete(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('DELETE', url, **kwargs)
    
    async def _make_request(self, method: str, url: str, **kwargs) -> Optional[Dict[str, Any]]:
        try:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            
            if response.content:
                return response.json()
            return None
            
        except httpx.RequestError as e:
            print(f"API Error: {method} {url} - {e}")
            raise

# Usage example
async def api_client_example():
    async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
        # Get user list
        users = await client.get('users', params={'page': 1, 'limit': 50})
        
        # Create new user
        new_user = await client.post('users', data={
            'name': 'John Doe',
            'email': 'john.doe@example.com'
        })
        
        # Update user
        updated_user = await client.put(f'users/{new_user["id"]}', data={
            'name': 'Jane Doe'
        })

# Custom authentication class (automatic token refresh)
class AutoRefreshAuth(httpx.Auth):
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.access_token = None
        self.refresh_token = None
    
    def auth_flow(self, request):
        # Get new token if first time or invalid token
        if not self.access_token:
            self._get_new_token()
        
        request.headers['Authorization'] = f'Bearer {self.access_token}'
        response = yield request
        
        # On 401 error, refresh token and retry
        if response.status_code == 401:
            self._refresh_access_token()
            request.headers['Authorization'] = f'Bearer {self.access_token}'
            yield request
    
    def _get_new_token(self):
        # Get token with client authentication
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'client_credentials'
        }
        
        response = httpx.post(self.token_url, data=auth_data)
        response.raise_for_status()
        
        token_data = response.json()
        self.access_token = token_data['access_token']
        self.refresh_token = token_data.get('refresh_token')
    
    def _refresh_access_token(self):
        # Update access token with refresh token
        if self.refresh_token:
            refresh_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token
            }
            
            response = httpx.post(self.token_url, data=refresh_data)
            if response.status_code == 200:
                token_data = response.json()
                self.access_token = token_data['access_token']
                return
        
        # Get new token if refresh fails
        self._get_new_token()

# File upload (multipart)
async def upload_file_async(file_path: Path, upload_url: str, additional_fields: Dict[str, str] = None) -> Dict[str, Any]:
    """Asynchronous file upload"""
    
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        files = {'file': (file_path.name, file_path.open('rb'), 'application/octet-stream')}
        data = additional_fields or {}
        
        try:
            response = await client.post(
                upload_url,
                files=files,
                data=data,
                headers={'Authorization': 'Bearer your-token'}
            )
            response.raise_for_status()
            return response.json()
            
        finally:
            files['file'][1].close()

# Large file streaming download
async def download_large_file_async(url: str, local_path: Path, chunk_size: int = 8192) -> None:
    """Large file asynchronous streaming download"""
    
    async with httpx.AsyncClient(timeout=httpx.Timeout(None)) as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            
            # Get file size
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            
            with local_path.open('wb') as f:
                async for chunk in response.aiter_bytes(chunk_size=chunk_size):
                    f.write(chunk)
                    downloaded += len(chunk)
                    
                    # Progress display
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"Download progress: {progress:.1f}%", end='\r')
            
            print(f"\nDownload completed: {local_path}")

# Proxy server integration (FastAPI)
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.api_route("/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_requests(request: Request, path: str):
    """Proxy endpoint for external APIs"""
    
    target_url = f"https://external-api.example.com/{path}"
    
    # Forward request headers (excluding some)
    excluded_headers = {'host', 'content-length', 'transfer-encoding'}
    headers = {k: v for k, v in request.headers.items() if k.lower() not in excluded_headers}
    
    async with httpx.AsyncClient() as client:
        # Get request body
        body = await request.body()
        
        try:
            response = await client.request(
                method=request.method,
                url=target_url,
                headers=headers,
                content=body,
                params=request.query_params,
                timeout=30.0
            )
            
            # Forward response headers (excluding some)
            excluded_response_headers = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}
            response_headers = {k: v for k, v in response.headers.items() if k.lower() not in excluded_response_headers}
            
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=response_headers
            )
            
        except httpx.RequestError as e:
            return Response(
                content=json.dumps({'error': str(e)}),
                status_code=502,
                headers={'content-type': 'application/json'}
            )

# Streaming response transfer (proxying a chunked stream)
@app.get("/stream-proxy/{path:path}")
async def stream_proxy(path: str):
    """Streaming response proxy"""
    
    target_url = f"https://streaming-api.example.com/{path}"
    
    async def generate_stream():
        async with httpx.AsyncClient() as client:
            async with client.stream('GET', target_url) as response:
                async for chunk in response.aiter_bytes():
                    yield chunk
    
    return StreamingResponse(
        generate_stream(),
        media_type='application/octet-stream'
    )

# Usage example
# async def main():
#     await api_client_example()
#     await upload_file_async(Path('/path/to/file.pdf'), 'https://api.example.com/upload')
#     await download_large_file_async('https://api.example.com/large-file.zip', Path('/tmp/file.zip'))

# asyncio.run(main())