PyJWT

ライブラリ認証JWTトークンセキュリティPythonAPI認証Webトークン

ライブラリ

PyJWT

概要

PyJWTは、Pythonで最も人気のあるJSON Web Token(JWT)の実装ライブラリです。JWTの生成、検証、デコードを簡単に行うことができ、RESTful APIやWebアプリケーションでの認証・認可システムの構築に広く使用されています。対称暗号化(HMAC)から非対称暗号化(RSA、ECDSA、EdDSA)まで、幅広いアルゴリズムをサポートしています。

詳細

PyJWTは、JWT(JSON Web Token)の標準仕様RFC 7519に準拠したPython実装ライブラリです。2015年にJosé Padillaによって開発され、現在では世界中の多くのWebアプリケーションやAPIサービスで使用されています。JWTは、安全にパーティ間で情報を伝送するためのコンパクトで自己完結型のトークン形式で、認証・認可の仕組みに特に適しています。

PyJWTライブラリの主な特徴として、シンプルで直感的なAPIデザイン、豊富な暗号化アルゴリズムサポート、JWKS(JSON Web Key Set)エンドポイントとの統合機能、包括的なエラーハンドリング機能が挙げられます。2025年現在、最新バージョンは2.10.1で、セキュリティアップデートと機能拡張が継続的に行われています。

特に企業環境での利用においては、Auth0、AWS Cognito、Azure AD、Firebase AuthenticationなどのIDプロバイダーとの統合が容易であり、OAuth 2.0/OpenID Connectのフローとも親和性が高いため、現代的なWebアプリケーション開発において重要な位置を占めています。

メリット・デメリット

メリット

  • シンプルなAPI設計: jwt.encode()jwt.decode()の2つの主要関数でほぼすべての操作が可能
  • 豊富なアルゴリズムサポート: HS256, RS256, ES256, EdDSA, PS256など主要アルゴリズムを包括的にカバー
  • セキュリティ重視: 署名検証、有効期限チェック、クレーム検証などの安全機能がデフォルトで有効
  • JWKS統合機能: JSON Web Key Setエンドポイントからの自動鍵取得とローテーション対応
  • 包括的なエラーハンドリング: 期限切れ、署名無効、アルゴリズム不正など詳細なエラー分類
  • 高いパフォーマンス: Cレベルの暗号化ライブラリとの効率的な統合
  • 活発なコミュニティ: 継続的なセキュリティアップデートと機能改善

デメリット

  • 依存関係の複雑さ: 非対称アルゴリズム使用時はcryptographyライブラリが必要
  • JWTの制約: トークンサイズが大きくなりがちで、リボケーション(無効化)が困難
  • 暗号化未対応: JWE(JSON Web Encryption)は未サポート(別ライブラリが必要)
  • 設定の複雑さ: 本格的なセキュリティ設定には深い暗号学知識が必要
  • デバッグの難しさ: トークン内容がBase64エンコードされており、直接的な内容確認が困難

参考ページ

書き方の例

インストールと基本設定

# 基本インストール(HMAC系アルゴリズムのみ)
pip install PyJWT

# 非対称アルゴリズム対応版のインストール
pip install PyJWT[crypto]

# ライブラリのインポート
import jwt
import datetime
from datetime import timezone

# 基本的な設定
SECRET_KEY = "your-secret-key-here"  # 本番環境では環境変数から取得
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

JWTトークンの生成と検証

import jwt
from datetime import datetime, timedelta, timezone

