Requests

Python向けの最も人気のHTTPライブラリ。「人間のためのHTTP」をコンセプトにシンプルで直感的なAPIを提供。セッション管理、SSL検証、自動リダイレクト、認証、Cookie処理、ファイルアップロードなど包括的な機能を持つ。

HTTPクライアントPythonシンプル認証セッション管理

GitHub概要

psf/requests

A simple, yet elegant, HTTP library.

スター53,399
ウォッチ1,313
フォーク9,568
作成日:2011年2月13日
言語:Python
ライセンス:Apache License 2.0

トピックス

clientcookiesforhumanshttphumanspythonpython-requestsrequests

スター履歴

psf/requests Star History
データ取得日時: 2025/10/22 10:04

ライブラリ

Requests

概要

Requestsは「Python向けのシンプルで使いやすいHTTPライブラリ」として開発された、Pythonエコシステムで最も人気のあるHTTPクライアントライブラリです。「人間のためのHTTP」をコンセプトに、複雑なHTTP処理をシンプルで直感的なAPIで提供。認証、セッション管理、SSL証明書検証、自動JSON/XML解析など、Web API統合に必要な機能を包括的にサポートし、Python開発者にとって事実上の標準ライブラリとして地位を確立しています。

詳細

Requests 2025年版はPython HTTP通信の決定版として確固たる地位を維持し続けています。15年以上の開発実績により成熟したAPIと優れた安定性を誇り、Django、Flask、FastAPI等の主要Webフレームワーク環境で広く採用されています。シンプルで読みやすいコード記述を重視した設計により、HTTP通信をPythonicで自然な方法で実装可能。セッション管理、認証システム、プロキシサポート、SSL/TLS設定、ストリーミング処理など企業レベルのHTTP通信要件を満たす豊富な機能を提供します。

主な特徴

  • シンプルなAPI: 直感的で読みやすいHTTPリクエスト記述
  • 包括的認証サポート: Basic、Digest、OAuth、カスタム認証対応
  • セッション管理: Cookie保持と効率的な接続プーリング
  • 自動コンテンツ解析: JSON、XML、HTML等の自動パース機能
  • SSL/TLS完全サポート: 証明書検証とクライアント証明書対応
  • ストリーミング処理: 大容量ファイルの効率的な処理

メリット・デメリット

メリット

  • Pythonエコシステムでの圧倒的な普及率と豊富な学習リソース
  • シンプルで直感的なAPIによる高い開発効率とコード可読性
  • 包括的な認証機能とセッション管理による企業レベル対応
  • 豊富なコミュニティサポートとサードパーティ拡張機能
  • Django、Flask等のWebフレームワークとの優れた統合性
  • 安定した長期サポートと後方互換性の維持

デメリット

  • 同期処理ベースでCPUバウンドなタスクに不向き
  • 非同期処理には別途aiohttp等の専用ライブラリが必要
  • HTTP/2サポートが限定的(requests-http2が必要)
  • 大量の並列リクエストでのパフォーマンス制約
  • 依存ライブラリが多くバンドルサイズが比較的大きい
  • 古いPythonバージョンサポート終了による移行課題

参考ページ

書き方の例

インストールと基本セットアップ

# Requestsのインストール
pip install requests

# セキュリティ強化版(推奨)
pip install requests[security]

# 依存関係を含む完全インストール
pip install requests[socks]

# Python環境での確認
python -c "import requests; print(requests.__version__)"

基本的なリクエスト(GET/POST/PUT/DELETE)

import requests

# 基本的なGETリクエスト
response = requests.get('https://api.example.com/users')
print(response.status_code)  # 200
print(response.headers['content-type'])  # application/json
print(response.text)  # レスポンステキスト
data = response.json()  # JSONとして解析
print(data)

# クエリパラメータ付きGETリクエスト
params = {'page': 1, 'limit': 10, 'sort': 'created_at'}
response = requests.get('https://api.example.com/users', params=params)
print(response.url)  # https://api.example.com/users?page=1&limit=10&sort=created_at

