GRequests

Python RequestsライブラリからインスパイアされたGo HTTPクライアント。直感的で読みやすいAPIを提供し、Go開発者にとって親しみやすいインターフェースを実現。自動JSON処理、セッション管理、ファイルアップロード、認証機能を内蔵。

HTTPクライアントPython非同期Gevent並行処理

GitHub概要

spyoungtech/grequests

Requests + Gevent = <3

スター4,560
ウォッチ113
フォーク330
作成日:2012年5月10日
言語:Python
ライセンス:BSD 2-Clause "Simplified" License

トピックス

なし

スター履歴

spyoungtech/grequests Star History
データ取得日時: 2025/7/18 01:39

ライブラリ

GRequests

概要

GRequestsは「Requests + Gevent = 非同期HTTPリクエスト」をコンセプトに開発された、Pythonの非同期HTTPクライアントライブラリです。人気のRequestsライブラリとgeventコルーチンライブラリを組み合わせ、馴染みのあるRequestsAPIを保ちながら複数のHTTPリクエストを並行処理で実行可能。同期的なAPIで非同期処理を実現し、I/Oバウンドなタスクでのパフォーマンスを大幅に向上。既存のRequestsベースのコードに最小限の変更で並行処理機能を追加できる実用的なライブラリとして、Web API統合やスクレイピングで活用されています。

詳細

GRequests 2025年版は、geventベースの協調的マルチタスキングによって複数のHTTPリクエストを効率的に並行実行します。Requestsライブラリと完全に互換性のあるAPIを提供し、既存コードの移行コストを最小化。grequests.map()grequests.imap()によるバッチ処理で、大量のHTTPリクエストを同時送信し、ネットワークI/O待機時間を有効活用します。geventのmonkey patchingにより標準ライブラリのソケット操作が非同期化され、透過的な並行処理を実現。Web API統合、データ収集、複数エンドポイント監視等のI/Oバウンドタスクで真価を発揮します。

主な特徴

  • Requestsとの完全互換性: 馴染みのあるRequests APIで非同期処理を実現
  • Gevent協調的マルチタスキング: I/Oバウンドタスクでの高効率な並行処理
  • バッチリクエスト処理: map/imap関数による一括リクエスト送信
  • 透明な非同期化: monkey patchingによるシームレスな非同期実行
  • 例外ハンドリング: カスタム例外ハンドラーによる堅牢なエラー処理
  • セッション管理: 接続プールとCookie管理の継承

メリット・デメリット

メリット

  • 既存のRequestsコードに最小限の変更で並行処理を導入
  • 複数HTTPリクエストでのI/O待機時間を劇的に短縮
  • Requestsの豊富な機能(認証、プロキシ、SSL)をそのまま活用
  • シンプルなAPIで非同期処理の複雑性を隠蔽
  • Web API統合やスクレイピングでの高いパフォーマンス向上
  • geventエコシステムとの優れた統合性

デメリット

  • geventのmonkey patchingによる他ライブラリとの競合リスク
  • CPUバウンドタスクでは性能向上効果が限定的
  • 真の非同期処理ではなく協調的マルチタスキング
  • モダンな async/await 構文に対応していない
  • メンテナンス状況が限定的で活発な開発が停滞
  • デバッグ時にgeventの特性理解が必要

参考ページ

書き方の例

インストールと基本セットアップ

# GRequestsのインストール
pip install grequests

# 依存関係(gevent)も自動インストールされる
pip install gevent requests

# Pythonでの動作確認
python -c "import grequests; print('GRequests installed successfully')"

# バージョン確認
python -c "import grequests; import gevent; print(f'grequests with gevent {gevent.__version__}')"

基本的な並行リクエスト処理

import grequests
import time

# 複数のURLを定義
urls = [
    'https://httpbin.org/delay/1',  # 1秒遅延
    'https://httpbin.org/delay/2',  # 2秒遅延
    'https://httpbin.org/delay/1',  # 1秒遅延
    'https://jsonplaceholder.typicode.com/posts/1',
    'https://jsonplaceholder.typicode.com/posts/2'
]

# 同期版(参考)- 合計5秒以上かかる
def sync_requests():
    import requests
    start_time = time.time()
    responses = []
    
    for url in urls:
        response = requests.get(url)
        responses.append(response)
    
    end_time = time.time()
    print(f"同期処理: {end_time - start_time:.2f}秒")
    return responses