def create_access_token(data: dict, expires_delta: timedelta = None):
    """アクセストークンの生成"""
    to_encode = data.copy()
    
    # 有効期限の設定
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    
    to_encode.update({"exp": expire})
    to_encode.update({"iat": datetime.now(timezone.utc)})  # 発行時刻
    to_encode.update({"iss": "your-app-name"})  # 発行者
    
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str):
    """トークンの検証とデコード"""
    try:
        payload = jwt.decode(
            token, 
            SECRET_KEY, 
            algorithms=[ALGORITHM],
            options={"verify_exp": True, "verify_iss": True}
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise ValueError("トークンの有効期限が切れています")
    except jwt.InvalidTokenError:
        raise ValueError("無効なトークンです")

# 使用例
user_data = {"sub": "user123", "username": "john_doe", "role": "admin"}
token = create_access_token(user_data)
print(f"生成されたトークン: {token}")

# トークンの検証
decoded_data = verify_token(token)
print(f"デコードされたデータ: {decoded_data}")

ペイロードの管理とクレーム

import jwt
from datetime import datetime, timedelta, timezone

def create_comprehensive_token(user_id: str, username: str, roles: list, permissions: list):
    """包括的なクレームを含むトークンの生成"""
    now = datetime.now(timezone.utc)
    
    payload = {
        # 標準クレーム(Registered Claims)
        "iss": "https://your-app.com",  # 発行者
        "sub": user_id,                  # 対象者(ユーザーID)
        "aud": "your-app-users",         # 対象者(アプリケーション)
        "exp": now + timedelta(hours=1), # 有効期限
        "nbf": now,                      # 有効開始時間
        "iat": now,                      # 発行時刻
        "jti": f"token_{user_id}_{int(now.timestamp())}",  # トークンID
        
        # カスタムクレーム(Private Claims)
        "username": username,
        "roles": roles,
        "permissions": permissions,
        "last_login": now.isoformat(),
        "is_admin": "admin" in roles
    }
    
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def validate_claims(token: str, required_role: str = None):
    """詳細なクレーム検証"""
    try:
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            audience="your-app-users",
            issuer="https://your-app.com",
            options={
                "verify_signature": True,
                "verify_exp": True,
                "verify_nbf": True,
                "verify_iat": True,
                "verify_aud": True,
                "verify_iss": True
            }
        )
        
        # ロール権限チェック
        if required_role and required_role not in payload.get("roles", []):
            raise ValueError(f"必要なロール '{required_role}' を持っていません")
        
        return payload
        
    except jwt.InvalidAudienceError:
        raise ValueError("無効な対象者です")
    except jwt.InvalidIssuerError:
        raise ValueError("無効な発行者です")
    except jwt.ImmatureSignatureError:
        raise ValueError("トークンはまだ有効ではありません")

# 使用例
token = create_comprehensive_token(
    user_id="user123",
    username="john_doe",
    roles=["user", "admin"],
    permissions=["read", "write", "delete"]
)

# 管理者権限が必要な操作
admin_data = validate_claims(token, required_role="admin")
print(f"管理者データ: {admin_data}")

高度なJWT機能(リフレッシュトークン等)

import jwt
import secrets
from datetime import datetime, timedelta, timezone

class JWTManager:
    """JWT管理クラス"""
    
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.refresh_tokens = {}  # 実際の運用ではRedisなどを使用
    
    def create_token_pair(self, user_id: str, user_data: dict):
        """アクセストークンとリフレッシュトークンのペア生成"""
        now = datetime.now(timezone.utc)
        
        # アクセストークン(短期間有効)
        access_payload = {
            "sub": user_id,
            "type": "access",
            "exp": now + timedelta(minutes=15),
            "iat": now,
            **user_data
        }
        access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
        
        # リフレッシュトークン(長期間有効)
        refresh_jti = secrets.token_urlsafe(32)
        refresh_payload = {
            "sub": user_id,
            "type": "refresh",
            "jti": refresh_jti,
            "exp": now + timedelta(days=30),
            "iat": now
        }
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
        
        # リフレッシュトークンをストレージに保存
        self.refresh_tokens[refresh_jti] = {
            "user_id": user_id,
            "created_at": now,
            "is_active": True
        }
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": 900  # 15分
        }
    
    def refresh_access_token(self, refresh_token: str):
        """リフレッシュトークンを使用したアクセストークンの更新"""
        try:
            payload = jwt.decode(
                refresh_token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            
            if payload.get("type") != "refresh":
                raise ValueError("無効なトークンタイプです")
            
            jti = payload.get("jti")
            if not jti or jti not in self.refresh_tokens:
                raise ValueError("無効なリフレッシュトークンです")
            
            token_info = self.refresh_tokens[jti]
            if not token_info["is_active"]:
                raise ValueError("リフレッシュトークンが無効化されています")
            
            user_id = payload["sub"]
            
            # 新しいアクセストークンを生成
            now = datetime.now(timezone.utc)
            new_access_payload = {
                "sub": user_id,
                "type": "access",
                "exp": now + timedelta(minutes=15),
                "iat": now
            }
            
            new_access_token = jwt.encode(
                new_access_payload,
                self.secret_key,
                algorithm=self.algorithm
            )
            
            return {
                "access_token": new_access_token,
                "token_type": "bearer",
                "expires_in": 900
            }
            
        except jwt.ExpiredSignatureError:
            raise ValueError("リフレッシュトークンの有効期限が切れています")
        except jwt.InvalidTokenError:
            raise ValueError("無効なリフレッシュトークンです")
    
    def revoke_refresh_token(self, refresh_token: str):
        """リフレッシュトークンの無効化"""
        try:
            payload = jwt.decode(
                refresh_token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            jti = payload.get("jti")
            if jti in self.refresh_tokens:
                self.refresh_tokens[jti]["is_active"] = False
                return True
        except jwt.InvalidTokenError:
            pass
        return False

# 使用例
jwt_manager = JWTManager(SECRET_KEY)

# トークンペアの生成
user_data = {"username": "john_doe", "role": "user"}
tokens = jwt_manager.create_token_pair("user123", user_data)
print(f"トークンペア: {tokens}")

# アクセストークンの更新
new_tokens = jwt_manager.refresh_access_token(tokens["refresh_token"])
print(f"新しいアクセストークン: {new_tokens}")

Flask/Django統合

# Flask統合例
from flask import Flask, request, jsonify, g
from functools import wraps
import jwt

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'your-secret-key'

def token_required(f):
    """JWT認証デコレータ"""
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'message': 'トークンが必要です'}), 401
        
        try:
            # "Bearer "プレフィックスを削除
            if token.startswith('Bearer '):
                token = token[7:]
            
            data = jwt.decode(
                token,
                app.config['JWT_SECRET_KEY'],
                algorithms=['HS256']
            )
            g.current_user = data['sub']
            g.user_data = data
            
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'トークンの有効期限が切れています'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'message': '無効なトークンです'}), 401
        
        return f(*args, **kwargs)
    return decorated