# POSTリクエスト(JSON送信)
user_data = {
    'name': '田中太郎',
    'email': '[email protected]',
    'age': 30
}

response = requests.post(
    'https://api.example.com/users',
    json=user_data,  # 自動的にContent-Type: application/jsonが設定される
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 201:
    created_user = response.json()
    print(f"ユーザー作成完了: ID={created_user['id']}")
else:
    print(f"エラー: {response.status_code} - {response.text}")

# POSTリクエスト(フォームデータ送信)
form_data = {'username': 'testuser', 'password': 'secret123'}
response = requests.post('https://api.example.com/login', data=form_data)

# PUTリクエスト(データ更新)
updated_data = {'name': '田中次郎', 'email': '[email protected]'}
response = requests.put(
    'https://api.example.com/users/123',
    json=updated_data,
    headers={'Authorization': 'Bearer your-token'}
)

# DELETEリクエスト
response = requests.delete(
    'https://api.example.com/users/123',
    headers={'Authorization': 'Bearer your-token'}
)

if response.status_code == 204:
    print("ユーザー削除完了")

# レスポンス属性の詳細確認
print(f"ステータスコード: {response.status_code}")
print(f"リーズン: {response.reason}")
print(f"ヘッダー: {response.headers}")
print(f"エンコーディング: {response.encoding}")
print(f"リクエストURL: {response.url}")
print(f"履歴: {response.history}")  # リダイレクト履歴

高度な設定とカスタマイズ(ヘッダー、認証、タイムアウト等)

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth

# カスタムヘッダーの設定
headers = {
    'User-Agent': 'MyApp/1.0 (Python Requests)',
    'Accept': 'application/json',
    'Accept-Language': 'ja-JP,en-US',
    'X-API-Version': 'v2',
    'X-Request-ID': 'req-12345'
}

response = requests.get('https://api.example.com/data', headers=headers)

# Basic認証
response = requests.get(
    'https://api.example.com/private',
    auth=('username', 'password')
)

# または明示的にHTTPBasicAuthを使用
auth = HTTPBasicAuth('username', 'password')
response = requests.get('https://api.example.com/private', auth=auth)

# Digest認証
digest_auth = HTTPDigestAuth('username', 'password')
response = requests.get('https://api.example.com/digest', auth=digest_auth)

# Bearer Token認証
headers = {'Authorization': 'Bearer your-jwt-token'}
response = requests.get('https://api.example.com/protected', headers=headers)

# タイムアウト設定
try:
    # 接続タイムアウト5秒、読み込みタイムアウト10秒
    response = requests.get('https://api.example.com/slow', timeout=(5, 10))
    
    # 全体で15秒のタイムアウト
    response = requests.get('https://api.example.com/data', timeout=15)
    
except requests.exceptions.Timeout:
    print("リクエストがタイムアウトしました")

# SSL設定とクライアント証明書
response = requests.get(
    'https://secure-api.example.com/data',
    cert=('/path/to/client.cert', '/path/to/client.key'),  # クライアント証明書
    verify='/path/to/ca-bundle.crt'  # CA証明書バンドル
)

# SSL証明書検証の無効化(開発時のみ)
response = requests.get('https://self-signed.example.com/', verify=False)

# プロキシ設定
proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'http://proxy.example.com:8080'
}

response = requests.get('https://api.example.com/data', proxies=proxies)

# 認証付きプロキシ
proxies = {
    'http': 'http://user:[email protected]:8080',
    'https': 'http://user:[email protected]:8080'
}

# Cookie設定
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = requests.get('https://api.example.com/user-data', cookies=cookies)

# リダイレクト制御
response = requests.get(
    'https://api.example.com/redirect',
    allow_redirects=False,  # リダイレクトを無効化
    max_redirects=5  # 最大リダイレクト数
)

エラーハンドリングとリトライ機能

