PyJWT
Authentication Library
PyJWT
Overview
PyJWT is a Python library for encoding and decoding JSON Web Tokens (JWTs) as defined in RFC 7519. It provides a simple and secure way to create and verify JWTs in Python applications, supporting signing algorithms including HMAC, RSA, and ECDSA. PyJWT is widely used for authentication and authorization in web applications, APIs, and microservices.
Details
PyJWT implements the JWT standard completely, offering robust token handling capabilities for Python developers. The library supports all standard JWT algorithms and provides comprehensive validation features including signature verification, expiration checking, and audience validation. It is designed with security in mind: when decoding, the caller must pass an explicit list of accepted algorithms (for example, `algorithms=["HS256"]`), which prevents algorithm confusion attacks.
The library is lightweight and has minimal dependencies, making it suitable for both small applications and large-scale systems. PyJWT integrates seamlessly with popular Python web frameworks like Flask, Django, and FastAPI, providing flexible authentication solutions for modern web applications.
Advantages and Disadvantages
Advantages
- Standard Compliance: Full RFC 7519 compliance ensures interoperability
- Security First: Built-in protection against common JWT vulnerabilities
- Algorithm Support: Comprehensive support for HMAC, RSA, and ECDSA algorithms
- Framework Integration: Easy integration with popular Python web frameworks
- Lightweight: Minimal dependencies and small footprint
- Validation Features: Comprehensive token validation including expiration and audience checks
- Active Development: Regular updates and security patches
Disadvantages
- Token Size: JWT tokens can be larger than traditional session IDs
- Stateless Nature: Difficult to revoke tokens before expiration
- Secret Management: Requires careful handling of signing keys
- Learning Curve: JWT concepts may be complex for beginners
- Clock Synchronization: Time-based claims require synchronized clocks across systems
Reference Pages
- Official PyJWT Documentation
- PyJWT GitHub Repository
- JWT.io - JWT Debugger
- RFC 7519 - JSON Web Token
- PyPI Package
Code Examples
Installation and Basic Configuration
# Install PyJWT
# pip install PyJWT
import jwt
import datetime
from jwt.exceptions import InvalidTokenError, ExpiredSignatureError
# Configuration
# NOTE: these are placeholder values for the examples; do not ship a
# hard-coded secret.
SECRET_KEY = "your-secret-key-here" # Use a strong, random key in production
ALGORITHM = "HS256"
# Alternative: Use RSA keys for production
# with open('private_key.pem', 'r') as f:
#     PRIVATE_KEY = f.read()
# with open('public_key.pem', 'r') as f:
#     PUBLIC_KEY = f.read()
# ALGORITHM = "RS256"
print("PyJWT setup complete")
JWT Token Generation and Verification
import jwt
import datetime
# Signing configuration read by generate_jwt_token and verify_jwt_token.
# The secret is a placeholder for the example only.
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def generate_jwt_token(user_id, email, role="user"):
    """Generate a signed JWT carrying basic user information.

    Args:
        user_id: Value stored in the ``user_id`` claim.
        email: Value stored in the ``email`` claim.
        role: Value stored in the ``role`` claim; defaults to ``"user"``.

    Returns:
        The token produced by ``jwt.encode`` with the module-level
        ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Read the clock once, as a timezone-aware UTC value, so that ``iat`` and
    # ``exp`` are exactly 24 hours apart. ``datetime.utcnow()`` is deprecated
    # since Python 3.12 and returns a naive value.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'user_id': user_id,
        'email': email,
        'role': role,
        'iat': issued_at,  # Issued at
        'exp': issued_at + datetime.timedelta(hours=24),  # Expires in 24 hours
        'iss': 'your-app-name',  # Issuer
        'aud': 'your-app-users',  # Audience
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_jwt_token(token):
    """Decode *token* and return its claims, or ``None`` if it is rejected.

    The signature is checked with the module-level ``SECRET_KEY`` and
    ``ALGORITHM``; the audience must be ``'your-app-users'`` and the issuer
    ``'your-app-name'``. A short reason is printed when decoding fails.
    """
    try:
        claims = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            issuer='your-app-name',
            audience='your-app-users',
        )
    except jwt.ExpiredSignatureError:
        # Handled before the generic case so the message can be specific.
        print("Token has expired")
        return None
    except jwt.InvalidTokenError as exc:
        print(f"Invalid token: {exc}")
        return None
    return claims
# Example usage: issue a token for a sample admin user, then verify it.
user_token = generate_jwt_token(123, "[email protected]", "admin")
print(f"Generated token: {user_token}")

decoded_payload = verify_jwt_token(user_token)
# verify_jwt_token returns None on failure, so claims are printed only on
# success.
if decoded_payload:
    print(f"User ID: {decoded_payload['user_id']}")
    print(f"Email: {decoded_payload['email']}")
    print(f"Role: {decoded_payload['role']}")
Payload Management and Claims
import jwt
import datetime
# Signing configuration read by create_access_token and validate_token_claims.
# The secret is a placeholder for the example only.
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(user_data, expires_delta=None):
    """Create an access token with standard and custom claims.

    Args:
        user_data: Mapping that must contain ``'user_id'`` and ``'email'``.
            ``'role'`` (default ``'user'``), ``'permissions'`` (default
            ``[]``), ``'department'`` and ``'last_login'`` are optional.
        expires_delta: ``datetime.timedelta`` until expiry; defaults to
            15 minutes when ``None``.

    Returns:
        The token produced by ``jwt.encode`` with the module-level
        ``SECRET_KEY`` and ``ALGORITHM``.

    Raises:
        KeyError: If ``user_data`` lacks ``'user_id'`` or ``'email'``.
    """
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=15)

    # Read the clock once, timezone-aware, so ``exp - iat`` equals
    # ``expires_delta`` exactly. ``datetime.utcnow()`` is deprecated since
    # Python 3.12.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        # Standard claims
        'sub': str(user_data['user_id']),  # Subject
        'iat': issued_at,  # Issued at
        'exp': issued_at + expires_delta,  # Expiration
        'iss': 'your-app',  # Issuer
        'aud': 'your-app-users',  # Audience
        # Custom claims
        'email': user_data['email'],
        'role': user_data.get('role', 'user'),
        'permissions': user_data.get('permissions', []),
        'department': user_data.get('department'),
        'last_login': user_data.get('last_login'),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def validate_token_claims(token, required_role=None, required_permissions=None):
    """Validate *token* and optionally enforce role and permission claims.

    Args:
        token: Encoded JWT, expected to carry audience ``'your-app-users'``
            and issuer ``'your-app'`` (the values create_access_token sets).
        required_role: If truthy, the ``role`` claim must equal this value.
        required_permissions: If truthy, every listed permission must be
            present in the token's ``permissions`` claim.

    Returns:
        ``{'valid': True, 'payload': <claims>}`` on success, otherwise
        ``{'valid': False, 'error': <message>}``.
    """
    try:
        # The issued tokens carry an 'aud' claim. PyJWT rejects any token
        # that has 'aud' unless the expected audience is passed to decode,
        # so without these arguments every issued token was reported invalid.
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],
            audience='your-app-users',
            issuer='your-app',
        )
    except jwt.ExpiredSignatureError:
        return {'valid': False, 'error': 'Token expired'}
    except jwt.InvalidTokenError as e:
        return {'valid': False, 'error': f'Invalid token: {str(e)}'}

    # Check role requirement
    if required_role and payload.get('role') != required_role:
        return {'valid': False, 'error': 'Insufficient role'}

    # Check permissions
    if required_permissions:
        user_permissions = payload.get('permissions', [])
        if not all(perm in user_permissions for perm in required_permissions):
            return {'valid': False, 'error': 'Insufficient permissions'}

    return {'valid': True, 'payload': payload}
# Example usage
user_data = {
    'user_id': 456,
    'email': '[email protected]',
    'role': 'admin',
    'permissions': ['read', 'write', 'delete'],
    'department': 'IT',
    # Timezone-aware so the ISO string carries an explicit UTC offset;
    # datetime.utcnow() is deprecated since Python 3.12.
    'last_login': datetime.datetime.now(datetime.timezone.utc).isoformat(),
}
token = create_access_token(user_data, datetime.timedelta(hours=1))
print(f"Access token: {token}")

# Validate with role requirement
result = validate_token_claims(token, required_role='admin')
print(f"Validation result: {result}")

# Validate with permission requirement
result = validate_token_claims(token, required_permissions=['read', 'write'])
print(f"Permission check: {result}")
Advanced JWT Features (Refresh Tokens)
import jwt
import datetime
import secrets
# Placeholder secrets for the example. Access and refresh tokens are signed
# with different keys, both passed to JWTManager in the usage example.
SECRET_KEY = "your-secret-key"
REFRESH_SECRET_KEY = "your-refresh-secret-key"
ALGORITHM = "HS256"
class JWTManager:
    """Issue, refresh, and revoke access/refresh token pairs.

    Identifiers (``jti``) of issued refresh tokens are kept in an in-memory
    set; a refresh token is honoured only while its ``jti`` is in that set.
    """

    # Lifetimes applied to newly issued tokens.
    ACCESS_TOKEN_LIFETIME = datetime.timedelta(minutes=15)
    REFRESH_TOKEN_LIFETIME = datetime.timedelta(days=7)

    def __init__(self, secret_key, refresh_secret_key, algorithm="HS256"):
        """Store the signing keys and algorithm.

        Args:
            secret_key: Key used to sign access tokens.
            refresh_secret_key: Key used to sign and verify refresh tokens.
            algorithm: Signing algorithm name passed to PyJWT.
        """
        self.secret_key = secret_key
        self.refresh_secret_key = refresh_secret_key
        self.algorithm = algorithm
        self.refresh_tokens = set()  # In production, use a database

    def _issue_access_token(self, subject, now, extra_claims=None):
        """Sign an access token for *subject* issued at *now*."""
        payload = {'sub': subject}
        if extra_claims:
            payload.update(extra_claims)
        payload['iat'] = now
        payload['exp'] = now + self.ACCESS_TOKEN_LIFETIME
        payload['type'] = 'access'
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def _access_response(self, access_token):
        """Build the response fields shared by issue and refresh."""
        return {
            'access_token': access_token,
            'token_type': 'bearer',
            # Derived from the lifetime so the two values cannot drift apart.
            'expires_in': int(self.ACCESS_TOKEN_LIFETIME.total_seconds()),
        }

    def create_token_pair(self, user_data):
        """Create both access and refresh tokens.

        Args:
            user_data: Mapping with ``'user_id'`` and ``'email'``; ``'role'``
                is optional and defaults to ``'user'``.

        Returns:
            Dict with ``access_token``, ``refresh_token``, ``token_type`` and
            ``expires_in`` (access-token lifetime in seconds).
        """
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
        now = datetime.datetime.now(datetime.timezone.utc)
        subject = str(user_data['user_id'])

        # Access token (short-lived)
        access_token = self._issue_access_token(
            subject,
            now,
            {
                'email': user_data['email'],
                'role': user_data.get('role', 'user'),
            },
        )

        # Refresh token (long-lived)
        refresh_jti = secrets.token_urlsafe(32)  # Unique identifier
        refresh_payload = {
            'sub': subject,
            'iat': now,
            'exp': now + self.REFRESH_TOKEN_LIFETIME,
            'type': 'refresh',
            'jti': refresh_jti,
        }
        refresh_token = jwt.encode(
            refresh_payload, self.refresh_secret_key, algorithm=self.algorithm
        )

        # Store refresh token identifier
        self.refresh_tokens.add(refresh_jti)

        response = self._access_response(access_token)
        response['refresh_token'] = refresh_token
        return response

    def refresh_access_token(self, refresh_token):
        """Generate a new access token using a refresh token.

        Returns:
            Dict with ``access_token``, ``token_type`` and ``expires_in`` on
            success, or ``{'error': <message>}`` when the refresh token is
            expired, invalid, not of type ``'refresh'``, or revoked.
        """
        try:
            payload = jwt.decode(
                refresh_token,
                self.refresh_secret_key,
                algorithms=[self.algorithm],
            )
            # Verify token type and JTI
            if payload.get('type') != 'refresh':
                raise jwt.InvalidTokenError("Not a refresh token")
            if payload.get('jti') not in self.refresh_tokens:
                raise jwt.InvalidTokenError("Refresh token revoked")
        except jwt.InvalidTokenError as e:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so
            # this one clause covers both.
            return {'error': str(e)}

        # NOTE: the refresh token carries only 'sub', so the new access
        # token has no 'email' or 'role' claim.
        now = datetime.datetime.now(datetime.timezone.utc)
        return self._access_response(
            self._issue_access_token(payload['sub'], now)
        )

    def revoke_refresh_token(self, refresh_token):
        """Revoke a refresh token.

        Returns:
            ``True`` if the token decoded and its ``jti`` was being tracked
            (it is removed); ``False`` otherwise, including when the token
            cannot be decoded.
        """
        try:
            payload = jwt.decode(
                refresh_token,
                self.refresh_secret_key,
                algorithms=[self.algorithm],
            )
        except jwt.PyJWTError:
            # Best effort: an undecodable token simply cannot be revoked.
            # Only PyJWT errors are swallowed; other exceptions propagate.
            return False

        jti = payload.get('jti')
        if jti in self.refresh_tokens:
            self.refresh_tokens.remove(jti)
            return True
        return False
# Example usage: issue a token pair, then trade the refresh token for a new
# access token.
jwt_manager = JWTManager(SECRET_KEY, REFRESH_SECRET_KEY)
user_data = {'user_id': 789, 'email': '[email protected]', 'role': 'user'}

tokens = jwt_manager.create_token_pair(user_data)
print("Token pair created:")
print(f"Access token: {tokens['access_token']}")
print(f"Refresh token: {tokens['refresh_token']}")

# Refresh access token; on failure the result holds an 'error' key instead.
new_tokens = jwt_manager.refresh_access_token(tokens['refresh_token'])
if 'access_token' in new_tokens:
    print(f"New access token: {new_tokens['access_token']}")
Flask/Django Integration
# Flask Integration Example
from flask import Flask, request, jsonify
from functools import wraps
import jwt
# Flask application plus the signing configuration read by jwt_required and
# login. The secret is a placeholder for the example only.
app = Flask(__name__)
SECRET_KEY = "your-flask-secret-key"
ALGORITHM = "HS256"
def jwt_required(f):
    """Decorator that rejects requests lacking a valid JWT.

    The token is read from the ``Authorization`` header, with or without a
    ``Bearer `` prefix. On success the decoded claims are stored on
    ``request.current_user`` before the wrapped view runs; otherwise a JSON
    error with status 401 is returned.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        header_value = request.headers.get('Authorization')
        if not header_value:
            return jsonify({'error': 'Token missing'}), 401

        # Remove 'Bearer ' prefix if present
        if header_value.startswith('Bearer '):
            raw_token = header_value[7:]
        else:
            raw_token = header_value

        try:
            claims = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        request.current_user = claims
        return f(*args, **kwargs)

    return wrapper