# 非同期版(GRequests)- 2秒程度で完了
def async_requests():
    start_time = time.time()
    
    # 未送信のAsyncRequestオブジェクトリストを作成
    reqs = (grequests.get(url) for url in urls)
    
    # 並行してリクエストを送信
    responses = grequests.map(reqs)
    
    end_time = time.time()
    print(f"非同期処理: {end_time - start_time:.2f}秒")
    return responses

# 実行比較
print("=== 同期 vs 非同期リクエスト比較 ===")
sync_responses = sync_requests()
async_responses = async_requests()

# 結果確認
print("\n=== レスポンス結果 ===")
for i, (sync_resp, async_resp) in enumerate(zip(sync_responses, async_responses)):
    if async_resp:
        print(f"URL {i+1}: Status {async_resp.status_code}, Content-Type: {async_resp.headers.get('content-type', 'N/A')}")
    else:
        print(f"URL {i+1}: リクエスト失敗")

エラーハンドリングと例外処理

import grequests
from requests.exceptions import Timeout, ConnectionError

# カスタム例外ハンドラーの定義
def exception_handler(request, exception):
    """
    リクエスト例外のカスタムハンドラー
    """
    print(f"エラー発生: {request.url}")
    print(f"例外タイプ: {type(exception).__name__}")
    print(f"例外詳細: {exception}")
    
    # ログ記録や通知処理をここに追加可能
    return None  # Noneを返すことで結果リストに含める

# 成功・失敗が混在するURLリスト
urls = [
    'https://httpbin.org/status/200',      # 成功
    'https://httpbin.org/status/404',      # 404エラー
    'https://httpbin.org/delay/10',        # タイムアウト
    'http://nonexistent-domain-12345.com', # DNS解決失敗
    'https://httpbin.org/status/500',      # サーバーエラー
    'https://jsonplaceholder.typicode.com/posts/1'  # 成功
]

def robust_async_requests():
    # タイムアウト設定付きリクエスト作成
    reqs = [
        grequests.get(url, timeout=5) if 'delay' in url 
        else grequests.get(url) 
        for url in urls
    ]
    
    # 例外ハンドラー付きで実行
    responses = grequests.map(reqs, exception_handler=exception_handler)
    
    # 結果の分析
    successful = [r for r in responses if r and r.status_code == 200]
    failed = [r for r in responses if r is None]
    errors = [r for r in responses if r and r.status_code != 200]
    
    print(f"\n=== 結果サマリー ===")
    print(f"成功: {len(successful)}件")
    print(f"失敗: {len(failed)}件")
    print(f"HTTPエラー: {len(errors)}件")
    
    # 詳細結果表示
    print(f"\n=== 詳細結果 ===")
    for i, (url, response) in enumerate(zip(urls, responses)):
        if response is None:
            print(f"{i+1}. {url} -> 例外により失敗")
        elif response.status_code == 200:
            content_length = len(response.content)
            print(f"{i+1}. {url} -> 成功 (Size: {content_length} bytes)")
        else:
            print(f"{i+1}. {url} -> HTTPエラー {response.status_code}")

# エラーハンドリングのテスト実行
robust_async_requests()

高度な並行処理とパフォーマンス最適化

import grequests
import time
from concurrent.futures import ThreadPoolExecutor