import requests
from requests.exceptions import (
    ConnectionError, Timeout, RequestException, 
    HTTPError, TooManyRedirects
)
import time
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# 包括的なエラーハンドリング
def safe_request(url, **kwargs):
    try:
        response = requests.get(url, **kwargs)
        
        # HTTPステータスコードのチェック
        response.raise_for_status()  # 4xx/5xxでHTTPErrorを発生
        
        return response
        
    except ConnectionError as e:
        print(f"接続エラー: {e}")
        print("ネットワーク接続を確認してください")
    except Timeout as e:
        print(f"タイムアウトエラー: {e}")
        print("リクエストがタイムアウトしました")
    except HTTPError as e:
        print(f"HTTPエラー: {e}")
        print(f"ステータスコード: {response.status_code}")
        print(f"レスポンス: {response.text}")
    except TooManyRedirects as e:
        print(f"リダイレクトエラー: {e}")
        print("リダイレクトが多すぎます")
    except RequestException as e:
        print(f"リクエストエラー: {e}")
        print("予期しないエラーが発生しました")
    
    return None

# 使用例
response = safe_request('https://api.example.com/data', timeout=10)
if response:
    data = response.json()
    print(data)

# 手動リトライ実装
def request_with_retry(url, max_retries=3, backoff_factor=1, **kwargs):
    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, **kwargs)
            response.raise_for_status()
            return response
            
        except requests.exceptions.RequestException as e:
            if attempt == max_retries:
                print(f"最大試行回数に達しました: {e}")
                raise
            
            wait_time = backoff_factor * (2 ** attempt)
            print(f"試行 {attempt + 1} 失敗. {wait_time}秒後に再試行...")
            time.sleep(wait_time)

# 使用例
try:
    response = request_with_retry(
        'https://api.example.com/unstable',
        max_retries=3,
        backoff_factor=1,
        timeout=10
    )
    print("リクエスト成功:", response.status_code)
except requests.exceptions.RequestException as e:
    print("最終的に失敗:", e)

# urllib3のRetryクラスを使用した高度なリトライ設定
retry_strategy = Retry(
    total=5,  # 総試行回数
    status_forcelist=[429, 500, 502, 503, 504],  # リトライ対象ステータス
    method_whitelist=["HEAD", "GET", "OPTIONS"],  # リトライ対象メソッド
    backoff_factor=1,  # バックオフ係数
    respect_retry_after_header=True  # Retry-Afterヘッダーを尊重
)

adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)

# セッションを使用したリクエスト
response = session.get('https://api.example.com/data', timeout=10)

# ステータスコード別処理
response = requests.get('https://api.example.com/status-check')

if response.status_code == 200:
    print("正常: ", response.json())
elif response.status_code == 401:
    print("認証エラー: トークンを確認してください")
elif response.status_code == 403:
    print("権限エラー: アクセス権限がありません")
elif response.status_code == 404:
    print("見つかりません: リソースが存在しません")
elif response.status_code == 429:
    print("レート制限: しばらく待ってから再試行してください")
elif response.status_code >= 500:
    print("サーバーエラー: サーバー側の問題です")
else:
    print(f"予期しないステータス: {response.status_code}")

並行処理と非同期リクエスト

import requests
import concurrent.futures
from threading import Thread
import queue
import time

# 複数URLの並列取得
def fetch_url(url):
    try:
        response = requests.get(url, timeout=10)
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'success': True
        }
    except requests.exceptions.RequestException as e:
        return {
            'url': url,
            'error': str(e),
            'success': False
        }

# ThreadPoolExecutorを使用した並列処理
urls = [
    'https://api.example.com/users',
    'https://api.example.com/posts',
    'https://api.example.com/comments',
    'https://api.example.com/categories'
]

def parallel_fetch_threadpool(urls, max_workers=5):
    results = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 全URLを並列実行にサブミット
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}
        
        for future in concurrent.futures.as_completed(future_to_url):
            result = future.result()
            results.append(result)
            
            if result['success']:
                print(f"成功: {result['url']} - {result['status_code']}")
            else:
                print(f"失敗: {result['url']} - {result['error']}")
    
    return results

