Authlib
認証ライブラリ
Authlib
概要
AuthlibはPython向けの包括的なOAuth、OpenID Connectクライアント・サーバー構築ライブラリです。2025年現在、バージョン1.6.0においてPython3.9+をサポートし、OAuth 1.0、OAuth 2.0、JWT等の各種RFC実装を提供する統一されたライブラリとして地位を確立しています。JWS、JWE、JWK、JWA、JWTの完全実装を含み、Flask、Django、Requests、HTTPX、Starlette、FastAPI等の主要フレームワークとの統合をサポートします。モノリシック設計により、低レベルの仕様実装から高レベルのフレームワーク統合まで、すべてが同期された一貫性のある開発体験を提供します。Lepture(lepture)氏によって開発・メンテナンスされており、Python認証エコシステムにおける信頼性の高い選択肢として広く採用されています。
詳細
Authlib 1.6系は、業界標準のRFC準拠実装により、エンタープライズレベルの認証・認可システム構築を支援します。OAuth 2.0の全グラント(認可コード、クライアントクレデンシャル、パスワード、暗黙的、リフレッシュトークン)、OpenID Connectの全フロー(Code、Implicit、Hybrid)を包括的にサポートし、JWT Bearer Token Grant(RFC7523)、Client Credentials(RFC6749)、PKCE(RFC7636)等の拡張仕様にも対応します。JOSEライブラリとして、JWS、JWE、JWK、JWA、JWTの完全実装を提供し、RSA、ECDSA、EdDSA等の多様な暗号化アルゴリズムに対応します。フレームワーク統合では、Flask OAuth、Django OAuth、Requestsクライアント、HTTPXクライアント等の専用統合パッケージにより、各フレームワークの特性を活かした実装が可能です。
主な特徴
- 包括的RFC対応: OAuth 1.0/2.0、OpenID Connect、JWT、JOSEの完全実装
- モノリシック設計: 仕様実装からフレームワーク統合まで一貫した設計
- マルチフレームワーク: Flask、Django、FastAPI、Starlette等への統合
- JOSE完全対応: JWS、JWE、JWK、JWA、JWTの業界標準実装
- クライアント・サーバー両対応: 認証クライアントと認証サーバーの両方をサポート
- 拡張性: カスタムグラント、カスタム認証方式の柔軟な実装
メリット・デメリット
メリット
- Python認証エコシステムで最も包括的で信頼性の高いライブラリ
- 業界標準RFC準拠により互換性とセキュリティを保証
- モノリシック設計で一貫した開発体験と学習コストの削減
- 主要Pythonフレームワークとの深い統合で効率的な開発
- JOSEライブラリとしても独立して利用可能
- 活発な開発コミュニティと豊富なドキュメント
デメリット
- OAuth 2.0/OpenID Connectの深い理解が必要で学習コストが高い
- モノリシック設計のため必要最小限の機能のみの利用でもサイズが大きい
- 複雑な認証フローの実装には細かな設定とカスタマイズが必要
- エンタープライズ向け機能が豊富で小規模プロジェクトには過剰な場合がある
- フレームワーク固有の統合実装により、移植性に制約がある場合がある
- 高度な暗号化機能の利用には暗号学の知識が必要
参考ページ
書き方の例
基本的なインストールとOAuth 2.0クライアント設定
# Authlibのインストール
pip install authlib
# 追加のHTTPクライアント(必要に応じて)
pip install httpx # HTTPXクライアント使用の場合
pip install requests # Requestsクライアント使用の場合
# Webフレームワーク統合(必要に応じて)
pip install flask # Flask統合の場合
pip install django # Django統合の場合
pip install fastapi uvicorn # FastAPI統合の場合
# oauth_client.py - OAuth 2.0クライアント基本設定
from authlib.integrations.requests_client import OAuth2Session
from authlib.integrations.httpx_client import AsyncOAuth2Client
from authlib.oauth2.rfc6749.util import generate_token
import os
class OAuthClient:
def __init__(self, client_id, client_secret, authorization_url, token_url):
self.client_id = client_id
self.client_secret = client_secret
self.authorization_url = authorization_url
self.token_url = token_url
self.redirect_uri = 'http://localhost:8080/callback'
def create_session(self):
"""OAuth2Sessionを作成"""
return OAuth2Session(
self.client_id,
self.client_secret,
redirect_uri=self.redirect_uri,
scope='read write'
)
def get_authorization_url(self):
"""認証URLを生成"""
session = self.create_session()
authorization_url, state = session.create_authorization_url(
self.authorization_url
)
return authorization_url, state
def fetch_token(self, authorization_response_url, state):
"""アクセストークンを取得"""
session = self.create_session()
token = session.fetch_token(
self.token_url,
authorization_response=authorization_response_url,
state=state
)
return token
# 使用例
oauth_client = OAuthClient(
client_id='your-client-id',
client_secret='your-client-secret',
authorization_url='https://example.com/oauth/authorize',
token_url='https://example.com/oauth/token'
)
# 認証URL生成
auth_url, state = oauth_client.get_authorization_url()
print(f"認証URL: {auth_url}")
Flask統合とOpenID Connect実装
# flask_oidc_app.py - Flask OpenID Connect アプリケーション
from flask import Flask, redirect, url_for, session, request, jsonify
from authlib.integrations.flask_client import OAuth
from authlib.oidc.core import UserInfo
app = Flask(__name__)
app.secret_key = 'your-secret-key'
# OAuth設定
oauth = OAuth(app)
# Google OpenID Connect クライアント登録
google = oauth.register(
'google',
client_id='your-google-client-id',
client_secret='your-google-client-secret',
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid profile email'
}
)
# GitHub OAuth クライアント登録
github = oauth.register(
'github',
client_id='your-github-client-id',
client_secret='your-github-client-secret',
access_token_url='https://github.com/login/oauth/access_token',
authorize_url='https://github.com/login/oauth/authorize',
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'},
)
@app.route('/')
def index():
user = session.get('user')
if user:
return f'<h1>Hello, {user["name"]}!</h1><a href="/logout">Logout</a>'
return '''
<h1>OAuth Login</h1>
<a href="/login/google">Login with Google</a><br>
<a href="/login/github">Login with GitHub</a>
'''
@app.route('/login/<provider>')
def login(provider):
client = oauth.create_client(provider)
redirect_uri = url_for('authorize', provider=provider, _external=True)
return client.authorize_redirect(redirect_uri)
@app.route('/authorize/<provider>')
def authorize(provider):
client = oauth.create_client(provider)
token = client.authorize_access_token()
if provider == 'google':
# OpenID Connect - userinfo は自動的に id_token から取得
userinfo = token.get('userinfo')
if userinfo:
session['user'] = {
'name': userinfo['name'],
'email': userinfo['email'],
'provider': provider
}
elif provider == 'github':
# GitHub API を使用してユーザー情報取得
resp = client.get('user', token=token)
userinfo = resp.json()
session['user'] = {
'name': userinfo['name'] or userinfo['login'],
'email': userinfo.get('email'),
'provider': provider
}
return redirect(url_for('index'))
@app.route('/logout')
def logout():
session.pop('user', None)
return redirect(url_for('index'))
@app.route('/api/profile')
def profile():
user = session.get('user')
if not user:
return jsonify({'error': 'Not authenticated'}), 401
return jsonify(user)
if __name__ == '__main__':
app.run(debug=True, port=8080)
Django統合とカスタム認証サーバー実装
# django_auth_server.py - Django OAuth 2.0認証サーバー
from django.contrib.auth.models import User
from django.db import models
from authlib.integrations.django_oauth2 import (
AuthorizationServer,
ResourceProtector
)
from authlib.oauth2.rfc6749 import grants
from authlib.oidc.core import grants as oidc_grants, UserInfo
from authlib.common.security import generate_token
# クライアントモデル
class OAuth2Client(models.Model):
client_id = models.CharField(max_length=48, unique=True, db_index=True)
client_secret = models.CharField(max_length=120)
client_name = models.CharField(max_length=120)
redirect_uris = models.TextField(default='')
default_scopes = models.TextField(default='')
grant_types = models.TextField(default='')
response_types = models.TextField(default='')
def get_client_id(self):
return self.client_id
def get_default_redirect_uri(self):
return self.redirect_uris.split()[0] if self.redirect_uris else ''
def get_allowed_scope(self, scope):
if not scope:
return ''
allowed = set(self.default_scopes.split())
scopes = set(scope.split())
return ' '.join([s for s in scopes if s in allowed])
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris.split()
def has_client_secret(self):
return bool(self.client_secret)
def check_client_secret(self, client_secret):
return self.client_secret == client_secret
def check_token_endpoint_auth_method(self, method):
return method in ['client_secret_basic', 'client_secret_post']
def check_response_type(self, response_type):
return response_type in self.response_types.split()
def check_grant_type(self, grant_type):
return grant_type in self.grant_types.split()
# 認証コードモデル
class OAuth2AuthorizationCode(models.Model):
code = models.CharField(max_length=120, unique=True)
client_id = models.CharField(max_length=48, db_index=True)
redirect_uri = models.TextField(default='')
response_type = models.TextField(default='')
scope = models.TextField(default='')
user = models.ForeignKey(User, on_delete=models.CASCADE)
auth_time = models.IntegerField()
nonce = models.CharField(max_length=120, default='') # OpenID Connect用
def is_expired(self):
# 10分で期限切れ
import time
return time.time() - self.auth_time > 600
def get_redirect_uri(self):
return self.redirect_uri
def get_scope(self):
return self.scope
# トークンモデル
class OAuth2Token(models.Model):
client_id = models.CharField(max_length=48, db_index=True)
token_type = models.CharField(max_length=40, default='Bearer')
access_token = models.CharField(max_length=255, unique=True, null=False)
refresh_token = models.CharField(max_length=255, db_index=True)
scope = models.TextField(default='')
revoked = models.BooleanField(default=False)
issued_at = models.IntegerField()
expires_in = models.IntegerField()
user = models.ForeignKey(User, on_delete=models.CASCADE)
def get_scope(self):
return self.scope
def get_expires_in(self):
return self.expires_in
def is_expired(self):
import time
return time.time() > self.issued_at + self.expires_in
# カスタム認証コードグラント
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = [
'client_secret_basic',
'client_secret_post',
'none'
]
def save_authorization_code(self, code, request):
import time
nonce = request.data.get('nonce')
auth_code = OAuth2AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user=request.user,
auth_time=int(time.time()),
nonce=nonce or ''
)
auth_code.save()
return auth_code
def query_authorization_code(self, code, client):
try:
auth_code = OAuth2AuthorizationCode.objects.get(
code=code, client_id=client.client_id
)
if auth_code.is_expired():
return None
return auth_code
except OAuth2AuthorizationCode.DoesNotExist:
return None
def delete_authorization_code(self, authorization_code):
authorization_code.delete()
def authenticate_user(self, authorization_code):
return authorization_code.user
# OpenID Connect Code Extension
class OpenIDCode(oidc_grants.OpenIDCode):
def exists_nonce(self, nonce, request):
try:
OAuth2AuthorizationCode.objects.get(
client_id=request.client_id, nonce=nonce
)
return True
except OAuth2AuthorizationCode.DoesNotExist:
return False
def get_jwt_config(self, grant):
# JWT設定(実際の環境では適切な秘密鍵を使用)
return {
'key': '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----''',
'alg': 'RS256',
'iss': 'https://your-auth-server.com',
'exp': 3600
}
def generate_user_info(self, user, scope):
user_info = UserInfo(sub=str(user.pk), name=user.get_full_name())
if 'email' in scope:
user_info['email'] = user.email
if 'profile' in scope:
user_info.update({
'given_name': user.first_name,
'family_name': user.last_name,
'preferred_username': user.username
})
return user_info
# 認証サーバーセットアップ
authorization_server = AuthorizationServer()
# グラント登録
authorization_server.register_grant(
AuthorizationCodeGrant,
[OpenIDCode(require_nonce=True)]
)
# プロテクター設定
require_oauth = ResourceProtector()
# views.py での使用例
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
@csrf_exempt
def authorize(request):
# 認証エンドポイント
try:
grant = authorization_server.get_consent_grant(request)
except OAuth2Error as error:
return JsonResponse(error.get_body(), status=error.status_code)
# ユーザー認証とコンセント処理
if request.method == 'GET':
# 認証フォーム表示
try:
grant = authorization_server.validate_consent_request(request)
context = {
'grant': grant,
'user': request.user
}
return render(request, 'oauth2/authorize.html', context)
except OAuth2Error as error:
return JsonResponse(error.get_body(), status=error.status_code)
# コンセント処理
if request.POST.get('confirm'):
return authorization_server.create_authorization_response(request)
else:
return authorization_server.create_authorization_response(request, grant_user=None)
@csrf_exempt
def issue_token(request):
# トークンエンドポイント
return authorization_server.create_token_response(request)
@require_oauth('profile')
def api_profile(request):
# 保護されたリソース
user = request.oauth_token.user
return JsonResponse({
'sub': str(user.pk),
'name': user.get_full_name(),
'email': user.email
})
HTTPXクライアントとJWT操作
# jwt_operations.py - JWT操作とHTTPXクライアント
import asyncio
from authlib.integrations.httpx_client import AsyncOAuth2Client
from authlib.jose import JsonWebSignature, JsonWebKey, jwt
from authlib.common.security import generate_token
import httpx
class JWTManager:
def __init__(self, private_key_pem, algorithm='RS256'):
self.private_key = private_key_pem
self.algorithm = algorithm
self.jws = JsonWebSignature()
def create_jwt_token(self, payload, headers=None):
"""JWTトークンを生成"""
if headers is None:
headers = {'alg': self.algorithm}
return jwt.encode(headers, payload, self.private_key)
def decode_jwt_token(self, token, public_key):
"""JWTトークンをデコード"""
try:
return jwt.decode(token, public_key)
except Exception as e:
raise ValueError(f"JWT decode error: {e}")
def create_client_assertion(self, client_id, token_endpoint):
"""クライアント認証用JWTアサーション生成"""
import time
payload = {
'iss': client_id,
'sub': client_id,
'aud': token_endpoint,
'iat': int(time.time()),
'exp': int(time.time()) + 300, # 5分後に期限切れ
'jti': generate_token()
}
return self.create_jwt_token(payload)
class AsyncOAuthClient:
def __init__(self, client_id, private_key, token_endpoint):
self.client_id = client_id
self.private_key = private_key
self.token_endpoint = token_endpoint
self.jwt_manager = JWTManager(private_key)
async def get_client_credentials_token(self, scope=None):
"""クライアントクレデンシャルグラントでトークン取得"""
async with httpx.AsyncClient() as client:
# Client assertion作成
client_assertion = self.jwt_manager.create_client_assertion(
self.client_id,
self.token_endpoint
)
data = {
'grant_type': 'client_credentials',
'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion
}
if scope:
data['scope'] = scope
response = await client.post(
self.token_endpoint,
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if response.status_code == 200:
return response.json()
else:
response.raise_for_status()
async def refresh_token(self, refresh_token):
"""リフレッシュトークンでアクセストークン更新"""
async with httpx.AsyncClient() as client:
client_assertion = self.jwt_manager.create_client_assertion(
self.client_id,
self.token_endpoint
)
data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion
}
response = await client.post(
self.token_endpoint,
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
return response.json()
# 使用例
async def main():
# 秘密鍵(実際の環境では適切に管理)
private_key = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----'''
oauth_client = AsyncOAuthClient(
client_id='your-client-id',
private_key=private_key,
token_endpoint='https://auth-server.com/oauth/token'
)
try:
# クライアントクレデンシャルグラントでトークン取得
token_response = await oauth_client.get_client_credentials_token(
scope='read write'
)
print("Access Token:", token_response['access_token'])
print("Token Type:", token_response['token_type'])
print("Expires In:", token_response['expires_in'])
# APIコール例
access_token = token_response['access_token']
async with httpx.AsyncClient() as client:
headers = {'Authorization': f'Bearer {access_token}'}
api_response = await client.get(
'https://api.example.com/data',
headers=headers
)
print("API Response:", api_response.json())
except Exception as e:
print(f"Error: {e}")
# 実行
if __name__ == '__main__':
asyncio.run(main())
FastAPI統合とリソース保護
# fastapi_oauth.py - FastAPI OAuth 2.0リソースサーバー
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from authlib.jose import jwt, JsonWebKey
from authlib.jose.errors import InvalidClaimsError, ExpiredTokenError
import httpx
import time
from typing import Optional
app = FastAPI(title="OAuth 2.0 Protected API")
# セキュリティスキーム
security = HTTPBearer()
class JWTBearer:
def __init__(self, jwks_uri: str, algorithms: list = None):
self.jwks_uri = jwks_uri
self.algorithms = algorithms or ['RS256']
self.jwks_cache = {}
self.cache_expiry = 0
async def get_jwks(self):
"""JWKS を取得(キャッシュ機能付き)"""
current_time = time.time()
if current_time > self.cache_expiry:
async with httpx.AsyncClient() as client:
response = await client.get(self.jwks_uri)
response.raise_for_status()
self.jwks_cache = response.json()
self.cache_expiry = current_time + 3600 # 1時間キャッシュ
return self.jwks_cache
async def verify_token(self, token: str):
"""JWTトークンを検証"""
try:
# JWKS取得
jwks_data = await self.get_jwks()
jwks = JsonWebKey.import_key_set(jwks_data)
# JWT デコード・検証
claims = jwt.decode(token, jwks)
claims.validate()
return claims
except ExpiredTokenError:
raise HTTPException(status_code=401, detail="Token has expired")
except InvalidClaimsError as e:
raise HTTPException(status_code=401, detail=f"Invalid token claims: {e}")
except Exception as e:
raise HTTPException(status_code=401, detail=f"Token verification failed: {e}")
# JWT Bearer インスタンス
jwt_bearer = JWTBearer(jwks_uri="https://auth-server.com/.well-known/jwks.json")
async def get_current_user(credentials: HTTPAuthorizationCredentials = Security(security)):
"""現在のユーザーを取得"""
token = credentials.credentials
claims = await jwt_bearer.verify_token(token)
return claims
def require_scope(required_scope: str):
"""特定のスコープを要求するデペンデンシー"""
async def scope_dependency(claims: dict = Depends(get_current_user)):
token_scopes = claims.get('scope', '').split()
if required_scope not in token_scopes:
raise HTTPException(
status_code=403,
detail=f"Insufficient scope. Required: {required_scope}"
)
return claims
return scope_dependency
@app.get("/")
async def read_root():
return {"message": "OAuth 2.0 Protected API", "version": "1.0"}
@app.get("/public")
async def public_endpoint():
"""パブリックエンドポイント(認証不要)"""
return {"message": "This is a public endpoint"}
@app.get("/protected")
async def protected_endpoint(claims: dict = Depends(get_current_user)):
"""保護されたエンドポイント(認証必要)"""
return {
"message": "This is a protected endpoint",
"user": {
"sub": claims.get("sub"),
"iss": claims.get("iss"),
"scopes": claims.get("scope", "").split()
}
}
@app.get("/profile")
async def get_profile(claims: dict = Depends(require_scope("profile"))):
"""プロフィール情報(profile スコープ必要)"""
return {
"sub": claims.get("sub"),
"name": claims.get("name"),
"email": claims.get("email"),
"preferred_username": claims.get("preferred_username")
}
@app.get("/admin")
async def admin_endpoint(claims: dict = Depends(require_scope("admin"))):
"""管理者エンドポイント(admin スコープ必要)"""
return {
"message": "Admin access granted",
"admin_data": {
"user_count": 1000,
"server_status": "healthy"
}
}
@app.post("/data")
async def create_data(
data: dict,
claims: dict = Depends(require_scope("write"))
):
"""データ作成(write スコープ必要)"""
return {
"message": "Data created successfully",
"created_by": claims.get("sub"),
"data": data
}
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return {"error": exc.detail, "status_code": exc.status_code}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
エラーハンドリングとテスト
# test_oauth.py - OAuth 2.0テスト実装
import pytest
import httpx
from authlib.oauth2.rfc6749.errors import OAuth2Error
from authlib.jose import jwt
from authlib.common.security import generate_token
import time
class OAuthTestClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.Client(base_url=base_url)
def test_authorization_code_flow(self, client_id, redirect_uri, scope):
"""認証コードフロー テスト"""
# Step 1: 認証URL生成
auth_params = {
'response_type': 'code',
'client_id': client_id,
'redirect_uri': redirect_uri,
'scope': scope,
'state': generate_token()
}
auth_response = self.client.get('/oauth/authorize', params=auth_params)
assert auth_response.status_code == 200
return auth_params['state']
def test_token_exchange(self, code, client_id, client_secret, redirect_uri):
"""トークン交換テスト"""
token_data = {
'grant_type': 'authorization_code',
'code': code,
'client_id': client_id,
'client_secret': client_secret,
'redirect_uri': redirect_uri
}
token_response = self.client.post('/oauth/token', data=token_data)
if token_response.status_code == 200:
token_data = token_response.json()
assert 'access_token' in token_data
assert 'token_type' in token_data
assert token_data['token_type'].lower() == 'bearer'
return token_data
else:
raise Exception(f"Token exchange failed: {token_response.text}")
def test_protected_resource(self, access_token, resource_path='/api/profile'):
"""保護されたリソースアクセステスト"""
headers = {'Authorization': f'Bearer {access_token}'}
response = self.client.get(resource_path, headers=headers)
return response
class OAuth2ErrorHandler:
"""OAuth 2.0エラーハンドリング"""
@staticmethod
def handle_token_error(error_response):
"""トークンエラーの処理"""
if isinstance(error_response, dict):
error_code = error_response.get('error')
error_description = error_response.get('error_description')
error_mapping = {
'invalid_request': 'リクエストが無効です',
'invalid_client': 'クライアント認証に失敗しました',
'invalid_grant': '認証グラントが無効です',
'unauthorized_client': 'クライアントは認可されていません',
'unsupported_grant_type': 'サポートされていないグラントタイプです',
'invalid_scope': '無効なスコープです'
}
user_message = error_mapping.get(error_code, '不明なエラーが発生しました')
return {
'error': error_code,
'description': error_description,
'user_message': user_message
}
@staticmethod
def handle_jwt_error(jwt_token, public_key):
"""JWT検証エラーの処理"""
try:
claims = jwt.decode(jwt_token, public_key)
claims.validate()
return {'valid': True, 'claims': claims}
except Exception as e:
error_type = type(e).__name__
error_messages = {
'ExpiredTokenError': 'トークンの有効期限が切れています',
'InvalidSignatureError': 'トークンの署名が無効です',
'InvalidClaimsError': 'トークンのクレームが無効です',
'InvalidTokenError': 'トークンの形式が無効です'
}
return {
'valid': False,
'error': error_type,
'message': error_messages.get(error_type, '不明なJWTエラー'),
'details': str(e)
}
# pytest テスト例
@pytest.mark.asyncio
async def test_oauth_flow():
"""OAuth フロー統合テスト"""
test_client = OAuthTestClient('http://localhost:8000')
# テスト設定
client_id = 'test-client'
client_secret = 'test-secret'
redirect_uri = 'http://localhost:8080/callback'
scope = 'read write profile'
try:
# 認証URLテスト
state = test_client.test_authorization_code_flow(
client_id, redirect_uri, scope
)
# 実際のテストではここでブラウザ操作をシミュレート
# 認証コード取得(モック)
mock_code = 'test-authorization-code'
# トークン交換テスト
token_data = test_client.test_token_exchange(
mock_code, client_id, client_secret, redirect_uri
)
# 保護リソースアクセステスト
protected_response = test_client.test_protected_resource(
token_data['access_token']
)
assert protected_response.status_code == 200
profile_data = protected_response.json()
assert 'sub' in profile_data
print("OAuth flow test passed!")
except Exception as e:
pytest.fail(f"OAuth flow test failed: {e}")
if __name__ == '__main__':
# 単体テスト実行
error_handler = OAuth2ErrorHandler()
# エラーハンドリングテスト
sample_error = {
'error': 'invalid_client',
'error_description': 'Client authentication failed'
}
handled_error = error_handler.handle_token_error(sample_error)
print("Error handling test:", handled_error)