@app.route('/login', methods=['POST'])
def login():
    """ログイン処理"""
    username = request.json.get('username')
    password = request.json.get('password')
    
    # 実際の認証処理(省略)
    if username == "admin" and password == "password":
        token_data = {
            'sub': 'user123',
            'username': username,
            'exp': datetime.now(timezone.utc) + timedelta(hours=1)
        }
        token = jwt.encode(token_data, app.config['JWT_SECRET_KEY'], algorithm='HS256')
        return jsonify({'access_token': token})
    
    return jsonify({'message': '認証に失敗しました'}), 401

@app.route('/protected', methods=['GET'])
@token_required
def protected():
    """保護されたエンドポイント"""
    return jsonify({
        'message': f'こんにちは、{g.user_data["username"]}さん!',
        'user_id': g.current_user
    })

# Django統合例(settings.pyとviews.py)

# settings.py
JWT_AUTH = {
    'JWT_SECRET_KEY': 'your-secret-key',
    'JWT_ALGORITHM': 'HS256',
    'JWT_EXPIRATION_DELTA': timedelta(hours=1),
}

# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
import json
import jwt
from django.conf import settings

class JWTAuthMixin:
    """Django用JWT認証ミックスイン"""
    
    def dispatch(self, request, *args, **kwargs):
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        
        if not auth_header or not auth_header.startswith('Bearer '):
            return JsonResponse({'error': '認証が必要です'}, status=401)
        
        token = auth_header[7:]
        
        try:
            payload = jwt.decode(
                token,
                settings.JWT_AUTH['JWT_SECRET_KEY'],
                algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']]
            )
            request.jwt_user = payload
        except jwt.ExpiredSignatureError:
            return JsonResponse({'error': 'トークンの有効期限が切れています'}, status=401)
        except jwt.InvalidTokenError:
            return JsonResponse({'error': '無効なトークンです'}, status=401)
        
        return super().dispatch(request, *args, **kwargs)

@method_decorator(csrf_exempt, name='dispatch')
class ProtectedView(JWTAuthMixin, View):
    """保護されたビュー"""
    
    def get(self, request):
        return JsonResponse({
            'message': f'こんにちは、{request.jwt_user["username"]}さん!',
            'user_data': request.jwt_user
        })

テストとデバッグ

import jwt
import pytest
from datetime import datetime, timedelta, timezone
import json