# 使用例
results = parallel_fetch_threadpool(urls)
successful_results = [r for r in results if r['success']]
print(f"成功: {len(successful_results)}/{len(urls)}")

# セッションを使った効率的な並列処理
def fetch_with_session(session, url):
    try:
        response = session.get(url, timeout=10)
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"エラー {url}: {e}")
        return None

def parallel_fetch_with_session(urls, max_workers=5):
    results = []
    
    # セッションは接続プールを共有するため効率的
    session = requests.Session()
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_with_session, session, url) for url in urls]
        
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    
    session.close()
    return results

# ページネーション対応の段階的データ取得
def fetch_all_pages(base_url, headers=None, max_pages=None):
    all_data = []
    page = 1
    session = requests.Session()
    
    if headers:
        session.headers.update(headers)
    
    while True:
        try:
            params = {'page': page, 'per_page': 100}
            response = session.get(base_url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            if not data or (isinstance(data, list) and len(data) == 0):
                break  # データがない場合は終了
            
            if isinstance(data, dict) and 'items' in data:
                items = data['items']
                all_data.extend(items)
                
                # 次のページがない場合は終了
                if not data.get('has_more', True) or len(items) == 0:
                    break
            else:
                all_data.extend(data)
            
            print(f"ページ {page} 取得完了: {len(data if isinstance(data, list) else data.get('items', []))}件")
            page += 1
            
            # 最大ページ制限チェック
            if max_pages and page > max_pages:
                break
            
            # API負荷軽減のための待機
            time.sleep(0.1)
            
        except requests.exceptions.RequestException as e:
            print(f"ページ {page} でエラー: {e}")
            break
    
    session.close()
    print(f"総取得件数: {len(all_data)}件")
    return all_data

# 使用例
headers = {'Authorization': 'Bearer your-token'}
all_posts = fetch_all_pages('https://api.example.com/posts', headers=headers)

# 重複リクエスト防止とキャッシュ
class RequestCache:
    def __init__(self):
        self.cache = {}
        self.pending = {}
    
    def get_or_fetch(self, url, **kwargs):
        if url in self.cache:
            print(f"キャッシュヒット: {url}")
            return self.cache[url]
        
        if url in self.pending:
            print(f"実行中の重複リクエスト: {url}")
            return None
        
        try:
            self.pending[url] = True
            response = requests.get(url, **kwargs)
            response.raise_for_status()
            
            result = response.json()
            self.cache[url] = result
            return result
            
        finally:
            self.pending.pop(url, None)

# キャッシュ使用例
cache = RequestCache()
data1 = cache.get_or_fetch('https://api.example.com/users/123')
data2 = cache.get_or_fetch('https://api.example.com/users/123')  # キャッシュから取得

フレームワーク統合と実用例

import requests
from requests.auth import AuthBase
import json
import os
from datetime import datetime, timedelta

# Django/Flask統合用のAPIクライアント
class APIClient:
    def __init__(self, base_url, token=None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'MyApp/1.0'
        })
        
        if token:
            self.session.headers['Authorization'] = f'Bearer {token}'
    
    def get(self, endpoint, **kwargs):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return self._make_request('GET', url, **kwargs)
    
    def post(self, endpoint, data=None, **kwargs):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return self._make_request('POST', url, json=data, **kwargs)
    
    def put(self, endpoint, data=None, **kwargs):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return self._make_request('PUT', url, json=data, **kwargs)
    
    def delete(self, endpoint, **kwargs):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return self._make_request('DELETE', url, **kwargs)
    
    def _make_request(self, method, url, **kwargs):
        try:
            response = self.session.request(method, url, timeout=30, **kwargs)
            response.raise_for_status()
            
            if response.content:
                return response.json()
            return None
            
        except requests.exceptions.RequestException as e:
            print(f"API Error: {method} {url} - {e}")
            raise
    
    def close(self):
        self.session.close()

# 使用例
client = APIClient('https://api.example.com/v1', token='your-jwt-token')