def admin_required(f):
    """Decorator that requires a valid JWT whose ``role`` claim is ``'admin'``.

    Authentication is delegated to ``jwt_required``; a non-admin caller
    receives a JSON error with status 403.
    """
    def admin_gate(*args, **kwargs):
        if request.current_user.get('role') == 'admin':
            return f(*args, **kwargs)
        return jsonify({'error': 'Admin access required'}), 403

    # Same composition as stacking @wraps(f) above @jwt_required.
    return wraps(f)(jwt_required(admin_gate))
@app.route('/login', methods=['POST'])
def login():
    """Login endpoint.

    Expects a JSON object with ``email`` and ``password``. Returns
    ``{'token': ...}`` for the hard-coded demo credentials, otherwise a JSON
    error with status 401.
    """
    # silent=True returns None instead of raising on a missing or malformed
    # JSON body; a non-object body is treated like missing credentials.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = data.get('email')
    password = data.get('password')

    # Validate credentials (implement your authentication logic)
    if email == '[email protected]' and password == 'password':
        payload = {
            'user_id': 1,
            'email': email,
            'role': 'admin',
            # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
            'exp': datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(hours=24),
        }
        token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
        return jsonify({'token': token})

    return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/protected')
@jwt_required
def protected():
    """Return a confirmation plus the authenticated caller's claims."""
    response_body = {
        'message': 'Access granted',
        'user': request.current_user,
    }
    return jsonify(response_body)
