GRequests
Python HTTP client library that pairs Requests with gevent to send many HTTP requests concurrently. It keeps the familiar Requests API and adds batch helpers (map and imap), custom exception handlers, and reuse of Requests sessions and authentication.
Overview
GRequests is a Python library for asynchronous HTTP requests, built around the idea "Requests + Gevent = Asynchronous HTTP Requests". It combines the popular Requests library with the gevent coroutine library so that multiple HTTP requests can run concurrently behind the familiar Requests API. Because the waiting times of the individual requests overlap, I/O-bound workloads can finish much faster than with sequential calls. It adds concurrency to existing Requests-based code with minimal changes and is commonly used for web API integration and scraping.
Details
GRequests executes multiple HTTP requests concurrently through gevent-based cooperative multitasking. Its request functions mirror the Requests API (get, post, and so on, with the same keyword arguments), which keeps migration costs low for existing code. Batches are sent with grequests.map(), which returns a list of responses in request order, or grequests.imap(), which yields responses as they complete. While one request waits on the network, others proceed, so network I/O wait time is put to use. Gevent's monkey patching makes standard library socket operations cooperative, so the concurrency is transparent to the calling code. The library is most useful for I/O-bound tasks such as web API integration, data collection, and multi-endpoint monitoring.
Key Features
- Full Requests Compatibility: Implements asynchronous processing with familiar Requests API
- Gevent Cooperative Multitasking: Highly efficient concurrent processing for I/O-bound tasks
- Batch Request Processing: Bulk request sending through map/imap functions
- Transparent Asynchronization: Seamless asynchronous execution via monkey patching
- Exception Handling: Robust error handling through custom exception handlers
- Session Management: Reuses a Requests Session for connection pooling and cookie handling
Pros and Cons
Pros
- Introduces concurrent processing to existing Requests code with minimal changes
- Dramatically reduces I/O wait times for multiple HTTP requests
- Leverages Requests' rich features (authentication, proxy, SSL) as-is
- Hides complexity of asynchronous processing through simple API
- High performance improvement in web API integration and scraping
- Excellent integration with gevent ecosystem
Cons
- Risk of conflicts with other libraries due to gevent's monkey patching
- Limited performance improvement for CPU-bound tasks
- Cooperative multitasking on a single thread rather than true parallelism
- No support for modern async/await syntax
- Limited maintenance; active development has largely stalled
- Requires understanding of gevent characteristics during debugging
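The difference between cooperative multitasking and parallelism can be illustrated without gevent. The following is only a sketch that uses plain Python generators as stand-ins for greenlets; `worker` and `run_round_robin` are hypothetical names, not part of gevent or GRequests.

```python
from collections import deque


def worker(name, steps, log):
    """A toy task: records progress, then hands control back to the scheduler."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # cooperative hand-off point (gevent switches on blocking I/O instead)


def run_round_robin(tasks):
    """Run generator-based tasks on a single thread until all have finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # run the task up to its next yield
            queue.append(task)  # not finished yet: schedule it again
        except StopIteration:
            pass                # finished: drop it


log = []
run_round_robin([worker("a", 2, log), worker("b", 2, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1']
```

The tasks interleave only because each one yields voluntarily. A task that never reaches a hand-off point, such as a long CPU-bound loop, would hold the thread, which is why the gains are limited for CPU-bound work.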
Code Examples
Installation and Basic Setup
# Install GRequests
pip install grequests
# gevent and requests are installed automatically as dependencies;
# run this only to install or upgrade them explicitly
pip install gevent requests
# Verify installation in Python
python -c "import grequests; print('GRequests installed successfully')"
# Check versions
python -c "import grequests; import gevent; print(f'grequests with gevent {gevent.__version__}')"
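As an alternative to the one-liners above, installed versions can be read from package metadata without importing the packages. This is a minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is an assumption made for illustration.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


for name in ("grequests", "gevent", "requests"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```

Because nothing is imported, the check has no side effects such as gevent's monkey patching.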
Basic Concurrent Request Processing
import time

import grequests  # import first so gevent can patch the standard library early
import requests

# Define multiple URLs
urls = [
    'https://httpbin.org/delay/1',  # 1 second delay
    'https://httpbin.org/delay/2',  # 2 second delay
    'https://httpbin.org/delay/1',  # 1 second delay
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2'
]

# Synchronous version (reference): at least 4 seconds of server-side delay
# (1 + 2 + 1) plus network latency
def sync_requests():
    start_time = time.time()
    responses = []
    for url in urls:
        response = requests.get(url)
        responses.append(response)
    end_time = time.time()
    print(f"Synchronous processing: {end_time - start_time:.2f} seconds")
    return responses

# Asynchronous version (GRequests): completes in ~2 seconds (the slowest request)
def async_requests():
    start_time = time.time()
    # Create a generator of unsent AsyncRequest objects
    reqs = (grequests.get(url) for url in urls)
    # Send requests concurrently; results come back in request order
    responses = grequests.map(reqs)
    end_time = time.time()
    print(f"Asynchronous processing: {end_time - start_time:.2f} seconds")
    return responses

# Performance comparison
print("=== Synchronous vs Asynchronous Request Comparison ===")
sync_responses = sync_requests()
async_responses = async_requests()

# Result verification
print("\n=== Response Results ===")
for i, async_resp in enumerate(async_responses, start=1):
    # map() returns None for a request that raised an exception
    if async_resp is not None:
        content_type = async_resp.headers.get('content-type', 'N/A')
        print(f"URL {i}: Status {async_resp.status_code}, Content-Type: {content_type}")
    else:
        print(f"URL {i}: Request failed")
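The timings in the comparison follow from simple arithmetic: sequential requests add their delays, while concurrent requests overlap them. A small sketch of that estimate, ignoring network latency and connection setup; `estimate_seconds` is a hypothetical helper, not part of GRequests.

```python
def estimate_seconds(delays):
    """Return (sequential, concurrent) lower bounds for a batch of request delays."""
    sequential = sum(delays)             # each request waits for the previous one
    concurrent = max(delays, default=0)  # waits overlap; the slowest one dominates
    return sequential, concurrent


# Server-side delays of the three httpbin /delay URLs above, in seconds.
# The two jsonplaceholder requests add only network latency, which is ignored here.
print(estimate_seconds([1, 2, 1]))  # (4, 2)
```

Measured times will be somewhat higher than these bounds because every request also pays for DNS resolution, TLS handshakes, and transfer time.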
Error Handling and Exception Processing
import grequests

# Define custom exception handler
def exception_handler(request, exception):
    """
    Custom handler for request exceptions
    """
    print(f"Error occurred: {request.url}")
    print(f"Exception type: {type(exception).__name__}")
    print(f"Exception details: {exception}")
    # Can add logging or notification processing here
    return None  # The return value takes this request's place in the result list

# URL list with mixed success/failure scenarios
urls = [
    'https://httpbin.org/status/200',        # Success
    'https://httpbin.org/status/404',        # 404 error
    'https://httpbin.org/delay/10',          # Timeout
    'http://nonexistent-domain-12345.com',   # DNS resolution failure
    'https://httpbin.org/status/500',        # Server error
    'https://jsonplaceholder.typicode.com/posts/1'  # Success
]

def robust_async_requests():
    # Create requests with timeout settings
    reqs = [
        grequests.get(url, timeout=5) if 'delay' in url
        else grequests.get(url)
        for url in urls
    ]
    # Execute with exception handler
    responses = grequests.map(reqs, exception_handler=exception_handler)

    # Analyze results. Compare against None explicitly: a Response is falsy
    # for 4xx/5xx status codes, so "if r" would hide HTTP errors.
    successful = [r for r in responses if r is not None and r.status_code == 200]
    failed = [r for r in responses if r is None]
    errors = [r for r in responses if r is not None and r.status_code != 200]

    print("\n=== Result Summary ===")
    print(f"Successful: {len(successful)} requests")
    print(f"Failed: {len(failed)} requests")
    print(f"HTTP Errors: {len(errors)} requests")

    # Detailed results display
    print("\n=== Detailed Results ===")
    for i, (url, response) in enumerate(zip(urls, responses)):
        if response is None:
            print(f"{i+1}. {url} -> Failed due to exception")
        elif response.status_code == 200:
            content_length = len(response.content)
            print(f"{i+1}. {url} -> Success (Size: {content_length} bytes)")
        else:
            print(f"{i+1}. {url} -> HTTP Error {response.status_code}")

# Test error handling execution
robust_async_requests()
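One detail matters when sorting results into buckets: a `requests.Response` evaluates to false for 4xx and 5xx status codes, so a bare truthiness check cannot tell an HTTP error apart from a request that raised an exception. The sketch below shows explicit classification; `FakeResponse` is a hypothetical stand-in that mimics this truthiness so the example runs without network access, and `classify` is an illustrative helper.

```python
class FakeResponse:
    """Hypothetical stand-in that mimics the truthiness of requests.Response."""

    def __init__(self, status_code):
        self.status_code = status_code

    def __bool__(self):
        # Like requests.Response: falsy for 4xx and 5xx status codes
        return self.status_code < 400


def classify(responses):
    """Split results into successes, HTTP errors, and failed requests."""
    buckets = {"success": [], "http_error": [], "failed": []}
    for r in responses:
        if r is None:                  # no response: the request raised an exception
            buckets["failed"].append(r)
        elif r.status_code == 200:
            buckets["success"].append(r)
        else:                          # includes 404 and 500, which are falsy
            buckets["http_error"].append(r)
    return buckets


results = classify([FakeResponse(200), FakeResponse(404), None, FakeResponse(500)])
print({name: len(items) for name, items in results.items()})
# {'success': 1, 'http_error': 2, 'failed': 1}
```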
Advanced Concurrent Processing and Performance Optimization
import grequests
import time

def advanced_grequests_patterns():
    """Advanced usage patterns for GRequests"""
    # Generate large number of URLs (simulating actual API endpoints)
    base_urls = [
        'https://jsonplaceholder.typicode.com/posts/',
        'https://jsonplaceholder.typicode.com/users/',
        'https://jsonplaceholder.typicode.com/comments/'
    ]
    urls = []
    for base in base_urls:
        for i in range(1, 21):  # 20 items each
            urls.append(f"{base}{i}")
    print(f"Total requests: {len(urls)}")

    # Pattern 1: grequests.map() - batch processing
    def batch_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        responses = grequests.map(reqs, size=20)  # Specify gevent pool size
        end_time = time.time()
        successful = [r for r in responses if r is not None and r.status_code == 200]
        print(f"Batch processing: {end_time - start_time:.2f}s, Success: {len(successful)}/{len(urls)}")
        return responses

    # Pattern 2: grequests.imap() - streaming processing
    def streaming_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        successful_count = 0
        total_size = 0
        # Handle each response as soon as it completes (order is not guaranteed);
        # only one response is held at a time, which improves memory efficiency
        for response in grequests.imap(reqs, size=15):
            if response is not None and response.status_code == 200:
                successful_count += 1
                total_size += len(response.content)
                # Progress display
                if successful_count % 10 == 0:
                    print(f"Progress: {successful_count} completed")
        end_time = time.time()
        print(f"Streaming processing: {end_time - start_time:.2f}s, Success: {successful_count}/{len(urls)}")
        print(f"Total data size: {total_size:,} bytes")

    # Pattern 3: Chunked processing (for handling large numbers of requests)
    def chunked_processing(chunk_size=30):
        start_time = time.time()
        all_responses = []
        # Divide URLs into chunks
        url_chunks = [urls[i:i + chunk_size] for i in range(0, len(urls), chunk_size)]
        for i, chunk in enumerate(url_chunks):
            print(f"Processing chunk {i+1}/{len(url_chunks)}...")
            reqs = (grequests.get(url, timeout=10) for url in chunk)
            responses = grequests.map(reqs, size=10)
            all_responses.extend(responses)
            # Wait between chunks (reduce server load)
            time.sleep(0.5)
        end_time = time.time()
        successful = [r for r in all_responses if r is not None and r.status_code == 200]
        print(f"Chunked processing: {end_time - start_time:.2f}s, Success: {len(successful)}/{len(urls)}")
        return all_responses

    # Execute each pattern
    print("\n=== Pattern 1: Batch Processing ===")
    batch_processing()
    print("\n=== Pattern 2: Streaming Processing ===")
    streaming_processing()
    print("\n=== Pattern 3: Chunked Processing ===")
    chunked_processing()

# Test advanced patterns execution
advanced_grequests_patterns()
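The chunking step in Pattern 3 relies on list slicing, which requires the full URL list in memory. When URLs come from a generator or a file, a small helper along these lines may be more convenient; this is a sketch, and `chunked` is a hypothetical name rather than a GRequests function.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# A generator of URLs: nothing is materialized until a chunk is requested
urls = (f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 8))
for batch in chunked(urls, 3):
    print(len(batch), batch[0])
```

Each batch could then be turned into requests and passed to grequests.map(), with a pause between batches as in the chunked pattern.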
Session Management and Authentication
import grequests
import requests
from requests.auth import HTTPBasicAuth

def session_and_auth_examples():
    """Examples of session management and authentication"""
    # 1. Efficient requests using sessions
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'GRequests-Client/1.0',
        'Accept': 'application/json'
    })
    # API key authentication setup
    session.headers['X-API-Key'] = 'your-api-key-here'
    # Basic authentication setup
    session.auth = HTTPBasicAuth('username', 'password')

    urls = [
        'https://httpbin.org/basic-auth/username/password',
        'https://httpbin.org/headers',
        'https://httpbin.org/user-agent'
    ]
    # Create AsyncRequests using session
    reqs = [grequests.get(url, session=session) for url in urls]
    responses = grequests.map(reqs)

    print("=== Session Authentication Test ===")
    for url, response in zip(urls, responses):
        if response is not None:
            print(f"URL: {url}")
            print(f"Status: {response.status_code}")
            if response.headers.get('content-type', '').startswith('application/json'):
                print(f"Response: {response.json()}")
            print("-" * 50)

    # 2. Simultaneous testing of multiple authentication methods
    auth_tests = [
        {
            'url': 'https://httpbin.org/basic-auth/user1/pass1',
            'auth': HTTPBasicAuth('user1', 'pass1')
        },
        {
            'url': 'https://httpbin.org/bearer',
            'headers': {'Authorization': 'Bearer token123'}
        },
        {
            'url': 'https://httpbin.org/headers',
            'headers': {'X-Custom-Auth': 'custom-token'}
        }
    ]
    auth_requests = []
    for test in auth_tests:
        req = grequests.get(
            test['url'],
            auth=test.get('auth'),
            headers=test.get('headers', {}),
            timeout=10
        )
        auth_requests.append(req)
    auth_responses = grequests.map(auth_requests)

    print("\n=== Multiple Authentication Methods Test ===")
    for test, response in zip(auth_tests, auth_responses):
        # Compare against None: a 401 response is falsy but is still a response
        if response is not None:
            print(f"URL: {test['url']}")
            print(f"Status: {response.status_code}")
            print(f"Auth Success: {'Yes' if response.status_code == 200 else 'No'}")
        else:
            print(f"URL: {test['url']} - Request failed")
        print("-" * 30)

# Test session management execution
session_and_auth_examples()
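For readers wondering what Basic authentication actually puts on the wire, the header value is the base64 encoding of `username:password`. The sketch below builds it with the standard library; `basic_auth_header` is a hypothetical helper, and UTF-8 encoding is an assumption (for ASCII credentials the result does not depend on the encoding chosen).

```python
import base64


def basic_auth_header(username, password):
    """Build the value of an HTTP Basic `Authorization` header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8"))
    return "Basic " + token.decode("ascii")


print(basic_auth_header("username", "password"))
# Basic dXNlcm5hbWU6cGFzc3dvcmQ=
```

Base64 is an encoding, not encryption, so Basic authentication should only be sent over HTTPS.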
POST Requests and Form Data Submission
import grequests
from datetime import datetime

def post_data_examples():
    """Examples of POST requests and data submission"""
    # 1. Send multiple JSON data via POST
    post_data = [
        {
            'title': 'GRequests Test 1',
            'body': 'Asynchronous POST submission test',
            'userId': 1
        },
        {
            'title': 'GRequests Test 2',
            'body': 'Data posting with concurrent processing',
            'userId': 2
        },
        {
            'title': 'GRequests Test 3',
            'body': 'Batch POST processing verification',
            'userId': 3
        }
    ]
    # Create JSON POST requests
    json_reqs = []
    for data in post_data:
        req = grequests.post(
            'https://jsonplaceholder.typicode.com/posts',
            json=data,  # json= serializes the payload and sets Content-Type: application/json
            timeout=10
        )
        json_reqs.append(req)

    print("=== Concurrent JSON POST Submission ===")
    start_time = datetime.now()
    json_responses = grequests.map(json_reqs)
    end_time = datetime.now()
    for i, (data, response) in enumerate(zip(post_data, json_responses)):
        if response is not None and response.status_code == 201:
            created_post = response.json()
            print(f"POST {i+1}: Success - ID {created_post.get('id')}")
            print(f"  Title: {created_post.get('title')}")
        else:
            print(f"POST {i+1}: Failed")
    duration = (end_time - start_time).total_seconds()
    print(f"Processing time: {duration:.2f} seconds")

    # 2. Form data POST submission (placeholder addresses on the reserved example.com domain)
    form_data_list = [
        {'username': 'user1', 'email': 'user1@example.com', 'action': 'register'},
        {'username': 'user2', 'email': 'user2@example.com', 'action': 'register'},
        {'username': 'user3', 'email': 'user3@example.com', 'action': 'register'}
    ]
    form_reqs = []
    for form_data in form_data_list:
        req = grequests.post(
            'https://httpbin.org/post',
            data=form_data,  # application/x-www-form-urlencoded
            timeout=10
        )
        form_reqs.append(req)

    print("\n=== Concurrent Form POST Submission ===")
    form_responses = grequests.map(form_reqs)
    for i, (form_data, response) in enumerate(zip(form_data_list, form_responses)):
        if response is not None and response.status_code == 200:
            response_data = response.json()
            received_form = response_data.get('form', {})
            print(f"Form {i+1}: Success")
            print(f"  Sent data: {received_form}")
        else:
            print(f"Form {i+1}: Failed")

    # 3. Concurrent file upload processing (simulated)
    def simulate_file_uploads():
        file_uploads = []
        for i in range(3):
            # For actual file uploads, use the files parameter
            content = f'This is content of file {i+1}'
            mock_file_data = {
                'file_name': f'document_{i+1}.txt',
                'file_content': content,
                'file_size': len(content)
            }
            req = grequests.post(
                'https://httpbin.org/post',
                json=mock_file_data,  # Actually files={'file': open('path', 'rb')}
                timeout=30
            )
            file_uploads.append(req)

        print("\n=== Concurrent File Upload (Simulated) ===")
        upload_responses = grequests.map(file_uploads)
        for i, response in enumerate(upload_responses):
            if response is not None and response.status_code == 200:
                print(f"Upload {i+1}: Success")
            else:
                print(f"Upload {i+1}: Failed")

    simulate_file_uploads()

# Test POST data submission execution
post_data_examples()
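The `json=` and `data=` parameters differ in how the payload is encoded on the wire. The following sketch reproduces the two encodings with the standard library only, as an approximation of what an HTTP client sends; `encode_body` is a hypothetical helper, not a Requests or GRequests function.

```python
import json
from urllib.parse import urlencode


def encode_body(payload, as_json):
    """Return (content_type, body_bytes) roughly as an HTTP client would send them."""
    if as_json:
        return "application/json", json.dumps(payload).encode("utf-8")
    return "application/x-www-form-urlencoded", urlencode(payload).encode("ascii")


form = {"username": "user1", "action": "register"}
print(encode_body(form, as_json=True))
print(encode_body(form, as_json=False))
```

JSON preserves nesting and value types, while form encoding flattens everything to string key-value pairs, so the choice depends on what the receiving endpoint expects.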
Practical Use Case: Web API Integration and Monitoring
import grequests
from datetime import datetime
import time

class APIMonitor:
    """Concurrent monitoring class for multiple API endpoints"""

    def __init__(self):
        self.endpoints = {
            'jsonplaceholder': 'https://jsonplaceholder.typicode.com/posts/1',
            'httpbin_status': 'https://httpbin.org/status/200',
            'httpbin_delay': 'https://httpbin.org/delay/1',
            'httpbin_json': 'https://httpbin.org/json'
        }
        self.results = []

    def health_check(self):
        """Health check for all endpoints"""
        print(f"=== Health Check Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        start_time = time.time()
        # Create health check requests
        health_reqs = []
        for name, url in self.endpoints.items():
            req = grequests.get(url, timeout=5)
            health_reqs.append((name, req))
        # Execute concurrently
        reqs_only = [req for _, req in health_reqs]
        responses = grequests.map(reqs_only)
        end_time = time.time()

        # Analyze results: an endpoint is UP only if it answered without a 4xx/5xx status
        results = {}
        for (name, _), response in zip(health_reqs, responses):
            if response is not None and response.ok:
                results[name] = {
                    'status': 'UP',
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'content_length': len(response.content)
                }
            else:
                results[name] = {
                    'status': 'DOWN',
                    'status_code': None,
                    'response_time': None,
                    'content_length': 0
                }

        # Generate report
        print(f"Check completion time: {end_time - start_time:.2f} seconds")
        print("\n--- Endpoint Status ---")
        for name, result in results.items():
            status_emoji = "✅" if result['status'] == 'UP' else "❌"
            print(f"{status_emoji} {name}:")
            print(f"  Status: {result['status']}")
            if result['status'] == 'UP':
                print(f"  HTTP Status: {result['status_code']}")
                print(f"  Response Time: {result['response_time']:.3f}s")
                print(f"  Data Size: {result['content_length']} bytes")
        self.results.append({
            'timestamp': datetime.now(),
            'total_time': end_time - start_time,
            'results': results
        })
        return results

    def load_test(self, concurrent_requests=50):
        """Execute load test"""
        print(f"\n=== Load Test Started: {concurrent_requests} concurrent requests ===")
        # Multiple requests to single endpoint
        test_url = 'https://httpbin.org/delay/0.5'
        start_time = time.time()
        reqs = [grequests.get(test_url, timeout=10) for _ in range(concurrent_requests)]
        responses = grequests.map(reqs, size=20)  # Limit gevent pool size
        end_time = time.time()

        # Analyze results
        successful = [r for r in responses if r is not None and r.status_code == 200]
        failed = [r for r in responses if r is None]
        total_time = end_time - start_time
        avg_response_time = (
            sum(r.elapsed.total_seconds() for r in successful) / len(successful)
            if successful else 0
        )
        print("Load test results:")
        print(f"  Total execution time: {total_time:.2f}s")
        print(f"  Success: {len(successful)}/{concurrent_requests}")
        print(f"  Failed: {len(failed)}")
        print(f"  Average response time: {avg_response_time:.3f}s")
        print(f"  Requests per second: {concurrent_requests/total_time:.1f} req/sec")

    def data_collection(self):
        """Data collection from multiple sources"""
        print("\n=== Data Collection Started ===")
        data_sources = [
            ('posts', 'https://jsonplaceholder.typicode.com/posts'),
            ('users', 'https://jsonplaceholder.typicode.com/users'),
            ('albums', 'https://jsonplaceholder.typicode.com/albums'),
            ('photos', 'https://jsonplaceholder.typicode.com/photos?_limit=10'),
            ('comments', 'https://jsonplaceholder.typicode.com/comments?_limit=20')
        ]
        collection_reqs = []
        for name, url in data_sources:
            req = grequests.get(url, timeout=15)
            collection_reqs.append((name, req))
        start_time = time.time()
        reqs_only = [req for _, req in collection_reqs]
        responses = grequests.map(reqs_only)
        end_time = time.time()

        collected_data = {}
        total_items = 0
        for (name, _), response in zip(collection_reqs, responses):
            if response is not None and response.status_code == 200:
                data = response.json()
                collected_data[name] = data
                item_count = len(data) if isinstance(data, list) else 1
                total_items += item_count
                print(f"✅ {name}: {item_count} items collected")
            else:
                print(f"❌ {name}: Collection failed")

        success_rate = len(collected_data) / len(data_sources) * 100
        print("\nData collection completed:")
        print(f"  Collection time: {end_time - start_time:.2f}s")
        print(f"  Total items: {total_items}")
        print(f"  Success rate: {len(collected_data)}/{len(data_sources)} ({success_rate:.1f}%)")
        return collected_data

# API monitoring execution example
def run_api_monitoring():
    monitor = APIMonitor()
    # Execute health check
    monitor.health_check()
    # Execute load test
    monitor.load_test(concurrent_requests=30)
    # Execute data collection
    monitor.data_collection()
    print("\n=== Monitoring Session Complete ===")
    print(f"Health checks recorded: {len(monitor.results)}")

# Execute practical usage example
run_api_monitoring()
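When reading the load test output, it helps to know the best result the pool size allows. A rough lower bound, assuming every request takes the same time and ignoring network latency, can be sketched as follows; `load_test_lower_bound` is a hypothetical helper for this estimate.

```python
import math


def load_test_lower_bound(total_requests, pool_size, seconds_per_request):
    """Estimate the minimum wall-clock time and the maximum achievable request rate."""
    waves = math.ceil(total_requests / pool_size)  # batches the pool must work through
    min_seconds = waves * seconds_per_request
    max_rate = total_requests / min_seconds
    return waves, min_seconds, max_rate


# The load test in the example run: 30 requests, pool size 20, 0.5 s server-side delay
print(load_test_lower_bound(30, 20, 0.5))  # (2, 1.0, 30.0)
```

A measured rate well below this bound suggests that latency, connection setup, or server-side throttling dominates rather than the pool size.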
In short, GRequests adds concurrency to existing Requests code with minimal changes and can substantially reduce total run time for I/O-bound tasks. It is most useful in API integration, web scraping, and monitoring systems.