def advanced_grequests_patterns():
    """高度なGRequestsの使用パターン"""
    
    # 大量URLの生成(実際のAPIエンドポイント想定)
    base_urls = [
        'https://jsonplaceholder.typicode.com/posts/',
        'https://jsonplaceholder.typicode.com/users/',
        'https://jsonplaceholder.typicode.com/comments/'
    ]
    
    urls = []
    for base in base_urls:
        for i in range(1, 21):  # 各20件ずつ
            urls.append(f"{base}{i}")
    
    print(f"総リクエスト数: {len(urls)}件")
    
    # パターン1: grequests.map() - 一括処理
    def batch_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        responses = grequests.map(reqs, size=20)  # geventプールサイズ指定
        end_time = time.time()
        
        successful = [r for r in responses if r and r.status_code == 200]
        print(f"一括処理: {end_time - start_time:.2f}秒, 成功: {len(successful)}/{len(urls)}")
        return responses
    
    # パターン2: grequests.imap() - 逐次処理
    def streaming_processing():
        start_time = time.time()
        reqs = (grequests.get(url, timeout=10) for url in urls)
        
        successful_count = 0
        total_size = 0
        
        # レスポンスを順次処理(メモリ効率向上)
        for response in grequests.imap(reqs, size=15):
            if response and response.status_code == 200:
                successful_count += 1
                total_size += len(response.content)
                
                # 進捗表示
                if successful_count % 10 == 0:
                    print(f"進捗: {successful_count}件完了")
        
        end_time = time.time()
        print(f"ストリーミング処理: {end_time - start_time:.2f}秒, 成功: {successful_count}/{len(urls)}")
        print(f"総データサイズ: {total_size:,} bytes")
    
    # パターン3: チャンク処理(大量リクエスト対応)
    def chunked_processing(chunk_size=30):
        start_time = time.time()
        all_responses = []
        
        # URLをチャンクに分割
        url_chunks = [urls[i:i + chunk_size] for i in range(0, len(urls), chunk_size)]
        
        for i, chunk in enumerate(url_chunks):
            print(f"チャンク {i+1}/{len(url_chunks)} 処理中...")
            
            reqs = (grequests.get(url, timeout=10) for url in chunk)
            responses = grequests.map(reqs, size=10)
            all_responses.extend(responses)
            
            # チャンク間の待機(サーバー負荷軽減)
            time.sleep(0.5)
        
        end_time = time.time()
        successful = [r for r in all_responses if r and r.status_code == 200]
        print(f"チャンク処理: {end_time - start_time:.2f}秒, 成功: {len(successful)}/{len(urls)}")
        return all_responses
    
    # 各パターンの実行
    print("\n=== パターン1: 一括処理 ===")
    batch_processing()
    
    print("\n=== パターン2: ストリーミング処理 ===")
    streaming_processing()
    
    print("\n=== パターン3: チャンク処理 ===")
    chunked_processing()

# 高度なパターンのテスト実行
advanced_grequests_patterns()

セッション管理と認証

import grequests
import requests
from requests.auth import HTTPBasicAuth

def session_and_auth_examples():
    """セッション管理と認証の例"""
    
    # 1. セッションを使用した効率的なリクエスト
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'GRequests-Client/1.0',
        'Accept': 'application/json'
    })
    
    # APIキー認証設定
    session.headers['X-API-Key'] = 'your-api-key-here'
    
    # Basic認証設定
    session.auth = HTTPBasicAuth('username', 'password')
    
    urls = [
        'https://httpbin.org/basic-auth/username/password',
        'https://httpbin.org/headers',
        'https://httpbin.org/user-agent'
    ]
    
    # セッション使用のAsyncRequestを作成
    reqs = [grequests.get(url, session=session) for url in urls]
    responses = grequests.map(reqs)
    
    print("=== セッション認証テスト ===")
    for url, response in zip(urls, responses):
        if response:
            print(f"URL: {url}")
            print(f"Status: {response.status_code}")
            if response.headers.get('content-type', '').startswith('application/json'):
                print(f"Response: {response.json()}")
            print("-" * 50)
    
    # 2. 複数認証方式の同時テスト
    auth_tests = [
        {
            'url': 'https://httpbin.org/basic-auth/user1/pass1',
            'auth': HTTPBasicAuth('user1', 'pass1')
        },
        {
            'url': 'https://httpbin.org/bearer',
            'headers': {'Authorization': 'Bearer token123'}
        },
        {
            'url': 'https://httpbin.org/headers',
            'headers': {'X-Custom-Auth': 'custom-token'}
        }
    ]
    
    auth_requests = []
    for test in auth_tests:
        req = grequests.get(
            test['url'],
            auth=test.get('auth'),
            headers=test.get('headers', {}),
            timeout=10
        )
        auth_requests.append(req)
    
    auth_responses = grequests.map(auth_requests)
    
    print("\n=== 複数認証方式テスト ===")
    for test, response in zip(auth_tests, auth_responses):
        if response:
            print(f"URL: {test['url']}")
            print(f"Status: {response.status_code}")
            print(f"Auth成功: {'Yes' if response.status_code == 200 else 'No'}")
        else:
            print(f"URL: {test['url']} - リクエスト失敗")
        print("-" * 30)

# セッション管理のテスト実行
session_and_auth_examples()

POSTリクエストとフォームデータ送信

import grequests
import json
from datetime import datetime

