GRequests

Python asynchronous HTTP client that pairs the Requests library with the gevent coroutine library. It keeps the intuitive, readable Requests API while running many HTTP requests concurrently, inheriting Requests' JSON handling, session management, file upload, and authentication capabilities.

HTTP Client · Python · Asynchronous · Gevent · Concurrency

GitHub Overview

spyoungtech/grequests

Requests + Gevent = <3

Stars: 4,560
Watchers: 113
Forks: 330
Created: May 10, 2012
Language: Python
License: BSD 2-Clause "Simplified" License

Topics

None

Star History

[Star history chart for spyoungtech/grequests; data as of July 18, 2025]

Library

GRequests

Overview

GRequests is a Python asynchronous HTTP client library built around the idea "Requests + Gevent = Asynchronous HTTP Requests". It combines the popular Requests library with the gevent coroutine library, enabling concurrent execution of multiple HTTP requests while keeping the familiar Requests API. Because the concurrency sits behind a synchronous-looking interface, I/O-bound workloads speed up dramatically with almost no code changes. It is a practical way to add concurrency to existing Requests-based code, and is widely used for web API integration and scraping.

Details

GRequests executes multiple HTTP requests concurrently and efficiently through gevent-based cooperative multitasking. Its API mirrors the Requests library, which keeps migration costs for existing code minimal. Batch processing with grequests.map() and streaming with grequests.imap() send large numbers of HTTP requests at once, putting network I/O wait time to productive use. Gevent's monkey patching replaces the standard library's socket operations with cooperative versions, so the concurrency remains transparent to calling code. The library shines in I/O-bound tasks such as web API integration, data collection, and multi-endpoint monitoring.
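As a minimal sketch of the core API (the httpbin URLs below are placeholders for real endpoints):

import grequests

# Placeholder endpoints; substitute your own URLs
urls = [f'https://httpbin.org/get?page={i}' for i in range(5)]

# Build unsent AsyncRequest objects, then send them all concurrently
reqs = (grequests.get(url, timeout=5) for url in urls)

# map() returns responses in request order; failures come back as None
for response in grequests.map(reqs, size=5):
    if response is not None:
        print(response.status_code, response.url)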

Key Features

  • Full Requests Compatibility: Asynchronous execution behind the familiar Requests API
  • Gevent Cooperative Multitasking: Highly efficient concurrency for I/O-bound tasks
  • Batch Request Processing: Bulk request sending through the map/imap functions
  • Transparent Asynchronization: Seamless asynchronous execution via monkey patching
  • Exception Handling: Robust error handling through custom exception handlers
  • Session Management: Inherits Requests' connection pooling and cookie management

Pros and Cons

Pros

  • Introduces concurrent processing to existing Requests code with minimal changes
  • Dramatically reduces I/O wait times for multiple HTTP requests
  • Leverages Requests' rich features (authentication, proxy, SSL) as-is
  • Hides complexity of asynchronous processing through simple API
  • Large performance gains in web API integration and scraping
  • Excellent integration with gevent ecosystem

Cons

  • Risk of conflicts with other libraries due to gevent's monkey patching (see the import-order sketch after this list)
  • Limited performance improvement for CPU-bound tasks
  • Cooperative multitasking on a single OS thread, not true parallelism
  • No support for modern async/await syntax
  • In maintenance mode; active development has largely stalled
  • Debugging requires an understanding of gevent's behavior
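
Because grequests triggers gevent's monkey patching when it is imported, the usual mitigation for the conflict risk above is to import grequests before any module that caches the standard socket implementation. A minimal sketch of the recommended import order (the sanity-check output may vary by gevent version):

# Import grequests first: it applies gevent monkey patching on import,
# so everything imported afterwards sees the cooperative socket.
import grequests

# Standard-library and third-party imports come after
import socket
import requests

# Optional sanity check: socket.socket should now be gevent's class
print(socket.socket)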


Code Examples

Installation and Basic Setup

# Install GRequests (gevent and requests are installed automatically as dependencies)
pip install grequests

# Or install the dependencies explicitly
pip install gevent requests

# Verify installation in Python
python -c "import grequests; print('GRequests installed successfully')"

# Check versions
python -c "import grequests; import gevent; print(f'grequests with gevent {gevent.__version__}')"

Basic Concurrent Request Processing

import grequests
import time

# Define multiple URLs
urls = [
    'https://httpbin.org/delay/1',  # 1 second delay
    'https://httpbin.org/delay/2',  # 2 second delay
    'https://httpbin.org/delay/1',  # 1 second delay
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2'
]

# Synchronous version (reference) - delays run back to back, roughly 4-5 seconds total
def sync_requests():
    import requests
    start_time = time.time()
    responses = []
    
    for url in urls:
        response = requests.get(url)
        responses.append(response)
    
    end_time = time.time()
    print(f"Synchronous processing: {end_time - start_time:.2f} seconds")
    return responses

# Asynchronous version (GRequests) - completes in ~2 seconds
def async_requests():
    start_time = time.time()
    
    # Build unsent AsyncRequest objects (a lazy generator; nothing is sent yet)
    reqs = (grequests.get(url) for url in urls)
    
    # Send requests concurrently
    responses = grequests.map(reqs)
    
    end_time = time.time()
    print(f"Asynchronous processing: {end_time - start_time:.2f} seconds")
    return responses

# Performance comparison
print("=== Synchronous vs Asynchronous Request Comparison ===")
sync_responses = sync_requests()
async_responses = async_requests()

# Result verification
print("\n=== Response Results ===")
for i, (sync_resp, async_resp) in enumerate(zip(sync_responses, async_responses)):
    if async_resp:
        print(f"URL {i+1}: Status {async_resp.status_code}, Content-Type: {async_resp.headers.get('content-type', 'N/A')}")
    else:
        print(f"URL {i+1}: Request failed")

Error Handling and Exception Processing

import grequests
from requests.exceptions import Timeout, ConnectionError  # exception types the handler may receive

# Define custom exception handler
def exception_handler(request, exception):
    """
    Custom handler for request exceptions
    """
    print(f"Error occurred: {request.url}")
    print(f"Exception type: {type(exception).__name__}")
    print(f"Exception details: {exception}")
    
    # Can add logging or notification processing here
    return None  # The handler's return value fills this request's slot in the results (None here)

# URL list with mixed success/failure scenarios
urls = [
    'https://httpbin.org/status/200',      # Success
    'https://httpbin.org/status/404',      # 404 error
    'https://httpbin.org/delay/10',        # Timeout
    'http://nonexistent-domain-12345.com', # DNS resolution failure
    'https://httpbin.org/status/500',      # Server error
    'https://jsonplaceholder.typicode.com/posts/1'  # Success
]

def robust_async_requests():
    # Create requests with timeout settings
    reqs = [
        grequests.get(url, timeout=5) if 'delay' in url 
        else grequests.get(url) 
        for url in urls
    ]
    
    # Execute with exception handler
    responses = grequests.map(reqs, exception_handler=exception_handler)
    
    # Analyze results
    successful = [r for r in responses if r and r.status_code == 200]
    failed = [r for r in responses if r is None]
    errors = [r for r in responses if r and r.status_code != 200]
    
    print(f"\n=== Result Summary ===")
    print(f"Successful: {len(successful)} requests")
    print(f"Failed: {len(failed)} requests")
    print(f"HTTP Errors: {len(errors)} requests")
    
    # Detailed results display
    print(f"\n=== Detailed Results ===")
    for i, (url, response) in enumerate(zip(urls, responses)):
        if response is None:
            print(f"{i+1}. {url} -> Failed due to exception")
        elif response.status_code == 200:
            content_length = len(response.content)
            print(f"{i+1}. {url} -> Success (Size: {content_length} bytes)")
        else:
            print(f"{i+1}. {url} -> HTTP Error {response.status_code}")

# Test error handling execution
robust_async_requests()
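
Beyond the per-request timeout used above, current grequests versions also accept a gtimeout argument to map() that bounds the whole batch; requests still outstanding when it expires come back as None (check the signature of your installed version, as older releases lack the parameter):

import grequests

urls = ['https://httpbin.org/delay/1', 'https://httpbin.org/delay/3']
reqs = (grequests.get(url, timeout=5) for url in urls)

# gtimeout caps total wall-clock time for the whole batch;
# unfinished requests appear as None in the results
responses = grequests.map(reqs, size=10, gtimeout=2)
for response in responses:
    print('ok' if response is not None else 'timed out or failed')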

Advanced Concurrent Processing and Performance Optimization

import grequests
import time

def advanced_grequests_patterns():
    """Advanced usage patterns for GRequests"""
    
    # Generate large number of URLs (simulating actual API endpoints)
    base_urls = [
        'https://jsonplaceholder.typicode.com/posts/',
        'https://jsonplaceholder.typicode.com/users/',
        'https://jsonplaceholder.typicode.com/comments/'
    ]
    
    urls = []
    for base in base_urls:
        for i in range(1, 21):  # 20 items each
            urls.append(f"{base}{i}")
    
    print(f"Total requests: {len(urls)}")
    
    # Pattern 1: grequests.map() - batch processing
    def batch_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        responses = grequests.map(reqs, size=20)  # Specify gevent pool size
        end_time = time.time()
        
        successful = [r for r in responses if r and r.status_code == 200]
        print(f"Batch processing: {end_time - start_time:.2f}s, Success: {len(successful)}/{len(urls)}")
        return responses
    
    # Pattern 2: grequests.imap() - streaming processing
    def streaming_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        
        successful_count = 0
        total_size = 0
        
        # imap() yields responses as they complete (completion order, not request order; memory-efficient)
        for response in grequests.imap(reqs, size=15):
            if response and response.status_code == 200:
                successful_count += 1
                total_size += len(response.content)
                
                # Progress display
                if successful_count % 10 == 0:
                    print(f"Progress: {successful_count} completed")
        
        end_time = time.time()
        print(f"Streaming processing: {end_time - start_time:.2f}s, Success: {successful_count}/{len(urls)}")
        print(f"Total data size: {total_size:,} bytes")
    
    # Pattern 3: Chunked processing (for handling large numbers of requests)
    def chunked_processing(chunk_size=30):
        start_time = time.time()
        all_responses = []
        
        # Divide URLs into chunks
        url_chunks = [urls[i:i + chunk_size] for i in range(0, len(urls), chunk_size)]
        
        for i, chunk in enumerate(url_chunks):
            print(f"Processing chunk {i+1}/{len(url_chunks)}...")
            
            reqs = (grequests.get(url, timeout=10) for url in chunk)
            responses = grequests.map(reqs, size=10)
            all_responses.extend(responses)
            
            # Wait between chunks (reduce server load)
            time.sleep(0.5)
        
        end_time = time.time()
        successful = [r for r in all_responses if r and r.status_code == 200]
        print(f"Chunked processing: {end_time - start_time:.2f}s, Success: {len(successful)}/{len(urls)}")
        return all_responses
    
    # Execute each pattern
    print("\n=== Pattern 1: Batch Processing ===")
    batch_processing()
    
    print("\n=== Pattern 2: Streaming Processing ===")
    streaming_processing()
    
    print("\n=== Pattern 3: Chunked Processing ===")
    chunked_processing()

# Test advanced patterns execution
advanced_grequests_patterns()
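
One robustness pattern the examples above do not cover is retrying failures. A simple sketch that re-sends failed requests in additional passes (treating None and 5xx responses as retryable is an assumption; adjust the policy to your API):

import grequests

def map_with_retry(urls, retries=2, size=10, timeout=10):
    """Send all URLs concurrently, retrying failures for up to `retries` extra passes."""
    pending = list(urls)
    results = {}
    for _ in range(retries + 1):
        if not pending:
            break
        reqs = [grequests.get(url, timeout=timeout) for url in pending]
        responses = grequests.map(reqs, size=size)
        still_pending = []
        for url, response in zip(pending, responses):
            # Keep successes; queue exceptions (None) and 5xx errors for another pass
            if response is not None and response.status_code < 500:
                results[url] = response
            else:
                still_pending.append(url)
        pending = still_pending
    return results, pending  # `pending` holds URLs that never succeeded

results, failed = map_with_retry(['https://httpbin.org/status/200',
                                  'https://httpbin.org/status/503'])
print(f'Succeeded: {len(results)}, Gave up on: {len(failed)}')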

Session Management and Authentication

import grequests
import requests
from requests.auth import HTTPBasicAuth

def session_and_auth_examples():
    """Examples of session management and authentication"""
    
    # 1. Efficient requests using sessions
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'GRequests-Client/1.0',
        'Accept': 'application/json'
    })
    
    # API key authentication setup
    session.headers['X-API-Key'] = 'your-api-key-here'
    
    # Basic authentication setup
    session.auth = HTTPBasicAuth('username', 'password')
    
    urls = [
        'https://httpbin.org/basic-auth/username/password',
        'https://httpbin.org/headers',
        'https://httpbin.org/user-agent'
    ]
    
    # Create AsyncRequests using session
    reqs = [grequests.get(url, session=session) for url in urls]
    responses = grequests.map(reqs)
    
    print("=== Session Authentication Test ===")
    for url, response in zip(urls, responses):
        if response:
            print(f"URL: {url}")
            print(f"Status: {response.status_code}")
            if response.headers.get('content-type', '').startswith('application/json'):
                print(f"Response: {response.json()}")
            print("-" * 50)
    
    # 2. Simultaneous testing of multiple authentication methods
    auth_tests = [
        {
            'url': 'https://httpbin.org/basic-auth/user1/pass1',
            'auth': HTTPBasicAuth('user1', 'pass1')
        },
        {
            'url': 'https://httpbin.org/bearer',
            'headers': {'Authorization': 'Bearer token123'}
        },
        {
            'url': 'https://httpbin.org/headers',
            'headers': {'X-Custom-Auth': 'custom-token'}
        }
    ]
    
    auth_requests = []
    for test in auth_tests:
        req = grequests.get(
            test['url'],
            auth=test.get('auth'),
            headers=test.get('headers', {}),
            timeout=10
        )
        auth_requests.append(req)
    
    auth_responses = grequests.map(auth_requests)
    
    print("\n=== Multiple Authentication Methods Test ===")
    for test, response in zip(auth_tests, auth_responses):
        if response:
            print(f"URL: {test['url']}")
            print(f"Status: {response.status_code}")
            print(f"Auth Success: {'Yes' if response.status_code == 200 else 'No'}")
        else:
            print(f"URL: {test['url']} - Request failed")
        print("-" * 30)

# Test session management execution
session_and_auth_examples()

POST Requests and Form Data Submission

import grequests
from datetime import datetime

def post_data_examples():
    """Examples of POST requests and data submission"""
    
    # 1. Send multiple JSON data via POST
    post_data = [
        {
            'title': 'GRequests Test 1',
            'body': 'Asynchronous POST submission test',
            'userId': 1
        },
        {
            'title': 'GRequests Test 2', 
            'body': 'Data posting with concurrent processing',
            'userId': 2
        },
        {
            'title': 'GRequests Test 3',
            'body': 'Batch POST processing verification',
            'userId': 3
        }
    ]
    
    # Create JSON POST requests
    json_reqs = []
    for data in post_data:
        req = grequests.post(
            'https://jsonplaceholder.typicode.com/posts',
            json=data,
            headers={'Content-Type': 'application/json'},
            timeout=10
        )
        json_reqs.append(req)
    
    print("=== Concurrent JSON POST Submission ===")
    start_time = datetime.now()
    json_responses = grequests.map(json_reqs)
    end_time = datetime.now()
    
    for i, (data, response) in enumerate(zip(post_data, json_responses)):
        if response and response.status_code == 201:
            created_post = response.json()
            print(f"POST {i+1}: Success - ID {created_post.get('id')}")
            print(f"  Title: {created_post.get('title')}")
        else:
            print(f"POST {i+1}: Failed")
    
    duration = (end_time - start_time).total_seconds()
    print(f"Processing time: {duration:.2f} seconds")
    
    # 2. Form data POST submission
    form_data_list = [
        {'username': 'user1', 'email': '[email protected]', 'action': 'register'},
        {'username': 'user2', 'email': '[email protected]', 'action': 'register'},
        {'username': 'user3', 'email': '[email protected]', 'action': 'register'}
    ]
    
    form_reqs = []
    for form_data in form_data_list:
        req = grequests.post(
            'https://httpbin.org/post',
            data=form_data,  # application/x-www-form-urlencoded
            timeout=10
        )
        form_reqs.append(req)
    
    print("\n=== Concurrent Form POST Submission ===")
    form_responses = grequests.map(form_reqs)
    
    for i, (form_data, response) in enumerate(zip(form_data_list, form_responses)):
        if response and response.status_code == 200:
            response_data = response.json()
            received_form = response_data.get('form', {})
            print(f"Form {i+1}: Success")
            print(f"  Sent data: {received_form}")
        else:
            print(f"Form {i+1}: Failed")
    
    # 3. Concurrent file upload processing (simulated)
    def simulate_file_uploads():
        file_uploads = []
        for i in range(3):
            # For actual file uploads, use files parameter
            mock_file_data = {
                'file_name': f'document_{i+1}.txt',
                'file_content': f'This is content of file {i+1}',
                'file_size': len(f'This is content of file {i+1}')
            }
            
            req = grequests.post(
                'https://httpbin.org/post',
                json=mock_file_data,  # A real upload would pass files= instead (see the sketch after this example)
                headers={'Content-Type': 'application/json'},
                timeout=30
            )
            file_uploads.append(req)
        
        print("\n=== Concurrent File Upload (Simulated) ===")
        upload_responses = grequests.map(file_uploads)
        
        for i, response in enumerate(upload_responses):
            if response and response.status_code == 200:
                print(f"Upload {i+1}: Success")
            else:
                print(f"Upload {i+1}: Failed")
    
    simulate_file_uploads()

# Test POST data submission execution
post_data_examples()
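
The upload block above only simulates files; here is a minimal sketch of a real multipart upload using the files parameter (the file paths and endpoint are placeholders):

import grequests

# Placeholder paths; replace with real files on disk
paths = ['document_1.txt', 'document_2.txt', 'document_3.txt']

# Open each file and attach it via files= (sent as multipart/form-data)
handles = [open(path, 'rb') for path in paths]
reqs = [
    grequests.post('https://httpbin.org/post', files={'file': fh}, timeout=30)
    for fh in handles
]

responses = grequests.map(reqs, size=3)
for path, response in zip(paths, responses):
    status = response.status_code if response is not None else 'failed'
    print(f'{path}: {status}')

# Close the handles once all uploads have finished
for fh in handles:
    fh.close()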

Practical Use Case: Web API Integration and Monitoring

import grequests
from datetime import datetime
import time

class APIMonitor:
    """Concurrent monitoring class for multiple API endpoints"""
    
    def __init__(self):
        self.endpoints = {
            'jsonplaceholder': 'https://jsonplaceholder.typicode.com/posts/1',
            'httpbin_status': 'https://httpbin.org/status/200',
            'httpbin_delay': 'https://httpbin.org/delay/1',
            'httpbin_json': 'https://httpbin.org/json'
        }
        self.results = []
    
    def health_check(self):
        """Health check for all endpoints"""
        print(f"=== Health Check Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        
        start_time = time.time()
        
        # Create health check requests
        health_reqs = []
        for name, url in self.endpoints.items():
            req = grequests.get(url, timeout=5)
            health_reqs.append((name, req))
        
        # Execute concurrently
        reqs_only = [req for _, req in health_reqs]
        responses = grequests.map(reqs_only)
        
        end_time = time.time()
        
        # Analyze results
        results = {}
        for (name, _), response in zip(health_reqs, responses):
            if response:
                results[name] = {
                    'status': 'UP',
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'content_length': len(response.content)
                }
            else:
                results[name] = {
                    'status': 'DOWN',
                    'status_code': None,
                    'response_time': None,
                    'content_length': 0
                }
        
        # Generate report
        print(f"Check completion time: {end_time - start_time:.2f} seconds")
        print("\n--- Endpoint Status ---")
        for name, result in results.items():
            status_emoji = "✅" if result['status'] == 'UP' else "❌"
            print(f"{status_emoji} {name}:")
            print(f"   Status: {result['status']}")
            if result['status'] == 'UP':
                print(f"   HTTP Status: {result['status_code']}")
                print(f"   Response Time: {result['response_time']:.3f}s")
                print(f"   Data Size: {result['content_length']} bytes")
        
        self.results.append({
            'timestamp': datetime.now(),
            'total_time': end_time - start_time,
            'results': results
        })
        
        return results
    
    def load_test(self, concurrent_requests=50):
        """Execute load test"""
        print(f"\n=== Load Test Started: {concurrent_requests} concurrent requests ===")
        
        # Multiple requests to single endpoint
        test_url = 'https://httpbin.org/delay/0.5'
        
        start_time = time.time()
        reqs = [grequests.get(test_url, timeout=10) for _ in range(concurrent_requests)]
        responses = grequests.map(reqs, size=20)  # Limit gevent pool size
        end_time = time.time()
        
        # Analyze results
        successful = [r for r in responses if r and r.status_code == 200]
        failed = [r for r in responses if r is None]
        
        total_time = end_time - start_time
        avg_response_time = sum(r.elapsed.total_seconds() for r in successful) / len(successful) if successful else 0
        
        print(f"Load test results:")
        print(f"  Total execution time: {total_time:.2f}s")
        print(f"  Success: {len(successful)}/{concurrent_requests}")
        print(f"  Failed: {len(failed)}")
        print(f"  Average response time: {avg_response_time:.3f}s")
        print(f"  Requests per second: {concurrent_requests/total_time:.1f} req/sec")
    
    def data_collection(self):
        """Data collection from multiple sources"""
        print(f"\n=== Data Collection Started ===")
        
        data_sources = [
            ('posts', 'https://jsonplaceholder.typicode.com/posts'),
            ('users', 'https://jsonplaceholder.typicode.com/users'),
            ('albums', 'https://jsonplaceholder.typicode.com/albums'),
            ('photos', 'https://jsonplaceholder.typicode.com/photos?_limit=10'),
            ('comments', 'https://jsonplaceholder.typicode.com/comments?_limit=20')
        ]
        
        collection_reqs = []
        for name, url in data_sources:
            req = grequests.get(url, timeout=15)
            collection_reqs.append((name, req))
        
        start_time = time.time()
        reqs_only = [req for _, req in collection_reqs]
        responses = grequests.map(reqs_only)
        end_time = time.time()
        
        collected_data = {}
        total_items = 0
        
        for (name, _), response in zip(collection_reqs, responses):
            if response and response.status_code == 200:
                data = response.json()
                collected_data[name] = data
                item_count = len(data) if isinstance(data, list) else 1
                total_items += item_count
                print(f"✅ {name}: {item_count} items collected")
            else:
                print(f"❌ {name}: Collection failed")
        
        print(f"\nData collection completed:")
        print(f"  Collection time: {end_time - start_time:.2f}s")
        print(f"  Total items: {total_items}")
        print(f"  Success rate: {len(collected_data)}/{len(data_sources)} ({len(collected_data)/len(data_sources)*100:.1f}%)")
        
        return collected_data

# API monitoring execution example
def run_api_monitoring():
    monitor = APIMonitor()
    
    # Execute health check
    monitor.health_check()
    
    # Execute load test
    monitor.load_test(concurrent_requests=30)
    
    # Execute data collection
    collected = monitor.data_collection()
    
    print(f"\n=== Monitoring Session Complete ===")
    print(f"Executed monitoring history: {len(monitor.results)} sessions")

# Execute practical usage example
run_api_monitoring()

In this way, GRequests adds concurrent processing to existing Requests code with minimal changes, dramatically reducing wall-clock time for I/O-bound tasks. It is at its best in API integration, web scraping, and monitoring systems.