@app.route('/admin')
@admin_required
def admin_only():
    """Return a confirmation; reachable only with an admin token."""
    response_body = {'message': 'Admin access granted'}
    return jsonify(response_body)
# Django Integration Example (middleware)
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
import jwt
class JWTAuthenticationMiddleware(MiddlewareMixin):
    """Django middleware that authenticates requests with a Bearer JWT.

    For non-exempt paths the decoded claims are attached to the request as
    ``user_payload``; a missing or rejected token short-circuits with a JSON
    401 response.
    """

    def process_request(self, request):
        """Return ``None`` to continue processing, or a 401 ``JsonResponse``."""
        # Skip authentication for certain paths
        if request.path in ('/login/', '/register/', '/'):
            return None

        # Get token from header
        header = request.META.get('HTTP_AUTHORIZATION', '')
        if not header.startswith('Bearer '):
            return JsonResponse({'error': 'Authentication required'}, status=401)

        encoded = header[7:]  # Remove 'Bearer ' prefix
        try:
            claims = jwt.decode(encoded, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            return JsonResponse({'error': 'Token expired'}, status=401)
        except jwt.InvalidTokenError:
            return JsonResponse({'error': 'Invalid token'}, status=401)

        request.user_payload = claims
        return None
# Django view example
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import json
@csrf_exempt
@require_http_methods(["GET"])
def protected_view(request):
    """Return the caller's claims, or 401 when the request carries none.

    Reads the ``user_payload`` attribute from the request.
    """
    claims = getattr(request, 'user_payload', None)
    if claims:
        return JsonResponse({
            'message': 'Access granted',
            'user': claims,
        })
    return JsonResponse({'error': 'Authentication required'}, status=401)
Testing and Debugging
import jwt
import datetime
import unittest
from unittest.mock import patch
import json
class JWTTestCase(unittest.TestCase):
    """Unit tests covering encode/decode round-trips and rejection cases."""

    def setUp(self):
        """Prepare a signing key, algorithm, and a payload valid for 1 hour."""
        self.secret_key = "test-secret-key"
        self.algorithm = "HS256"
        self.test_payload = {
            'user_id': 123,
            'email': '[email protected]',
            'role': 'user',
            # Timezone-aware UTC; datetime.utcnow() is deprecated since
            # Python 3.12 and emits a DeprecationWarning there.
            'exp': datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(hours=1),
        }

    def _encode(self, payload=None):
        """Sign *payload* (default: the shared test payload)."""
        if payload is None:
            payload = self.test_payload
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def test_token_generation(self):
        """Test JWT token generation"""
        token = self._encode()
        self.assertIsInstance(token, str)
        self.assertGreater(len(token), 0)

    def test_token_verification(self):
        """Test JWT token verification"""
        token = self._encode()
        decoded = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        self.assertEqual(decoded['user_id'], self.test_payload['user_id'])
        self.assertEqual(decoded['email'], self.test_payload['email'])
        self.assertEqual(decoded['role'], self.test_payload['role'])

    def test_expired_token(self):
        """Test handling of expired tokens"""
        expired_payload = self.test_payload.copy()
        expired_payload['exp'] = (
            datetime.datetime.now(datetime.timezone.utc)
            - datetime.timedelta(hours=1)
        )
        token = self._encode(expired_payload)
        with self.assertRaises(jwt.ExpiredSignatureError):
            jwt.decode(token, self.secret_key, algorithms=[self.algorithm])

    def test_invalid_signature(self):
        """Test handling of invalid signatures"""
        token = self._encode()
        with self.assertRaises(jwt.InvalidSignatureError):
            jwt.decode(token, "wrong-secret-key", algorithms=[self.algorithm])

    def test_algorithm_confusion(self):
        """Test protection against algorithm confusion attacks"""
        token = self._encode()
        # The token is HS256-signed, so an allow-list without HS256 must
        # reject it.
        with self.assertRaises(jwt.InvalidAlgorithmError):
            jwt.decode(token, self.secret_key, algorithms=["none"])
# JWT Debugging Utilities
class JWTDebugger:
    """Utility class for debugging JWT tokens.

    These helpers inspect tokens without trusting them; nothing here should
    be used to make an authorization decision.
    """

    @staticmethod
    def _expiry_as_utc(payload):
        """Return the ``exp`` claim as an aware UTC datetime, or ``None``."""
        exp = payload.get('exp')
        # Compare with None so that exp == 0 (the epoch) still counts as a
        # real, long-past expiry instead of "no expiry".
        if exp is None:
            return None
        # ``exp`` is seconds since the epoch in UTC. Converting it with an
        # explicit tz avoids fromtimestamp()'s default of naive *local* time,
        # which skews every comparison by the host's UTC offset.
        return datetime.datetime.fromtimestamp(exp, tz=datetime.timezone.utc)

    @staticmethod
    def decode_without_verification(token):
        """Decode token without signature verification (for debugging).

        Returns:
            Dict with ``header``, ``payload``, ``is_expired`` and
            ``time_to_expiry``, or ``{'error': <message>}`` if anything
            fails while decoding or inspecting the token.
        """
        try:
            # Decode header
            header = jwt.get_unverified_header(token)
            # Decode payload without verification
            payload = jwt.decode(token, options={"verify_signature": False})
            return {
                'header': header,
                'payload': payload,
                'is_expired': JWTDebugger.is_token_expired(payload),
                'time_to_expiry': JWTDebugger.time_to_expiry(payload),
            }
        except Exception as e:
            # Debug boundary: report the problem instead of raising.
            return {'error': str(e)}

    @staticmethod
    def is_token_expired(payload):
        """Return ``True`` if the ``exp`` claim lies in the past.

        A payload without ``exp`` is reported as not expired.
        """
        expires_at = JWTDebugger._expiry_as_utc(payload)
        if expires_at is None:
            return False
        return datetime.datetime.now(datetime.timezone.utc) > expires_at

    @staticmethod
    def time_to_expiry(payload):
        """Describe the time until (or since) expiry.

        Returns:
            ``None`` when the payload has no ``exp`` claim, otherwise a
            string ``"Expires in <timedelta>"`` or
            ``"Expired <timedelta> ago"``.
        """
        expires_at = JWTDebugger._expiry_as_utc(payload)
        if expires_at is None:
            return None
        now = datetime.datetime.now(datetime.timezone.utc)
        if now > expires_at:
            return f"Expired {now - expires_at} ago"
        return f"Expires in {expires_at - now}"

    @staticmethod
    def validate_token_structure(token):
        """Check that *token* has three dot-separated base64url segments.

        Returns:
            ``{'valid': True, 'parts': 3}`` on success, otherwise
            ``{'valid': False, 'error': <message>}``.
        """
        parts = token.split('.')
        if len(parts) != 3:
            return {'valid': False, 'error': 'Invalid JWT structure'}
        try:
            # Validate each part (header, payload, signature) is properly
            # base64 encoded
            for segment in parts:
                jwt.utils.base64url_decode(segment)
        except Exception as e:
            return {'valid': False, 'error': f'Invalid encoding: {str(e)}'}
        return {'valid': True, 'parts': len(parts)}
# Example usage and testing
if __name__ == "__main__":
    # Run tests; exit=False keeps the interpreter alive for the demo below.
    unittest.main(argv=[''], exit=False, verbosity=2)

    # Debug example
    print("\n=== JWT Debugging Example ===")

    # Create a test token
    test_payload = {
        'user_id': 999,
        'email': '[email protected]',
        'role': 'tester',
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12.
        'exp': datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(minutes=30),
    }
    test_token = jwt.encode(test_payload, "debug-key", algorithm="HS256")
    print(f"Test token: {test_token}")

    # Debug the token
    debug_info = JWTDebugger.decode_without_verification(test_token)
    print(f"Debug info: {json.dumps(debug_info, indent=2, default=str)}")

    # Validate structure
    structure_info = JWTDebugger.validate_token_structure(test_token)
    print(f"Structure validation: {structure_info}")
This comprehensive guide covers PyJWT's essential features including basic token operations, advanced features like refresh tokens, framework integration, and testing strategies. The code examples demonstrate real-world usage patterns and security best practices for JWT implementation in Python applications.