def post_data_examples():
    """POSTリクエストとデータ送信の例"""
    
    # 1. 複数のJSONデータをPOST送信
    post_data = [
        {
            'title': 'GRequestsテスト1',
            'body': '非同期POST送信のテスト',
            'userId': 1
        },
        {
            'title': 'GRequestsテスト2', 
            'body': '並行処理でのデータ投稿',
            'userId': 2
        },
        {
            'title': 'GRequestsテスト3',
            'body': 'バッチPOST処理の検証',
            'userId': 3
        }
    ]
    
    # JSON POSTリクエストの作成
    json_reqs = []
    for data in post_data:
        req = grequests.post(
            'https://jsonplaceholder.typicode.com/posts',
            json=data,
            headers={'Content-Type': 'application/json'},
            timeout=10
        )
        json_reqs.append(req)
    
    print("=== 並行JSON POST送信 ===")
    start_time = datetime.now()
    json_responses = grequests.map(json_reqs)
    end_time = datetime.now()
    
    for i, (data, response) in enumerate(zip(post_data, json_responses)):
        if response and response.status_code == 201:
            created_post = response.json()
            print(f"POST {i+1}: 成功 - ID {created_post.get('id')}")
            print(f"  タイトル: {created_post.get('title')}")
        else:
            print(f"POST {i+1}: 失敗")
    
    duration = (end_time - start_time).total_seconds()
    print(f"処理時間: {duration:.2f}秒")
    
    # 2. フォームデータのPOST送信
    form_data_list = [
        {'username': 'user1', 'email': '[email protected]', 'action': 'register'},
        {'username': 'user2', 'email': '[email protected]', 'action': 'register'},
        {'username': 'user3', 'email': '[email protected]', 'action': 'register'}
    ]
    
    form_reqs = []
    for form_data in form_data_list:
        req = grequests.post(
            'https://httpbin.org/post',
            data=form_data,  # application/x-www-form-urlencoded
            timeout=10
        )
        form_reqs.append(req)
    
    print("\n=== 並行フォームPOST送信 ===")
    form_responses = grequests.map(form_reqs)
    
    for i, (form_data, response) in enumerate(zip(form_data_list, form_responses)):
        if response and response.status_code == 200:
            response_data = response.json()
            received_form = response_data.get('form', {})
            print(f"フォーム {i+1}: 成功")
            print(f"  送信データ: {received_form}")
        else:
            print(f"フォーム {i+1}: 失敗")
    
    # 3. ファイルアップロードの並行処理(模擬)
    def simulate_file_uploads():
        file_uploads = []
        for i in range(3):
            # 実際のファイルアップロード時は files パラメーターを使用
            mock_file_data = {
                'file_name': f'document_{i+1}.txt',
                'file_content': f'This is content of file {i+1}',
                'file_size': len(f'This is content of file {i+1}')
            }
            
            req = grequests.post(
                'https://httpbin.org/post',
                json=mock_file_data,  # 実際は files={'file': open('path', 'rb')}
                headers={'Content-Type': 'application/json'},
                timeout=30
            )
            file_uploads.append(req)
        
        print("\n=== 並行ファイルアップロード(模擬) ===")
        upload_responses = grequests.map(file_uploads)
        
        for i, response in enumerate(upload_responses):
            if response and response.status_code == 200:
                print(f"アップロード {i+1}: 成功")
            else:
                print(f"アップロード {i+1}: 失敗")
    
    simulate_file_uploads()

# POSTデータ送信のテスト実行
post_data_examples()

実際の活用例:Web API統合とモニタリング

import grequests
import json
from datetime import datetime, timedelta
import time

