aiohttp
Asynchronous HTTP client and server library for Python, built on asyncio for high-performance async communication. Beyond HTTP client functionality, it is a comprehensive async web framework providing an HTTP server, WebSocket support, and middleware.
GitHub Overview
aio-libs/aiohttp
Asynchronous HTTP client/server framework for asyncio and Python
Overview
aiohttp is an asynchronous HTTP client/server framework for Python built on asyncio. In addition to HTTP client functionality, it provides an integrated HTTP server, WebSocket support, middleware, and routing. Designed for asynchronous programming on Python 3.8+, it fully supports async/await syntax. By enabling high-performance I/O and scalable web application development, it has established itself as a major component of the Python asynchronous web ecosystem alongside FastAPI and Starlette.
Details
aiohttp continues to evolve alongside the maturation of Python asynchronous programming and is widely adopted for enterprise-level asynchronous web application development. Beyond being an HTTP client, it provides a comprehensive solution integrating a web server, WebSockets, streaming, and a middleware system. Through deep integration with asyncio it can handle thousands of simultaneous connections efficiently, which makes it a strong fit for microservice architectures and real-time applications. With a rich ecosystem and good extensibility, it supports everything from small prototypes to large-scale production deployments.
Key Features
- Client-Server Integration: Provides both HTTP client and server functionality
- Complete WebSocket Support: Real-time communication and streaming processing
- Async-First Design: High-performance I/O processing through complete asyncio integration
- Middleware System: Flexible request processing pipeline
- Session Management: Advanced cookie, authentication, and connection pooling
- Streaming Processing: Efficient processing capabilities for large volumes of data
Pros and Cons
Pros
- Consistent development experience through integrated HTTP client and server functionality
- High throughput and concurrent connection handling through asynchronous processing
- Real-time application development support through WebSocket support
- Rich middleware ecosystem and extension features
- Mature session management and authentication systems
- Excellent compatibility with microservices architecture
Cons
- Steep learning curve: requires a solid understanding of asynchronous programming
- More complex than simple synchronous APIs such as requests
- Debugging and error handling are harder than with synchronous code
- Noticeable overhead for small, one-off requests
- Requires Python 3.8+ with no support for older versions
- Initial configuration and setup more complex than requests
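The throughput advantage listed above comes from overlapping I/O waits on a single thread. A minimal stdlib-only sketch (no aiohttp or network required) illustrates the effect, using `asyncio.sleep` as a stand-in for an HTTP request:

```python
import asyncio
import time

async def fake_io(i: int, delay: float = 0.05) -> int:
    # Stand-in for a network call; asyncio.sleep yields to the event loop
    await asyncio.sleep(delay)
    return i

async def run_concurrently(n: int = 10) -> float:
    # Launch n "requests" at once and measure total wall time
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(i) for i in range(n)))
    return time.perf_counter() - start

elapsed = asyncio.run(run_concurrently())
# Ten 50 ms "requests" overlap, so total wall time stays near 50 ms
print(f"{elapsed:.3f}s")
```

Run sequentially, the same ten calls would take roughly 500 ms; this is the same mechanism that lets an aiohttp `ClientSession` keep thousands of connections in flight.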
Code Examples
Installation and Basic Setup
# Install aiohttp
pip install aiohttp
# Optional C speedups (aiodns, Brotli, etc.) for better performance
pip install aiohttp[speedups]
# Check dependencies
python -c "import aiohttp; print(aiohttp.__version__)"
Basic Requests (GET/POST/PUT/DELETE)
import asyncio
import aiohttp
# Basic GET request
async def basic_get_request():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/users') as response:
print(f"Status: {response.status}")
print(f"Headers: {response.headers}")
# Get JSON response
data = await response.json()
print(f"User data: {data}")
# Get text response
# text = await response.text()
# print(f"Text: {text}")
# GET request with parameters
async def get_with_params():
async with aiohttp.ClientSession() as session:
params = {
'page': 1,
'limit': 10,
'sort': 'created_at',
'filter': 'active'
}
async with session.get('https://api.example.com/users', params=params) as response:
if response.status == 200:
data = await response.json()
print(f"Retrieved data: {len(data)} items")
return data
else:
print(f"Error: {response.status}")
return None
# POST request (sending JSON)
async def post_json_request():
user_data = {
'name': 'John Doe',
'email': '[email protected]',
'age': 30,
'department': 'Development'
}
headers = {
'Authorization': 'Bearer your-jwt-token',
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.post(
'https://api.example.com/users',
json=user_data,
headers=headers
) as response:
if response.status == 201:
created_user = await response.json()
print(f"User created: ID={created_user['id']}")
return created_user
else:
error_text = await response.text()
print(f"Creation failed: {response.status} - {error_text}")
return None
# POST request (sending form data)
async def post_form_request():
form_data = aiohttp.FormData()
form_data.add_field('username', 'testuser')
form_data.add_field('password', 'secret123')
form_data.add_field('remember_me', 'true')
async with aiohttp.ClientSession() as session:
async with session.post('https://api.example.com/login', data=form_data) as response:
if response.status == 200:
result = await response.json()
print(f"Login successful: {result}")
return result
else:
print(f"Login failed: {response.status}")
return None
# PUT request (data update)
async def put_request():
update_data = {
'name': 'Jane Doe',
'email': '[email protected]',
'age': 31
}
headers = {'Authorization': 'Bearer your-jwt-token'}
async with aiohttp.ClientSession() as session:
async with session.put(
'https://api.example.com/users/123',
json=update_data,
headers=headers
) as response:
if response.status == 200:
updated_user = await response.json()
print(f"User updated: {updated_user}")
return updated_user
else:
error_text = await response.text()
print(f"Update failed: {response.status} - {error_text}")
return None
# DELETE request
async def delete_request():
headers = {'Authorization': 'Bearer your-jwt-token'}
async with aiohttp.ClientSession() as session:
async with session.delete('https://api.example.com/users/123', headers=headers) as response:
if response.status == 204:
print("User deleted successfully")
return True
else:
error_text = await response.text()
print(f"Deletion failed: {response.status} - {error_text}")
return False
# Get detailed response information
async def detailed_response_info():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/info') as response:
print(f"Status code: {response.status}")
print(f"Reason: {response.reason}")
print(f"HTTP version: {response.version}")
print(f"URL: {response.url}")
print(f"History: {response.history}")
print(f"Headers: {dict(response.headers)}")
print(f"Content-Type: {response.content_type}")
print(f"Charset: {response.charset}")
# Execution example
async def main():
await basic_get_request()
await get_with_params()
await post_json_request()
await post_form_request()
await put_request()
await delete_request()
await detailed_response_info()
# asyncio.run(main())
Advanced Configuration and Customization (Headers, Authentication, Timeout, etc.)
import aiohttp
import asyncio
import ssl
# Custom headers and session configuration
async def custom_session_example():
headers = {
'User-Agent': 'MyApp/1.0 (aiohttp Python)',
'Accept': 'application/json',
'Accept-Language': 'en-US,ja-JP;q=0.9',
'X-API-Version': 'v2',
'X-Request-ID': 'req-12345'
}
timeout = aiohttp.ClientTimeout(total=30, connect=10)
async with aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
) as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return data
# Basic authentication configuration
async def basic_auth_example():
auth = aiohttp.BasicAuth('username', 'password')
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/private', auth=auth) as response:
if response.status == 200:
data = await response.json()
print(f"Authentication successful: {data}")
return data
else:
print(f"Authentication failed: {response.status}")
return None
# Detailed timeout configuration
async def timeout_configuration():
# Detailed timeout settings
timeout = aiohttp.ClientTimeout(
total=30, # Total timeout (30 seconds)
connect=5, # Connection timeout (5 seconds)
sock_read=10, # Socket read timeout (10 seconds)
sock_connect=5 # Socket connection timeout (5 seconds)
)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get('https://api.example.com/slow-endpoint') as response:
data = await response.json()
return data
except asyncio.TimeoutError:
print("Request timed out")
return None
# SSL certificate and HTTPS configuration
async def ssl_configuration():
# Disable SSL certificate verification (development only)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://self-signed.example.com/api') as response:
data = await response.json()
return data
# Client certificate authentication
async def client_certificate_auth():
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.crt', '/path/to/client.key')
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://secure-api.example.com/data') as response:
data = await response.json()
return data
# Proxy configuration
async def proxy_configuration():
# HTTP proxy configuration
proxy = 'http://proxy.example.com:8080'
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data', proxy=proxy) as response:
data = await response.json()
return data
# Authenticated proxy
async def authenticated_proxy():
proxy = 'http://user:[email protected]:8080'
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data', proxy=proxy) as response:
data = await response.json()
return data
# Cookie management
async def cookie_management():
# Session management with Cookie Jar
jar = aiohttp.CookieJar(unsafe=True) # unsafe=True accepts non-HTTPS cookies
async with aiohttp.ClientSession(cookie_jar=jar) as session:
# Login and get session cookies
login_data = {'username': 'user', 'password': 'pass'}
async with session.post('https://api.example.com/login', json=login_data) as response:
if response.status == 200:
print("Login successful, cookies saved")
# Subsequent requests automatically send cookies
async with session.get('https://api.example.com/protected') as response:
data = await response.json()
return data
# Custom connector configuration
async def custom_connector():
connector = aiohttp.TCPConnector(
limit=100, # Total connection limit
limit_per_host=30, # Per-host connection limit
ttl_dns_cache=300, # DNS cache TTL (seconds)
use_dns_cache=True, # Use DNS cache
keepalive_timeout=30, # Keep-Alive timeout
enable_cleanup_closed=True # Cleanup closed connections
)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return data
# Request with retry functionality
async def request_with_retry(url, max_retries=3, backoff_factor=1):
async with aiohttp.ClientSession() as session:
for attempt in range(max_retries + 1):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
elif response.status in [500, 502, 503, 504]:
if attempt < max_retries:
wait_time = backoff_factor * (2 ** attempt)
print(f"Server error {response.status}. Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
continue
# Non-retryable errors
response.raise_for_status()
except aiohttp.ClientError as e:
if attempt < max_retries:
wait_time = backoff_factor * (2 ** attempt)
print(f"Connection error: {e}. Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
else:
raise
return None
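The retry helper above waits `backoff_factor * 2 ** attempt` seconds between attempts. That schedule can be isolated and checked without any network; the optional jitter parameter is an addition here (not in the helper above) that spreads simultaneous retries from many clients:

```python
import random

def backoff_schedule(max_retries: int, backoff_factor: float, jitter: float = 0.0):
    # Wait times matching the retry loop above: backoff_factor * 2**attempt,
    # plus an optional random jitter component
    waits = []
    for attempt in range(max_retries):
        wait = backoff_factor * (2 ** attempt)
        waits.append(wait + random.uniform(0, jitter))
    return waits

print(backoff_schedule(3, 1.0))  # → [1.0, 2.0, 4.0] when jitter is 0
```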
Error Handling and Retry Functionality
import aiohttp
import asyncio
import logging
from typing import Optional, Dict, Any
# Comprehensive error handling
async def comprehensive_error_handling():
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://api.example.com/users') as response:
                # Explicitly raise on 4xx/5xx status codes
                response.raise_for_status()
                data = await response.json()
                return data
        except aiohttp.ClientResponseError as e:
            print(f"HTTP response error: {e}")
            print(f"Status code: {e.status}")
            print(f"Message: {e.message}")
        except aiohttp.ClientConnectionError as e:
            print(f"Connection error: {e}")
            print("Please check network connection or DNS settings")
        except aiohttp.ClientPayloadError as e:
            print(f"Payload error: {e}")
            print("Error occurred while reading the response body")
        except aiohttp.ClientError as e:
            print(f"Client error: {e}")
            print("Unexpected client error occurred")
        except asyncio.TimeoutError:
            print("Request timed out")
        except Exception as e:
            print(f"Unexpected error: {e}")
        return None
# Advanced retry class
class RetryableSession:
def __init__(self, max_retries=3, backoff_factor=1.0, retry_statuses=None):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.retry_statuses = retry_statuses or [500, 502, 503, 504, 429]
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
    async def request_with_retry(self, method: str, url: str, **kwargs) -> Optional[aiohttp.ClientResponse]:
        for attempt in range(self.max_retries + 1):
            try:
                response = await self.session.request(method, url, **kwargs)
                if response.status not in self.retry_statuses:
                    # Return the still-open response so the caller can read
                    # the body before the session closes
                    return response
                if attempt < self.max_retries:
                    response.release()
                    wait_time = self.backoff_factor * (2 ** attempt)
                    print(f"Status {response.status}, retry {attempt + 1}/{self.max_retries}. Waiting {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    return response
            except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as e:
                if attempt < self.max_retries:
                    wait_time = self.backoff_factor * (2 ** attempt)
                    print(f"Connection error, retry {attempt + 1}/{self.max_retries}: {e}. Waiting {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    raise
        return None
# Retry session usage example
async def retry_session_example():
async with RetryableSession(max_retries=3, backoff_factor=1.5) as retry_session:
response = await retry_session.request_with_retry('GET', 'https://api.example.com/unstable')
if response:
data = await response.json()
return data
return None
# Detailed error-specific processing
async def detailed_error_processing():
async with aiohttp.ClientSession() as session:
try:
async with session.get('https://api.example.com/data') as response:
if response.status == 200:
return await response.json()
elif response.status == 401:
print("Authentication error: Please check your token")
return {'error': 'authentication_required'}
elif response.status == 403:
print("Permission error: Access denied")
return {'error': 'permission_denied'}
elif response.status == 404:
print("Not found: Resource does not exist")
return {'error': 'not_found'}
elif response.status == 429:
# Rate limiting handling
retry_after = response.headers.get('Retry-After')
if retry_after:
print(f"Rate limit: Please retry after {retry_after} seconds")
await asyncio.sleep(int(retry_after))
# Add retry logic here
return {'error': 'rate_limited'}
elif response.status >= 500:
print("Server error: Problem on server side")
return {'error': 'server_error'}
else:
print(f"Unexpected status: {response.status}")
return {'error': 'unexpected_status'}
except Exception as e:
print(f"Error during processing: {e}")
return {'error': 'processing_failed'}
# Response validation and error handling
async def response_validation():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
# Content-Type check
content_type = response.headers.get('Content-Type', '')
if 'application/json' not in content_type:
print(f"Unexpected Content-Type: {content_type}")
return None
try:
data = await response.json()
# Data structure validation
if not isinstance(data, dict):
print("Response is not a JSON object")
return None
# Check required fields
required_fields = ['id', 'name', 'status']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
print(f"Missing required fields: {missing_fields}")
return None
return data
except aiohttp.ContentTypeError as e:
print(f"JSON parse error: {e}")
text_content = await response.text()
print(f"Response content: {text_content[:200]}...")
return None
# Circuit breaker pattern implementation
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
import time
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
import time
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
# Circuit breaker usage example
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def protected_api_call():
async def api_request():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/unreliable') as response:
return await response.json()
try:
return await circuit_breaker.call(api_request)
except Exception as e:
print(f"Circuit breaker protection: {e}")
return None
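The circuit breaker state machine can be exercised end-to-end without any network. The sketch below restates the class above in condensed form and drives it with a stub coroutine (`always_fails` is a hypothetical stand-in for an unreliable API call, not part of aiohttp):

```python
import asyncio
import time

class CircuitBreaker:
    # Condensed restatement of the CircuitBreaker class above
    def __init__(self, failure_threshold=3, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise RuntimeError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            self.failure_count = 0
            self.state = 'CLOSED'
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise

async def always_fails():
    # Stub for a failing upstream call; no network involved
    raise ConnectionError("stub failure")

async def demo():
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
    for _ in range(3):
        try:
            await breaker.call(always_fails)
        except ConnectionError:
            pass
    return breaker.state

print(asyncio.run(demo()))  # → OPEN after three consecutive failures
```

A fourth call through the breaker would now raise immediately without touching the upstream service, which is the point of the pattern.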
Concurrent Processing and Asynchronous Requests
import aiohttp
import asyncio
import json
from typing import List, Dict, Any, Optional
import time
# Fetch multiple URLs in parallel
async def fetch_multiple_urls(urls: List[str]) -> List[Dict[str, Any]]:
results = []
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = asyncio.create_task(fetch_single_url(session, url))
tasks.append(task)
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(completed_tasks):
if isinstance(result, Exception):
results.append({
'url': urls[i],
'success': False,
'error': str(result),
'status_code': None
})
else:
results.append(result)
return results
async def fetch_single_url(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
try:
start_time = time.time()
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
end_time = time.time()
content_length = len(await response.read())
return {
'url': url,
'status_code': response.status,
'success': response.status == 200,
'content_length': content_length,
'response_time': end_time - start_time,
'headers': dict(response.headers)
}
except Exception as e:
return {
'url': url,
'success': False,
'error': str(e),
'status_code': None
}
# Control concurrent connections with semaphore
async def controlled_concurrent_requests(urls: List[str], max_concurrent: int = 5) -> List[Dict[str, Any]]:
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_semaphore(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
async with semaphore:
return await fetch_single_url(session, url)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_semaphore(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
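The semaphore guarantees that no more than `max_concurrent` requests are in flight at once. A stdlib-only sketch (using `asyncio.sleep` in place of an HTTP request) makes that limit observable:

```python
import asyncio

async def demo_semaphore(total: int = 12, max_concurrent: int = 3) -> int:
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def worker(i: int):
        nonlocal in_flight, peak
        async with semaphore:
            # Track how many workers hold the semaphore simultaneously
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            in_flight -= 1

    await asyncio.gather(*(worker(i) for i in range(total)))
    return peak

peak = asyncio.run(demo_semaphore())
print(peak)  # never exceeds the semaphore limit of 3
```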
# Pagination-aware full data fetching
async def fetch_all_paginated_data(base_url: str, headers: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
all_data = []
page = 1
has_more = True
session_headers = headers or {}
async with aiohttp.ClientSession(headers=session_headers) as session:
while has_more:
try:
params = {'page': page, 'per_page': 100}
async with session.get(base_url, params=params) as response:
if response.status == 200:
page_data = await response.json()
if isinstance(page_data, dict) and 'items' in page_data:
items = page_data['items']
if not items:
break
all_data.extend(items)
has_more = page_data.get('has_more', False)
elif isinstance(page_data, list):
if not page_data:
break
all_data.extend(page_data)
has_more = len(page_data) == 100 # Full page suggests more data
else:
break
print(f"Page {page} completed: {len(page_data if isinstance(page_data, list) else items)} items")
page += 1
# Reduce API load
await asyncio.sleep(0.1)
else:
print(f"Error on page {page}: {response.status}")
break
except Exception as e:
print(f"Exception on page {page}: {e}")
break
print(f"Total {len(all_data)} items fetched")
return all_data
# Gradual data processing (response-dependent)
async def dependent_requests_pipeline():
async with aiohttp.ClientSession() as session:
# Step 1: Get user information
async with session.get('https://api.example.com/user/profile') as response:
if response.status != 200:
return {'error': 'Failed to get user profile'}
user_profile = await response.json()
user_id = user_profile['id']
# Step 2: Get user's posts in parallel
posts_tasks = []
for category in ['tech', 'personal', 'work']:
task = session.get(f'https://api.example.com/users/{user_id}/posts/{category}')
posts_tasks.append(task)
posts_responses = await asyncio.gather(*posts_tasks)
# Step 3: Process posts data for each category
all_posts = {}
for i, response in enumerate(posts_responses):
category = ['tech', 'personal', 'work'][i]
async with response:
if response.status == 200:
posts_data = await response.json()
all_posts[category] = posts_data
else:
all_posts[category] = []
return {
'user_profile': user_profile,
'posts': all_posts
}
# WebSocket client functionality
async def websocket_client_example():
session = aiohttp.ClientSession()
try:
async with session.ws_connect('wss://api.example.com/websocket') as ws:
print("WebSocket connection established")
# Send initial message
await ws.send_str(json.dumps({
'type': 'subscribe',
'channel': 'updates'
}))
# Message receiving loop
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
print(f"Received: {data}")
# Response to specific messages
if data.get('type') == 'ping':
await ws.send_str(json.dumps({'type': 'pong'}))
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocket error: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
print("WebSocket connection closed")
break
except Exception as e:
print(f"WebSocket connection error: {e}")
finally:
await session.close()
# Streaming download
async def streaming_download(url: str, file_path: str, chunk_size: int = 8192):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
total_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
with open(file_path, 'wb') as file:
async for chunk in response.content.iter_chunked(chunk_size):
file.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"Download progress: {progress:.1f}%", end='\r')
print(f"\nDownload completed: {file_path}")
return True
else:
print(f"Download failed: {response.status}")
return False
# Batch processing for request optimization
async def batch_process_requests(items: List[str], batch_size: int = 10):
all_results = []
async with aiohttp.ClientSession() as session:
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
print(f"Processing batch {i//batch_size + 1}: {len(batch)} items")
# Parallel processing within batch
tasks = []
for item in batch:
task = fetch_single_url(session, f'https://api.example.com/items/{item}')
tasks.append(task)
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in batch_results:
if isinstance(result, Exception):
all_results.append({'success': False, 'error': str(result)})
else:
all_results.append(result)
# Wait between batches
if i + batch_size < len(items):
await asyncio.sleep(0.5)
return all_results
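The batching loop above relies on a simple slicing pattern, `items[i:i + batch_size]`, which handles a final partial batch automatically. Isolated for clarity:

```python
def make_batches(items, batch_size):
    # The slicing pattern used by batch_process_requests above
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

print(make_batches(list(range(7)), 3))  # → [[0, 1, 2], [3, 4, 5], [6]]
```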
# Usage example
async def main_parallel_example():
urls = [
'https://api.example.com/users',
'https://api.example.com/posts',
'https://api.example.com/comments',
'https://api.example.com/categories'
]
print("Executing parallel requests...")
results = await fetch_multiple_urls(urls)
successful_requests = [r for r in results if r['success']]
print(f"Success: {len(successful_requests)}/{len(urls)}")
print("\nTesting concurrent connection control...")
controlled_results = await controlled_concurrent_requests(urls, max_concurrent=2)
print("\nTesting pagination...")
paginated_data = await fetch_all_paginated_data('https://api.example.com/posts')
print("\nDependent request pipeline...")
pipeline_result = await dependent_requests_pipeline()
# asyncio.run(main_parallel_example())
Framework Integration and Practical Examples
import aiohttp
import asyncio
import json
from typing import Optional, Dict, Any, List
from pathlib import Path
import mimetypes
# Advanced API client for FastAPI integration
class AsyncAPIClient:
def __init__(self, base_url: str, token: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.token = token
self.session = None
async def __aenter__(self):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'AsyncAPIClient/1.0 (aiohttp)'
}
if self.token:
headers['Authorization'] = f'Bearer {self.token}'
timeout = aiohttp.ClientTimeout(total=30, connect=5)
connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
self.session = aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=connector
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
async def post(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
kwargs = {}
if json_data:
kwargs['json'] = json_data
if data:
kwargs['data'] = data
async with self.session.post(url, **kwargs) as response:
response.raise_for_status()
return await response.json()
async def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.put(url, json=data) as response:
response.raise_for_status()
return await response.json()
async def delete(self, endpoint: str) -> bool:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.delete(url) as response:
response.raise_for_status()
return response.status == 204
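Each method of `AsyncAPIClient` joins the base URL and endpoint with the same `rstrip('/')` / `lstrip('/')` rule, so callers can pass endpoints with or without leading slashes. The rule in isolation:

```python
def join_url(base_url: str, endpoint: str) -> str:
    # URL joining rule used by AsyncAPIClient above: tolerate trailing and
    # leading slashes without producing doubled separators
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

print(join_url('https://api.example.com/v1/', '/users'))
# → https://api.example.com/v1/users
```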
# Usage example
async def api_client_example():
async with AsyncAPIClient('https://api.example.com/v1', token='your-jwt-token') as client:
# Get user list
users = await client.get('users', params={'page': 1, 'limit': 50})
print(f"Retrieved users: {len(users)}")
# Create new user
new_user = await client.post('users', json_data={
'name': 'John Doe',
'email': '[email protected]',
'department': 'Development'
})
print(f"Created user: {new_user['id']}")
# Update user information
updated_user = await client.put(f'users/{new_user["id"]}', data={
'name': 'Jane Doe',
'department': 'Technology'
})
print(f"Update completed: {updated_user['name']}")
# Multipart file upload
async def upload_file_multipart(file_path: Path, upload_url: str, additional_fields: Optional[Dict[str, str]] = None):
"""File upload in multipart format"""
# Auto-detect file MIME type
mime_type, _ = mimetypes.guess_type(str(file_path))
if not mime_type:
mime_type = 'application/octet-stream'
    with file_path.open('rb') as file_obj:
        data = aiohttp.FormData()
        data.add_field('file',
                       file_obj,
                       filename=file_path.name,
                       content_type=mime_type)
        # Configure additional fields
        if additional_fields:
            for key, value in additional_fields.items():
                data.add_field(key, value)
        headers = {'Authorization': 'Bearer your-upload-token'}
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(upload_url, data=data, headers=headers) as response:
                    response.raise_for_status()
                    result = await response.json()
                    print(f"Upload successful: {result}")
                    return result
            except aiohttp.ClientError as e:
                print(f"Upload error: {e}")
                raise
    # The with-block closes the file handle for us, even on error
# File upload with progress
async def upload_with_progress(file_path: Path, upload_url: str):
"""File upload with progress display"""
file_size = file_path.stat().st_size
uploaded = 0
class ProgressReader:
def __init__(self, file_obj):
self.file_obj = file_obj
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.file_obj.close()
def read(self, size=-1):
nonlocal uploaded
chunk = self.file_obj.read(size)
if chunk:
uploaded += len(chunk)
progress = (uploaded / file_size) * 100
print(f"Upload progress: {progress:.1f}%", end='\r')
return chunk
async with ProgressReader(file_path.open('rb')) as progress_file:
data = aiohttp.FormData()
data.add_field('file', progress_file, filename=file_path.name)
async with aiohttp.ClientSession() as session:
async with session.post(upload_url, data=data) as response:
response.raise_for_status()
print(f"\nUpload completed: {file_path.name}")
return await response.json()
# Automatic authentication token refresh functionality
class TokenManager:
def __init__(self, client_id: str, client_secret: str, token_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.access_token = None
self.refresh_token = None
self.token_expires_at = None
async def get_valid_token(self) -> str:
if self.access_token and self.token_expires_at:
import time
if time.time() < self.token_expires_at:
return self.access_token
await self._refresh_token()
return self.access_token
async def _refresh_token(self):
auth_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'client_credentials'
}
async with aiohttp.ClientSession() as session:
async with session.post(self.token_url, data=auth_data) as response:
response.raise_for_status()
token_data = await response.json()
self.access_token = token_data['access_token']
self.refresh_token = token_data.get('refresh_token')
import time
expires_in = token_data.get('expires_in', 3600)
self.token_expires_at = time.time() + expires_in - 60 # 60-second buffer
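The refresh decision in `get_valid_token` is a pure time comparison and can be tested in isolation. `needs_refresh` is a hypothetical helper (not part of the class above); note that `TokenManager` bakes its 60-second buffer into `token_expires_at` at store time, whereas this helper applies the skew at check time:

```python
import time

def needs_refresh(token_expires_at, now=None, skew: float = 60.0) -> bool:
    # Refresh when no token is cached, or when the token is within `skew`
    # seconds of expiry (all values are epoch seconds)
    if token_expires_at is None:
        return True
    now = time.time() if now is None else now
    return now >= token_expires_at - skew
```

Keeping the comparison in one small function makes it easy to unit-test the boundary (expiring in exactly `skew` seconds) without minting real tokens.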
# Authenticated HTTP client
class AuthenticatedClient:
def __init__(self, base_url: str, token_manager: TokenManager):
self.base_url = base_url.rstrip('/')
self.token_manager = token_manager
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def _make_request(self, method: str, endpoint: str, **kwargs):
token = await self.token_manager.get_valid_token()
headers = kwargs.pop('headers', {})
headers['Authorization'] = f'Bearer {token}'
url = f"{self.base_url}/{endpoint.lstrip('/')}"
async with self.session.request(method, url, headers=headers, **kwargs) as response:
if response.status == 401:
# Force token refresh and retry
await self.token_manager._refresh_token()
token = await self.token_manager.get_valid_token()
headers['Authorization'] = f'Bearer {token}'
async with self.session.request(method, url, headers=headers, **kwargs) as retry_response:
retry_response.raise_for_status()
return await retry_response.json()
else:
response.raise_for_status()
return await response.json()
# Server-Sent Events (SSE) client
async def sse_client(url: str):
"""Server-Sent Events client"""
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={'Accept': 'text/event-stream'}) as response:
if response.status == 200:
print("SSE connection established")
async for line in response.content:
line = line.decode('utf-8').strip()
if line.startswith('data: '):
event_data = line[6:] # Remove 'data: ' prefix
try:
data = json.loads(event_data)
print(f"Event received: {data}")
# Event type-specific processing
event_type = data.get('type')
if event_type == 'user_update':
await handle_user_update(data)
elif event_type == 'system_notification':
await handle_system_notification(data)
except json.JSONDecodeError:
print(f"Text event: {event_data}")
elif line.startswith('event: '):
event_type = line[7:]
print(f"Event type: {event_type}")
else:
print(f"SSE connection failed: {response.status}")
async def handle_user_update(data):
print(f"User update: {data}")
async def handle_system_notification(data):
print(f"System notification: {data}")
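The parsing inside `sse_client` can be factored into a pure function and tested on canned input. One detail the loop above elides: in the SSE wire format, a blank line terminates an event, and multiple `data:` lines accumulate. A minimal sketch honoring that rule (`parse_sse_lines` is a hypothetical helper, not an aiohttp API):

```python
import json

def parse_sse_lines(lines):
    # Collect `event:` and `data:` fields; a blank line dispatches the
    # accumulated event as an (event_type, payload) tuple
    events = []
    event_type, data_parts = None, []
    for raw in lines:
        line = raw.strip()
        if line.startswith('event: '):
            event_type = line[7:]
        elif line.startswith('data: '):
            data_parts.append(line[6:])
        elif line == '' and data_parts:
            payload = '\n'.join(data_parts)
            try:
                payload = json.loads(payload)  # JSON payloads become dicts
            except json.JSONDecodeError:
                pass  # non-JSON payloads stay as plain text
            events.append((event_type or 'message', payload))
            event_type, data_parts = None, []
    return events

stream = [
    'event: user_update',
    'data: {"type": "user_update", "id": 7}',
    '',
    'data: plain text event',
    '',
]
print(parse_sse_lines(stream))
```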
# FastAPI server integration example
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
# Execute HTTP request in background task
async def background_api_call(user_id: int):
async with AsyncAPIClient('https://external-api.example.com') as client:
user_data = await client.get(f'users/{user_id}')
# Data processing logic
processed_data = process_user_data(user_data)
# Send result to another API
await client.post('analytics/user-activity', json_data=processed_data)
@app.post("/trigger-analysis/{user_id}")
async def trigger_user_analysis(user_id: int, background_tasks: BackgroundTasks):
background_tasks.add_task(background_api_call, user_id)
return {"message": "Analysis started in background"}
def process_user_data(user_data):
# Data processing logic
from datetime import datetime
return {
"user_id": user_data["id"],
"activity_score": calculate_activity_score(user_data),
"processed_at": datetime.now().isoformat()
}
def calculate_activity_score(user_data):
# Activity score calculation
return user_data.get("login_count", 0) * 10
# Usage examples
async def integration_examples():
# API client example
await api_client_example()
# File upload example
file_path = Path("/path/to/document.pdf")
if file_path.exists():
await upload_file_multipart(
file_path,
'https://api.example.com/upload',
additional_fields={'category': 'documents', 'public': 'false'}
)
# SSE client example
# await sse_client('https://api.example.com/events')
# asyncio.run(integration_examples())