HTTPX
A modern HTTP client for Python. It offers a Requests-like API in both synchronous and asynchronous (async/await) form through one unified interface, with optional HTTP/2 support. WebSocket connections are not part of the core library and require a third-party extension.
GitHub Overview
encode/httpx
A next generation HTTP client for Python. 🦋
Overview
HTTPX describes itself as "a next generation HTTP client for Python" and is widely used as a modern alternative to the requests library. It exposes the same API in synchronous and asynchronous form, supports HTTP/1.1 and (with the optional http2 extra) HTTP/2, and includes streaming and built-in authentication helpers. Its API stays close to that of requests, which eases migration, and its adoption within the Python ecosystem has grown quickly.
Details
As of 2025, HTTPX is a common choice for HTTP communication in Python, helped by the spread of asynchronous programming and HTTP/2. It integrates particularly well with ASGI frameworks such as FastAPI and Starlette, where it serves as the outbound client in asynchronous web applications. While keeping an API about as simple as that of requests, it adds async/await support, optional HTTP/2, streaming, and a customizable transport layer.
Key Features
- Unified API: Consistent interface for both synchronous and asynchronous operations
- HTTP/2 Support (optional): Request multiplexing over a single connection; requires the http2 extra and http2=True on the client
- Requests Compatibility: Easy migration from existing code
- Streaming Processing: Efficient handling of large data volumes
- Flexible Transport: Customizable communication layer
- Comprehensive Authentication: Support for Basic, Digest, NetRC, and custom authentication
Pros and Cons
Pros
- Easy migration from existing code through requests-compatible API
- Reduced learning costs with unified interface for sync/async operations
- Performance acceleration and connection efficiency through HTTP/2 support
- Efficient processing of large HTTP request volumes through asynchronous processing
- Support for modern Python development patterns (type hints, async/await)
- Excellent integration with latest frameworks like FastAPI
- Modern async/await support compared to requests' synchronous-only approach
Cons
- Smaller ecosystem and less widespread adoption than requests
- HTTP/2 benefits apply only when the server also supports HTTP/2
- The asynchronous API requires familiarity with async programming, which can be difficult for beginners
- Some requests extension libraries are not yet supported
- Documentation and learning resources more limited than requests
- Performance improvements vary in effectiveness depending on use cases
Code Examples
Installation and Basic Setup
# Install HTTPX
pip install httpx
# Install with HTTP/2 support (recommended)
pip install 'httpx[http2]'
# Install with several optional extras (HTTP/2, command-line client, Brotli decoding, SOCKS proxies)
pip install 'httpx[http2,cli,brotli,socks]'
# Check installation
python -c "import httpx; print(httpx.__version__)"
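Because HTTP/2 support is an optional extra, it can help to check for it before passing http2=True to a client. The sketch below uses only the standard library and assumes the extra provides the h2 package (which the http2 extra installs at the time of writing); the helper names http2_available and client_options are hypothetical.

```python
import importlib.util


def http2_available() -> bool:
    """Return True if the optional 'h2' package can be imported."""
    return importlib.util.find_spec("h2") is not None


def client_options() -> dict:
    """Build keyword arguments intended for httpx.Client / httpx.AsyncClient."""
    # Only request HTTP/2 when the dependency is present; otherwise stay on HTTP/1.1.
    return {"http2": http2_available()}


print(client_options())
```

The resulting dictionary could then be unpacked into a client constructor, for example httpx.Client(**client_options()).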
Basic Requests (GET/POST/PUT/DELETE)
import httpx
# Basic GET request (synchronous)
response = httpx.get('https://api.example.com/users')
print(response.status_code) # 200
print(response.headers['content-type']) # application/json
print(response.text) # Response text
data = response.json() # Parse as JSON
print(data)
# GET request with URL parameters
params = {'page': 1, 'limit': 10, 'sort': 'created_at'}
response = httpx.get('https://api.example.com/users', params=params)
print(response.url) # https://api.example.com/users?page=1&limit=10&sort=created_at
# POST request (sending JSON)
user_data = {
    'name': 'John Doe',
    'email': 'john.doe@example.com',
    'age': 30
}
response = httpx.post(
    'https://api.example.com/users',
    json=user_data,  # Automatically sets Content-Type: application/json
    headers={'Authorization': 'Bearer your-token'}
)
if response.status_code == 201:
    created_user = response.json()
    print(f"User created: ID={created_user['id']}")
else:
    print(f"Error: {response.status_code} - {response.text}")
# POST request (sending form data)
form_data = {'username': 'testuser', 'password': 'secret123'}
response = httpx.post('https://api.example.com/login', data=form_data)
# PUT request (data update)
updated_data = {'name': 'Jane Doe', 'email': 'jane.doe@example.com'}
response = httpx.put(
'https://api.example.com/users/123',
json=updated_data,
headers={'Authorization': 'Bearer your-token'}
)
# DELETE request
response = httpx.delete(
'https://api.example.com/users/123',
headers={'Authorization': 'Bearer your-token'}
)
if response.status_code == 204:
    print("User deleted successfully")
# Detailed response attribute inspection
print(f"Status code: {response.status_code}")
print(f"Reason phrase: {response.reason_phrase}")
print(f"Headers: {response.headers}")
print(f"Encoding: {response.encoding}")
print(f"Request URL: {response.url}")
print(f"HTTP version: {response.http_version}")
# Basic asynchronous request example
import asyncio
async def async_request_example():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/users')
        data = response.json()
        print(f"Async fetched data: {len(data)} items")
        return data
# Execution example
# asyncio.run(async_request_example())
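The params argument shown in the GET example is serialized into the URL's query string. As a rough illustration using only the standard library (this is not HTTPX's internal code, and build_url is a hypothetical helper), the same URL can be assembled by hand:

```python
from urllib.parse import urlencode


def build_url(base: str, params: dict) -> str:
    """Append URL-encoded query parameters to a base URL."""
    query = urlencode(params)
    return f"{base}?{query}" if query else base


url = build_url(
    'https://api.example.com/users',
    {'page': 1, 'limit': 10, 'sort': 'created_at'},
)
print(url)
```

Passing params to HTTPX is usually preferable to formatting the URL yourself, since the library takes care of escaping.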
Advanced Configuration and Customization (Headers, Authentication, Timeout, etc.)
import httpx
import ssl
# Custom header configuration
headers = {
'User-Agent': 'MyApp/1.0 (HTTPX Python)',
'Accept': 'application/json',
'Accept-Language': 'en-US,ja-JP',
'X-API-Version': 'v2',
'X-Request-ID': 'req-12345'
}
response = httpx.get('https://api.example.com/data', headers=headers)
# Basic authentication
auth = httpx.BasicAuth('username', 'password')
response = httpx.get('https://api.example.com/private', auth=auth)
# Or authentication tuple (requests compatible)
response = httpx.get('https://api.example.com/private', auth=('username', 'password'))
# Digest authentication
digest_auth = httpx.DigestAuth('username', 'password')
response = httpx.get('https://api.example.com/digest', auth=digest_auth)
# NetRC authentication (using .netrc file)
netrc_auth = httpx.NetRCAuth()
response = httpx.get('https://api.example.com/netrc', auth=netrc_auth)
# Bearer Token authentication
headers = {'Authorization': 'Bearer your-jwt-token'}
response = httpx.get('https://api.example.com/protected', headers=headers)
# Timeout configuration
try:
    # 5-second default for connect/write/pool, with a 10-second read timeout
    timeout = httpx.Timeout(5.0, read=10.0)
    response = httpx.get('https://api.example.com/slow', timeout=timeout)
    # Single value: 15 seconds applied to each phase (connect, read, write, pool)
    response = httpx.get('https://api.example.com/data', timeout=15.0)
    # Disable timeouts
    response = httpx.get('https://api.example.com/unlimited', timeout=None)
except httpx.TimeoutException:
    print("Request timed out")
# SSL configuration and client certificates
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.pem')
response = httpx.get(
'https://secure-api.example.com/data',
verify=ssl_context # Custom SSL context
)
# Disable SSL certificate verification (development only)
response = httpx.get('https://self-signed.example.com/', verify=False)
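# Proxy configuration (the older proxies= argument was removed in HTTPX 0.28; use proxy= or mounts=)
response = httpx.get('https://api.example.com/data', proxy='http://proxy.example.com:8080')
# Per-scheme proxy routing with mounts
mounts = {
    'http://': httpx.HTTPTransport(proxy='http://proxy.example.com:8080'),
    'https://': httpx.HTTPTransport(proxy='http://proxy.example.com:8080')
}
with httpx.Client(mounts=mounts) as client:
    response = client.get('https://api.example.com/data')
# Authenticated proxy (credentials embedded in the proxy URL)
proxy_url = 'http://user:pass@proxy.example.com:8080'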
# Cookie configuration
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = httpx.get('https://api.example.com/user-data', cookies=cookies)
# Redirect control
response = httpx.get(
'https://api.example.com/redirect',
follow_redirects=False # Disable redirects
)
# Enable HTTP/2 (requires the optional dependency: pip install 'httpx[http2]')
with httpx.Client(http2=True) as client:
    response = client.get('https://api.example.com/http2-endpoint')
    print(f"HTTP version: {response.http_version}")
# Detailed timeout configuration
timeout = httpx.Timeout(
    connect=5.0,  # Connection timeout
    read=10.0,    # Read timeout
    write=5.0,    # Write timeout
    pool=15.0     # Pool timeout
)
with httpx.Client(timeout=timeout) as client:
    response = client.get('https://api.example.com/data')
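Basic authentication, as used by httpx.BasicAuth above, comes down to a single Authorization header containing the Base64-encoded "username:password" pair. A minimal standard-library sketch of that header follows; the helper names are hypothetical, and in practice HTTPX builds the header for you.

```python
import base64
from typing import Tuple


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an 'Authorization: Basic ...' header."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def decode_basic_auth(header: str) -> Tuple[str, str]:
    """Recover (username, password) from a Basic authorization header value."""
    scheme, _, token = header.partition(" ")
    if scheme != "Basic":
        raise ValueError("not a Basic authorization header")
    username, _, password = base64.b64decode(token).decode("utf-8").partition(":")
    return username, password


header = basic_auth_header("username", "password")
print(header)
print(decode_basic_auth(header))
```

Because Base64 is an encoding rather than encryption, the credentials are readable by anyone who sees the header, which is why Basic authentication should only be sent over HTTPS.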
Error Handling and Retry Functionality
import httpx
import asyncio
import time
from typing import Optional
# Comprehensive error handling
def safe_request(url: str, **kwargs) -> Optional[httpx.Response]:
    try:
        response = httpx.get(url, **kwargs)
        # HTTP status code check
        response.raise_for_status()  # Raises HTTPStatusError for 4xx/5xx
        return response
    except httpx.ConnectError as e:
        print(f"Connection error: {e}")
        print("Please check your network connection")
    except httpx.TimeoutException as e:
        print(f"Timeout error: {e}")
        print("Request timed out")
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e}")
        print(f"Status code: {e.response.status_code}")
        print(f"Response: {e.response.text}")
    except httpx.TooManyRedirects as e:
        print(f"Redirect error: {e}")
        print("Too many redirects")
    except httpx.RequestError as e:
        print(f"Request error: {e}")
        print("Unexpected error occurred")
    return None
# Usage example
response = safe_request('https://api.example.com/data', timeout=10.0)
if response is not None:
    data = response.json()
    print(data)
# Manual retry implementation (synchronous version)
def request_with_retry(url: str, max_retries: int = 3, backoff_factor: float = 1.0, **kwargs) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = httpx.get(url, **kwargs)
            response.raise_for_status()
            return response
        # Only transport-level failures are retried here. HTTPStatusError (raised by
        # raise_for_status) is not a RequestError, so 4xx/5xx responses propagate immediately.
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"Maximum attempts reached: {e}")
                raise
            wait_time = backoff_factor * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
# Asynchronous retry implementation
async def async_request_with_retry(
    client: httpx.AsyncClient,
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 1.0,
    **kwargs
) -> httpx.Response:
    for attempt in range(max_retries + 1):
        try:
            response = await client.get(url, **kwargs)
            response.raise_for_status()
            return response
        except httpx.RequestError as e:
            if attempt == max_retries:
                print(f"Maximum attempts reached: {e}")
                raise
            wait_time = backoff_factor * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)
# Usage example
try:
    response = request_with_retry(
        'https://api.example.com/unstable',
        max_retries=3,
        backoff_factor=1.0,
        timeout=10.0
    )
    print("Request successful:", response.status_code)
except httpx.HTTPError as e:
    # HTTPError is the common base class of RequestError and HTTPStatusError
    print("Finally failed:", e)
# Custom authentication with error handling
class CustomAuthWithRetry(httpx.Auth):
    def __init__(self, token: str):
        self.token = token

    def auth_flow(self, request):
        # First, set token
        request.headers['X-Authentication'] = self.token
        response = yield request
        # On 401 error, retry with new token
        if response.status_code == 401:
            # Get new token (in actual implementation, call external API)
            new_token = self.refresh_token()
            request.headers['X-Authentication'] = new_token
            yield request

    def refresh_token(self) -> str:
        # Token refresh logic
        return "new-refreshed-token"
# Status code-specific handling
response = httpx.get('https://api.example.com/status-check')
if response.status_code == 200:
    print("Success: ", response.json())
elif response.status_code == 401:
    print("Authentication error: Please check your token")
elif response.status_code == 403:
    print("Permission error: Access denied")
elif response.status_code == 404:
    print("Not found: Resource does not exist")
elif response.status_code == 429:
    print("Rate limit: Please wait before retrying")
elif response.status_code >= 500:
    print("Server error: Problem on server side")
else:
    print(f"Unexpected status: {response.status_code}")
# Detailed HTTP status error handling
try:
    response = httpx.get('https://api.example.com/may-fail')
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    print(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}")
    # Check response content (substring match, since the header may carry a charset suffix)
    content_type = exc.response.headers.get('content-type', '')
    error_details = exc.response.json() if 'application/json' in content_type else exc.response.text
    print(f"Error details: {error_details}")
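The retry functions above share one pattern: wait backoff_factor * 2**attempt seconds between attempts and give up after max_retries. The sketch below isolates that pattern using only the standard library so it can be exercised without a network. The names retry_call and backoff_delays are hypothetical, ConnectionError stands in for httpx.RequestError, and the optional jitter is an addition not present in the original functions.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, backoff_factor: float = 1.0) -> List[float]:
    """Delays (in seconds) that would be slept before each retry."""
    return [backoff_factor * (2 ** attempt) for attempt in range(max_retries)]


def retry_call(
    func: Callable[[], T],
    max_retries: int = 3,
    backoff_factor: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    jitter: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exceptions with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_retries:
                raise  # Out of attempts: surface the last error
            delay = backoff_factor * (2 ** attempt) + random.uniform(0, jitter)
            sleep(delay)
    raise RuntimeError("unreachable")  # The loop always returns or raises


# Demo: a function that fails twice, then succeeds. Delays are recorded, not slept.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits: List[float] = []
result = retry_call(flaky, retry_on=(ConnectionError,), sleep=waits.append)
print(result, waits)
```

Injecting the sleep function keeps the example fast to run and easy to test. For connection failures alone, HTTPX also offers a built-in option through httpx.HTTPTransport(retries=...).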
Concurrent Processing and Asynchronous Requests
import httpx
import asyncio
from typing import List, Dict, Any
# Parallel requests using asynchronous client
async def fetch_multiple_urls_async(urls: List[str]) -> List[Dict[str, Any]]:
    async with httpx.AsyncClient() as client:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_url_async(client, url))
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
    # Process results
    processed_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed_results.append({
                'url': urls[i],
                'success': False,
                'error': str(result)
            })
        else:
            processed_results.append(result)
    return processed_results

async def fetch_url_async(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
    try:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'success': True,
            'data': response.json() if 'application/json' in response.headers.get('content-type', '') else None
        }
    except httpx.HTTPError as e:
        # HTTPError covers both transport failures and 4xx/5xx from raise_for_status()
        return {
            'url': url,
            'success': False,
            'error': str(e)
        }
# Usage example
urls = [
'https://api.example.com/users',
'https://api.example.com/posts',
'https://api.example.com/comments',
'https://api.example.com/categories'
]
# Run in async environment
# results = await fetch_multiple_urls_async(urls)
# successful_results = [r for r in results if r['success']]
# print(f"Success: {len(successful_results)}/{len(urls)}")
# Concurrent connection control using semaphore
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> Dict[str, Any]:
        async with semaphore:
            return await fetch_url_async(client, url)

    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    # gather() preserves input order, so each result can be paired with its URL
    return [
        r if not isinstance(r, Exception) else {'url': url, 'success': False, 'error': str(r)}
        for url, r in zip(urls, results)
    ]
# Pagination-aware asynchronous data fetching
async def fetch_all_pages_async(base_url: str, headers: Dict[str, str] = None, max_pages: int = None) -> List[Dict[str, Any]]:
    all_data = []
    page = 1
    async with httpx.AsyncClient() as client:
        if headers:
            client.headers.update(headers)
        while True:
            try:
                params = {'page': page, 'per_page': 100}
                response = await client.get(base_url, params=params, timeout=10.0)
                response.raise_for_status()
                data = response.json()
                if not data or (isinstance(data, list) and len(data) == 0):
                    break  # End if no data
                if isinstance(data, dict) and 'items' in data:
                    items = data['items']
                    all_data.extend(items)
                    # End if no next page
                    if not data.get('has_more', True) or len(items) == 0:
                        break
                else:
                    all_data.extend(data)
                print(f"Page {page} completed: {len(data if isinstance(data, list) else data.get('items', []))} items")
                page += 1
                # Check maximum page limit
                if max_pages and page > max_pages:
                    break
                # Wait to reduce API load
                await asyncio.sleep(0.1)
            except httpx.HTTPError as e:
                # Covers transport failures and 4xx/5xx from raise_for_status()
                print(f"Error on page {page}: {e}")
                break
    print(f"Total items fetched: {len(all_data)}")
    return all_data
# Streaming response processing
async def stream_large_response(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            # Check header information
            content_length = response.headers.get('content-length')
            if content_length:
                print(f"Content size: {int(content_length):,} bytes")
            # Process by chunks
            downloaded = 0
            async for chunk in response.aiter_bytes(chunk_size=8192):
                downloaded += len(chunk)
                print(f"Downloaded: {downloaded:,} bytes", end='\r')
                # Perform actual chunk processing (file writing, etc.) here
            print(f"\nDownload completed: {downloaded:,} bytes")
# Efficient batch processing
async def process_items_in_batches(items: List[str], batch_size: int = 10) -> List[Dict[str, Any]]:
    all_results = []
    async with httpx.AsyncClient() as client:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}: {len(batch)} items")
            # Parallel processing within batch
            tasks = [fetch_url_async(client, f'https://api.example.com/items/{item}') for item in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            # Error handling
            for result in batch_results:
                if isinstance(result, Exception):
                    all_results.append({'success': False, 'error': str(result)})
                else:
                    all_results.append(result)
            # Wait between batches
            if i + batch_size < len(items):
                await asyncio.sleep(1.0)
    return all_results
# Streaming processing of Server-Sent Events (SSE); this is plain HTTP streaming, not WebSocket
import json

async def handle_streaming_events(url: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', url) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    # Server-Sent Events format data processing
                    event_data = line[6:]  # Remove 'data: ' prefix
                    try:
                        data = json.loads(event_data)
                        print(f"Event received: {data}")
                        # Process event data here
                    except json.JSONDecodeError:
                        print(f"Text event: {event_data}")
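The semaphore pattern used in fetch_with_semaphore can be checked without any network access. The sketch below replaces the HTTP call with a hypothetical fake_fetch coroutine that just sleeps briefly, and records the highest number of coroutines that were "in flight" at once; all names here are illustrative and not part of HTTPX.

```python
import asyncio
from typing import Any, Dict, List


async def fake_fetch(url: str, state: Dict[str, int]) -> str:
    """Stand-in for an HTTP request: track concurrency, then pause briefly."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return url


async def fetch_limited(urls: List[str], max_concurrent: int = 5) -> Dict[str, Any]:
    """Run fake_fetch for every URL, never exceeding max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}

    async def worker(url: str) -> str:
        async with semaphore:  # Blocks while max_concurrent workers are running
            return await fake_fetch(url, state)

    results = await asyncio.gather(*(worker(u) for u in urls))
    return {"results": list(results), "peak": state["peak"]}


demo_urls = [f"https://api.example.com/items/{i}" for i in range(20)]
summary = asyncio.run(fetch_limited(demo_urls, max_concurrent=5))
print(f"Fetched {len(summary['results'])} URLs, peak concurrency {summary['peak']}")
```

With a real client, the connection pool can also be capped through httpx.Limits(max_connections=...), which may be enough on its own when the goal is only to bound open connections.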
Framework Integration and Practical Examples
import httpx
import asyncio
from typing import Optional, Dict, Any
import json
from pathlib import Path
# Asynchronous HTTP client for FastAPI integration
class AsyncAPIClient:
    # Note: http2=True requires the optional dependency (pip install 'httpx[http2]')
    def __init__(self, base_url: str, token: Optional[str] = None, http2: bool = True):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.http2 = http2

    async def __aenter__(self):
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'AsyncAPIClient/1.0'
        }
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        self.client = httpx.AsyncClient(
            headers=headers,
            timeout=httpx.Timeout(30.0),
            http2=self.http2,
            follow_redirects=True
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.aclose()

    async def get(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('GET', url, **kwargs)

    async def post(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('POST', url, json=data, **kwargs)

    async def put(self, endpoint: str, data: Optional[Dict] = None, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('PUT', url, json=data, **kwargs)

    async def delete(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._make_request('DELETE', url, **kwargs)

    async def _make_request(self, method: str, url: str, **kwargs) -> Optional[Dict[str, Any]]:
        try:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            if response.content:
                return response.json()
            return None
        except httpx.HTTPError as e:
            # Covers transport failures and 4xx/5xx from raise_for_status()
            print(f"API Error: {method} {url} - {e}")
            raise
# Usage example
async def api_client_example():
    async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
        # Get user list
        users = await client.get('users', params={'page': 1, 'limit': 50})
        # Create new user
        new_user = await client.post('users', data={
            'name': 'John Doe',
            'email': 'john.doe@example.com'
        })
        # Update user
        updated_user = await client.put(f'users/{new_user["id"]}', data={
            'name': 'Jane Doe'
        })
# Custom authentication class (automatic token refresh)
class AutoRefreshAuth(httpx.Auth):
    # Note: the token requests below are synchronous, so they block the event loop
    # if this class is used with httpx.AsyncClient.
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.access_token = None
        self.refresh_token = None

    def auth_flow(self, request):
        # Get new token if first time or invalid token
        if not self.access_token:
            self._get_new_token()
        request.headers['Authorization'] = f'Bearer {self.access_token}'
        response = yield request
        # On 401 error, refresh token and retry
        if response.status_code == 401:
            self._refresh_access_token()
            request.headers['Authorization'] = f'Bearer {self.access_token}'
            yield request

    def _get_new_token(self):
        # Get token with client authentication
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'client_credentials'
        }
        response = httpx.post(self.token_url, data=auth_data)
        response.raise_for_status()
        token_data = response.json()
        self.access_token = token_data['access_token']
        self.refresh_token = token_data.get('refresh_token')

    def _refresh_access_token(self):
        # Update access token with refresh token
        if self.refresh_token:
            refresh_data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token
            }
            response = httpx.post(self.token_url, data=refresh_data)
            if response.status_code == 200:
                token_data = response.json()
                self.access_token = token_data['access_token']
                return
        # Get new token if refresh fails
        self._get_new_token()
# File upload (multipart)
async def upload_file_async(file_path: Path, upload_url: str, additional_fields: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Asynchronous file upload"""
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        files = {'file': (file_path.name, file_path.open('rb'), 'application/octet-stream')}
        data = additional_fields or {}
        try:
            response = await client.post(
                upload_url,
                files=files,
                data=data,
                headers={'Authorization': 'Bearer your-token'}
            )
            response.raise_for_status()
            return response.json()
        finally:
            # Close the file handle whether or not the upload succeeded
            files['file'][1].close()
# Large file streaming download
async def download_large_file_async(url: str, local_path: Path, chunk_size: int = 8192) -> None:
    """Large file asynchronous streaming download"""
    async with httpx.AsyncClient(timeout=httpx.Timeout(None)) as client:
        async with client.stream('GET', url) as response:
            response.raise_for_status()
            # Get file size
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with local_path.open('wb') as f:
                async for chunk in response.aiter_bytes(chunk_size=chunk_size):
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Progress display
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"Download progress: {progress:.1f}%", end='\r')
            print(f"\nDownload completed: {local_path}")
# Proxy server integration (FastAPI)
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.api_route("/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy_requests(request: Request, path: str):
    """Proxy endpoint for external APIs"""
    target_url = f"https://external-api.example.com/{path}"
    # Forward request headers (excluding some)
    excluded_headers = {'host', 'content-length', 'transfer-encoding'}
    headers = {k: v for k, v in request.headers.items() if k.lower() not in excluded_headers}
    async with httpx.AsyncClient() as client:
        # Get request body
        body = await request.body()
        try:
            response = await client.request(
                method=request.method,
                url=target_url,
                headers=headers,
                content=body,
                params=request.query_params,
                timeout=30.0
            )
            # Forward response headers (excluding some)
            excluded_response_headers = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}
            response_headers = {k: v for k, v in response.headers.items() if k.lower() not in excluded_response_headers}
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=response_headers
            )
        except httpx.RequestError as e:
            return Response(
                content=json.dumps({'error': str(e)}),
                status_code=502,
                headers={'content-type': 'application/json'}
            )
# Streaming transfer (chunked HTTP pass-through, not WebSocket)
@app.get("/stream-proxy/{path:path}")
async def stream_proxy(path: str):
    """Streaming response proxy"""
    target_url = f"https://streaming-api.example.com/{path}"

    async def generate_stream():
        async with httpx.AsyncClient() as client:
            async with client.stream('GET', target_url) as response:
                async for chunk in response.aiter_bytes():
                    yield chunk

    return StreamingResponse(
        generate_stream(),
        media_type='application/octet-stream'
    )
# Usage example
# async def main():
#     await api_client_example()
#     await upload_file_async(Path('/path/to/file.pdf'), 'https://api.example.com/upload')
#     await download_large_file_async('https://api.example.com/large-file.zip', Path('/tmp/file.zip'))
# asyncio.run(main())
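The proxy endpoint above drops certain headers before forwarding in each direction. That filtering step can be factored out and tested on its own; the sketch below uses only the standard library, reuses the two exclusion sets from the proxy example, and introduces a hypothetical helper named filter_headers.

```python
from typing import Dict, Iterable, Mapping

# Exclusion sets copied from the proxy example above
EXCLUDED_REQUEST_HEADERS = {'host', 'content-length', 'transfer-encoding'}
EXCLUDED_RESPONSE_HEADERS = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}


def filter_headers(headers: Mapping[str, str], excluded: Iterable[str]) -> Dict[str, str]:
    """Return a copy of headers without the excluded names (case-insensitive)."""
    excluded_lower = {name.lower() for name in excluded}
    return {k: v for k, v in headers.items() if k.lower() not in excluded_lower}


incoming = {
    'Host': 'my-proxy.example.com',
    'Content-Length': '42',
    'Authorization': 'Bearer your-token',
    'Accept': 'application/json',
}
forwarded = filter_headers(incoming, EXCLUDED_REQUEST_HEADERS)
print(forwarded)
```

Host and Content-Length are removed so that the outgoing client can set them for the target server. Content-Encoding is dropped from responses because response.content has already been decoded by the time it is passed on.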