class APIMonitor:
    """複数APIエンドポイントの並行監視クラス"""
    
    def __init__(self):
        self.endpoints = {
            'jsonplaceholder': 'https://jsonplaceholder.typicode.com/posts/1',
            'httpbin_status': 'https://httpbin.org/status/200',
            'httpbin_delay': 'https://httpbin.org/delay/1',
            'httpbin_json': 'https://httpbin.org/json'
        }
        self.results = []
    
    def health_check(self):
        """全エンドポイントのヘルスチェック"""
        print(f"=== ヘルスチェック開始: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        
        start_time = time.time()
        
        # ヘルスチェックリクエスト作成
        health_reqs = []
        for name, url in self.endpoints.items():
            req = grequests.get(url, timeout=5)
            health_reqs.append((name, req))
        
        # 並行実行
        reqs_only = [req for _, req in health_reqs]
        responses = grequests.map(reqs_only)
        
        end_time = time.time()
        
        # 結果分析
        results = {}
        for (name, _), response in zip(health_reqs, responses):
            if response:
                results[name] = {
                    'status': 'UP',
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'content_length': len(response.content)
                }
            else:
                results[name] = {
                    'status': 'DOWN',
                    'status_code': None,
                    'response_time': None,
                    'content_length': 0
                }
        
        # レポート出力
        print(f"チェック完了時間: {end_time - start_time:.2f}秒")
        print("\n--- 各エンドポイント状況 ---")
        for name, result in results.items():
            status_emoji = "✅" if result['status'] == 'UP' else "❌"
            print(f"{status_emoji} {name}:")
            print(f"   状態: {result['status']}")
            if result['status'] == 'UP':
                print(f"   ステータス: {result['status_code']}")
                print(f"   応答時間: {result['response_time']:.3f}秒")
                print(f"   データサイズ: {result['content_length']} bytes")
        
        self.results.append({
            'timestamp': datetime.now(),
            'total_time': end_time - start_time,
            'results': results
        })
        
        return results
    
    def load_test(self, concurrent_requests=50):
        """負荷テスト実行"""
        print(f"\n=== 負荷テスト開始: 同時{concurrent_requests}リクエスト ===")
        
        # 単一エンドポイントに複数リクエスト
        test_url = 'https://httpbin.org/delay/0.5'
        
        start_time = time.time()
        reqs = [grequests.get(test_url, timeout=10) for _ in range(concurrent_requests)]
        responses = grequests.map(reqs, size=20)  # geventプールサイズ制限
        end_time = time.time()
        
        # 結果分析
        successful = [r for r in responses if r and r.status_code == 200]
        failed = [r for r in responses if r is None]
        
        total_time = end_time - start_time
        avg_response_time = sum(r.elapsed.total_seconds() for r in successful) / len(successful) if successful else 0
        
        print(f"負荷テスト結果:")
        print(f"  総実行時間: {total_time:.2f}秒")
        print(f"  成功: {len(successful)}/{concurrent_requests}")
        print(f"  失敗: {len(failed)}")
        print(f"  平均応答時間: {avg_response_time:.3f}秒")
        print(f"  秒間リクエスト数: {concurrent_requests/total_time:.1f} req/sec")
    
    def data_collection(self):
        """複数ソースからのデータ収集"""
        print(f"\n=== データ収集開始 ===")
        
        data_sources = [
            ('posts', 'https://jsonplaceholder.typicode.com/posts'),
            ('users', 'https://jsonplaceholder.typicode.com/users'),
            ('albums', 'https://jsonplaceholder.typicode.com/albums'),
            ('photos', 'https://jsonplaceholder.typicode.com/photos?_limit=10'),
            ('comments', 'https://jsonplaceholder.typicode.com/comments?_limit=20')
        ]
        
        collection_reqs = []
        for name, url in data_sources:
            req = grequests.get(url, timeout=15)
            collection_reqs.append((name, req))
        
        start_time = time.time()
        reqs_only = [req for _, req in collection_reqs]
        responses = grequests.map(reqs_only)
        end_time = time.time()
        
        collected_data = {}
        total_items = 0
        
        for (name, _), response in zip(collection_reqs, responses):
            if response and response.status_code == 200:
                data = response.json()
                collected_data[name] = data
                item_count = len(data) if isinstance(data, list) else 1
                total_items += item_count
                print(f"✅ {name}: {item_count}件収集")
            else:
                print(f"❌ {name}: 収集失敗")
        
        print(f"\nデータ収集完了:")
        print(f"  収集時間: {end_time - start_time:.2f}秒")
        print(f"  総アイテム数: {total_items}件")
        print(f"  収集成功率: {len(collected_data)}/{len(data_sources)} ({len(collected_data)/len(data_sources)*100:.1f}%)")
        
        return collected_data

# APIモニタリングの実行例
def run_api_monitoring():
    monitor = APIMonitor()
    
    # ヘルスチェック実行
    monitor.health_check()
    
    # 負荷テスト実行
    monitor.load_test(concurrent_requests=30)
    
    # データ収集実行
    collected = monitor.data_collection()
    
    print(f"\n=== 監視セッション完了 ===")
    print(f"実行した監視履歴: {len(monitor.results)}件")

# 実際の使用例実行
run_api_monitoring()

このように、GRequestsは既存のRequestsコードに最小限の変更で並行処理機能を追加でき、I/Oバウンドなタスクでのパフォーマンスを大幅に向上させることができます。API統合、Webスクレイピング、監視システムなどで真価を発揮します。