class TestJWTFunctions:
    """PyJWTのテストクラス"""
    
    def setup_method(self):
        """テスト前の設定"""
        self.secret_key = "test-secret-key"
        self.algorithm = "HS256"
        self.test_payload = {
            "sub": "test_user",
            "username": "testuser",
            "exp": datetime.now(timezone.utc) + timedelta(minutes=30)
        }
    
    def test_token_creation_and_validation(self):
        """トークンの生成と検証テスト"""
        # トークン生成
        token = jwt.encode(self.test_payload, self.secret_key, algorithm=self.algorithm)
        assert isinstance(token, str)
        
        # トークン検証
        decoded = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        assert decoded["sub"] == "test_user"
        assert decoded["username"] == "testuser"
    
    def test_expired_token(self):
        """期限切れトークンのテスト"""
        expired_payload = {
            "sub": "test_user",
            "exp": datetime.now(timezone.utc) - timedelta(minutes=1)
        }
        token = jwt.encode(expired_payload, self.secret_key, algorithm=self.algorithm)
        
        with pytest.raises(jwt.ExpiredSignatureError):
            jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
    
    def test_invalid_signature(self):
        """無効な署名のテスト"""
        token = jwt.encode(self.test_payload, self.secret_key, algorithm=self.algorithm)
        wrong_key = "wrong-secret-key"
        
        with pytest.raises(jwt.InvalidSignatureError):
            jwt.decode(token, wrong_key, algorithms=[self.algorithm])
    
    def test_algorithm_validation(self):
        """アルゴリズム検証のテスト"""
        token = jwt.encode(self.test_payload, self.secret_key, algorithm="HS256")
        
        with pytest.raises(jwt.InvalidAlgorithmError):
            jwt.decode(token, self.secret_key, algorithms=["HS512"])

def debug_jwt_token(token: str):
    """JWTトークンのデバッグ用関数"""
    try:
        # ヘッダーを検証なしで取得
        header = jwt.get_unverified_header(token)
        print("=== JWT ヘッダー ===")
        print(json.dumps(header, indent=2, ensure_ascii=False))
        
        # ペイロードを検証なしで取得(本番環境では非推奨)
        payload = jwt.decode(token, options={"verify_signature": False})
        print("\n=== JWT ペイロード ===")
        print(json.dumps(payload, indent=2, ensure_ascii=False, default=str))
        
        # 有効期限チェック
        if 'exp' in payload:
            exp_time = datetime.fromtimestamp(payload['exp'], tz=timezone.utc)
            now = datetime.now(timezone.utc)
            if exp_time > now:
                remaining = exp_time - now
                print(f"\n✅ トークンは有効(残り時間: {remaining})")
            else:
                print(f"\n❌ トークンは期限切れ({now - exp_time}前に期限切れ)")
        
        # トークンサイズ情報
        print(f"\n=== トークン情報 ===")
        print(f"トークン長: {len(token)} 文字")
        parts = token.split('.')
        print(f"ヘッダー長: {len(parts[0])} 文字")
        print(f"ペイロード長: {len(parts[1])} 文字")
        print(f"署名長: {len(parts[2])} 文字")
        
    except Exception as e:
        print(f"❌ トークンのデバッグに失敗: {e}")

# JWT検証ヘルパー関数
def validate_jwt_with_details(token: str, secret: str, algorithms: list = None):
    """詳細な検証情報を含むJWT検証"""
    if algorithms is None:
        algorithms = ["HS256"]
    
    result = {
        "is_valid": False,
        "payload": None,
        "errors": [],
        "warnings": []
    }
    
    try:
        payload = jwt.decode(token, secret, algorithms=algorithms)
        result["is_valid"] = True
        result["payload"] = payload
        
        # 追加チェック
        now = datetime.now(timezone.utc)
        
        if 'exp' in payload:
            exp_time = datetime.fromtimestamp(payload['exp'], tz=timezone.utc)
            if (exp_time - now).total_seconds() < 300:  # 5分未満
                result["warnings"].append("トークンの有効期限が近づいています")
        
        if 'iat' in payload:
            iat_time = datetime.fromtimestamp(payload['iat'], tz=timezone.utc)
            if (now - iat_time).total_seconds() > 86400:  # 24時間以上
                result["warnings"].append("トークンが発行されてから時間が経っています")
        
    except jwt.ExpiredSignatureError:
        result["errors"].append("トークンの有効期限が切れています")
    except jwt.InvalidSignatureError:
        result["errors"].append("トークンの署名が無効です")
    except jwt.InvalidAlgorithmError:
        result["errors"].append("サポートされていないアルゴリズムです")
    except jwt.InvalidTokenError as e:
        result["errors"].append(f"無効なトークンです: {str(e)}")
    
    return result

# 使用例
if __name__ == "__main__":
    # テスト用トークンの生成
    test_token = jwt.encode(
        {"sub": "test_user", "exp": datetime.now(timezone.utc) + timedelta(minutes=10)},
        "test-secret",
        algorithm="HS256"
    )
    
    # デバッグ情報の表示
    debug_jwt_token(test_token)
    
    # 詳細検証の実行
    validation_result = validate_jwt_with_details(test_token, "test-secret")
    print(f"\n=== 検証結果 ===")
    print(json.dumps(validation_result, indent=2, ensure_ascii=False, default=str))