try:
    # ユーザー一覧取得
    users = client.get('users', params={'page': 1, 'limit': 50})
    
    # 新しいユーザー作成
    new_user = client.post('users', data={
        'name': '田中太郎',
        'email': '[email protected]'
    })
    
    # ユーザー更新
    updated_user = client.put(f'users/{new_user["id"]}', data={
        'name': '田中次郎'
    })
    
finally:
    client.close()

# 自動認証更新クライアント
class AuthRefreshClient:
    def __init__(self, base_url, client_id, client_secret):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expires_at = None
        self.session = requests.Session()
    
    def _ensure_valid_token(self):
        if (not self.access_token or 
            not self.token_expires_at or 
            datetime.now() >= self.token_expires_at):
            self._refresh_token()
    
    def _refresh_token(self):
        auth_data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'client_credentials'
        }
        
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data=auth_data,
            timeout=10
        )
        response.raise_for_status()
        
        token_data = response.json()
        self.access_token = token_data['access_token']
        expires_in = token_data.get('expires_in', 3600)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in)
        
        self.session.headers['Authorization'] = f'Bearer {self.access_token}'
    
    def request(self, method, endpoint, **kwargs):
        self._ensure_valid_token()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return self.session.request(method, url, **kwargs)

# カスタム認証クラス
class APIKeyAuth(AuthBase):
    def __init__(self, api_key, header_name='X-API-Key'):
        self.api_key = api_key
        self.header_name = header_name
    
    def __call__(self, r):
        r.headers[self.header_name] = self.api_key
        return r

# 使用例
api_auth = APIKeyAuth('your-api-key-here')
response = requests.get('https://api.example.com/data', auth=api_auth)

# ファイルアップロード
def upload_file(file_path, upload_url, additional_fields=None):
    """ファイルをマルチパート形式でアップロード"""
    
    files = {'file': open(file_path, 'rb')}
    data = additional_fields or {}
    
    try:
        response = requests.post(
            upload_url,
            files=files,
            data=data,
            headers={'Authorization': 'Bearer your-token'},
            timeout=300  # 5分のタイムアウト
        )
        response.raise_for_status()
        return response.json()
        
    finally:
        files['file'].close()

# 使用例
upload_result = upload_file(
    '/path/to/document.pdf',
    'https://api.example.com/upload',
    additional_fields={'category': 'documents', 'public': 'false'}
)

# ストリーミングダウンロード
def download_large_file(url, local_filename, chunk_size=8192):
    """大きなファイルをストリーミングでダウンロード"""
    
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        
        # ファイルサイズの取得(可能な場合)
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        
        with open(local_filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:  # フィルタリング
                    f.write(chunk)
                    downloaded += len(chunk)
                    
                    # 進捗表示
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
        
        print(f"\nダウンロード完了: {local_filename}")

# 使用例
download_large_file(
    'https://api.example.com/files/large-dataset.zip',
    '/tmp/dataset.zip'
)

# Webhook受信サーバー(Flask統合例)
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """外部APIからのWebhook受信処理"""
    
    # 送信者検証
    expected_token = os.environ.get('WEBHOOK_SECRET')
    received_token = request.headers.get('X-Webhook-Token')
    
    if not received_token or received_token != expected_token:
        return jsonify({'error': 'Unauthorized'}), 401
    
    # Webhookデータ処理
    webhook_data = request.json
    
    # 外部APIへの通知
    try:
        notification_response = requests.post(
            'https://internal-api.example.com/notifications',
            json={
                'event': webhook_data.get('event'),
                'timestamp': datetime.now().isoformat(),
                'data': webhook_data
            },
            headers={'Authorization': 'Bearer internal-token'},
            timeout=10
        )
        notification_response.raise_for_status()
        
    except requests.exceptions.RequestException as e:
        print(f"通知送信エラー: {e}")
        return jsonify({'error': 'Notification failed'}), 500
    
    return jsonify({'status': 'processed'}), 200