Authlib
Authentication Library
Overview
Authlib is a comprehensive Python library for building OAuth and OpenID Connect clients and servers. As of 2025, at version 1.6.0 with support for Python 3.9+, it provides a unified set of RFC implementations covering OAuth 1.0, OAuth 2.0, JWT, and more. It includes implementations of JWS, JWE, JWK, JWA, and JWT, and integrates with major frameworks and HTTP clients such as Flask, Django, Requests, HTTPX, Starlette, and FastAPI. Its monolithic design keeps everything in step, from low-level specification implementations to high-level framework integrations, which gives a consistent development experience. Developed and maintained by Hsiaoming Yang (lepture), it is widely adopted in the Python authentication ecosystem.
Details
The Authlib 1.6 series supports building enterprise-level authentication and authorization systems on RFC-compliant implementations. It covers the OAuth 2.0 grants defined in RFC 6749 (authorization code, implicit, resource owner password credentials, client credentials) together with refresh tokens, the OpenID Connect flows (Code, Implicit, Hybrid), and extension specifications such as the JWT Bearer Token Grant (RFC 7523) and PKCE (RFC 7636). As a JOSE library, it implements JWS, JWE, JWK, JWA, and JWT, with support for cryptographic algorithms including RSA, ECDSA, and EdDSA. For framework integration, dedicated packages for Flask, Django, Requests, and HTTPX let implementations follow the conventions of each framework.
Key Features
- Comprehensive RFC Support: Complete implementation of OAuth 1.0/2.0, OpenID Connect, JWT, and JOSE
- Monolithic Design: Consistent design from specification implementation to framework integration
- Multi-Framework: Integration with Flask, Django, FastAPI, Starlette, and more
- Complete JOSE Support: Industry-standard implementation of JWS, JWE, JWK, JWA, and JWT
- Client & Server Support: Support for both authentication clients and authentication servers
- Extensibility: Flexible implementation of custom grants and custom authentication methods
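The JOSE support listed above comes down to producing and checking compact serializations of the form `header.payload.signature`. As an illustration of that format only, here is a standard-library sketch of HS256 signing and verification; it is not how Authlib is implemented, and `sign_hs256` and `verify_hs256` are hypothetical helpers.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWS requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact JWS: BASE64URL(header).BASE64URL(payload).BASE64URL(sig)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Check the algorithm and signature, then return the payload claims."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    return json.loads(b64url_decode(payload_b64))


token = sign_hs256({"sub": "user-1"}, b"demo-secret")
claims = verify_hs256(token, b"demo-secret")
```

A real deployment would rely on the library for this, since it also handles claim validation (such as `exp`) and asymmetric algorithms.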
Advantages and Disadvantages
Advantages
- Among the most comprehensive libraries in the Python authentication ecosystem
- Ensures compatibility and security through industry-standard RFC compliance
- Reduces learning costs and provides consistent development experience through monolithic design
- Efficient development through deep integration with major Python frameworks
- Can be used independently as a JOSE library
- Active development community and rich documentation
Disadvantages
- Requires deep understanding of OAuth 2.0/OpenID Connect, leading to high learning costs
- Large size even for minimal feature usage due to monolithic design
- Complex authentication flow implementation requires detailed configuration and customization
- Rich enterprise features may be excessive for small-scale projects
- Framework-specific integration implementations may have portability constraints
- Advanced cryptographic features require knowledge of cryptography
Usage Examples
Basic Installation and OAuth 2.0 Client Setup
# Install Authlib
pip install authlib
# Additional HTTP clients (if needed)
pip install httpx # For HTTPX client usage
pip install requests # For Requests client usage
# Web framework integration (if needed)
pip install flask # For Flask integration
pip install django # For Django integration
pip install fastapi uvicorn # For FastAPI integration
# oauth_client.py - Basic OAuth 2.0 client setup
from authlib.integrations.requests_client import OAuth2Session


class OAuthClient:
    def __init__(self, client_id, client_secret, authorization_url, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_url = authorization_url
        self.token_url = token_url
        self.redirect_uri = 'http://localhost:8080/callback'

    def create_session(self):
        """Create OAuth2Session"""
        return OAuth2Session(
            self.client_id,
            self.client_secret,
            redirect_uri=self.redirect_uri,
            scope='read write'
        )

    def get_authorization_url(self):
        """Generate authorization URL"""
        session = self.create_session()
        authorization_url, state = session.create_authorization_url(
            self.authorization_url
        )
        # Persist `state` (for example in the user's session) so it can be
        # checked when the provider redirects back
        return authorization_url, state

    def fetch_token(self, authorization_response_url, state):
        """Fetch access token"""
        session = self.create_session()
        token = session.fetch_token(
            self.token_url,
            authorization_response=authorization_response_url,
            state=state
        )
        return token


# Usage example
oauth_client = OAuthClient(
    client_id='your-client-id',
    client_secret='your-client-secret',
    authorization_url='https://example.com/oauth/authorize',
    token_url='https://example.com/oauth/token'
)

# Generate authorization URL
auth_url, state = oauth_client.get_authorization_url()
print(f"Authorization URL: {auth_url}")
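To make the authorization URL and the role of `state` more concrete, the sketch below assembles an authorization request by hand and checks `state` on the callback, using only the standard library. It illustrates the RFC 6749 parameters rather than Authlib's implementation; `build_authorization_url` and `check_callback_state` are hypothetical helpers and the URLs are placeholders.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorization_url(endpoint, client_id, redirect_uri, scope):
    """Return (url, state) for an authorization code request."""
    state = secrets.token_urlsafe(24)  # unguessable value tied to this login attempt
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
    })
    return f"{endpoint}?{query}", state


def check_callback_state(callback_url, expected_state):
    """Return the authorization code if the callback carries the expected state."""
    params = parse_qs(urlsplit(callback_url).query)
    returned = params.get("state", [""])[0]
    if not hmac.compare_digest(returned.encode("utf-8"), expected_state.encode("utf-8")):
        raise ValueError("state mismatch")
    if "code" not in params:
        raise ValueError("missing authorization code")
    return params["code"][0]


url, state = build_authorization_url(
    "https://example.com/oauth/authorize",
    "your-client-id",
    "http://localhost:8080/callback",
    "read write",
)
```

Rejecting a callback whose `state` does not match is what protects the redirect step against cross-site request forgery.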
Flask Integration and OpenID Connect Implementation
# flask_oidc_app.py - Flask OpenID Connect application
from flask import Flask, redirect, url_for, session, jsonify
from markupsafe import escape
from authlib.integrations.flask_client import OAuth

app = Flask(__name__)
app.secret_key = 'your-secret-key'  # Load from configuration in production

# OAuth configuration
oauth = OAuth(app)

# Google OpenID Connect client registration
google = oauth.register(
    'google',
    client_id='your-google-client-id',
    client_secret='your-google-client-secret',
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email'
    }
)

# GitHub OAuth client registration
github = oauth.register(
    'github',
    client_id='your-github-client-id',
    client_secret='your-github-client-secret',
    access_token_url='https://github.com/login/oauth/access_token',
    authorize_url='https://github.com/login/oauth/authorize',
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'},
)


@app.route('/')
def index():
    user = session.get('user')
    if user:
        # Escape the provider-supplied name before embedding it in HTML
        return f'<h1>Hello, {escape(user["name"])}!</h1><a href="/logout">Logout</a>'
    return '''
    <h1>OAuth Login</h1>
    <a href="/login/google">Login with Google</a><br>
    <a href="/login/github">Login with GitHub</a>
    '''


@app.route('/login/<provider>')
def login(provider):
    client = oauth.create_client(provider)
    if client is None:
        # create_client returns None for a provider that was never registered
        return jsonify({'error': 'Unknown provider'}), 404
    redirect_uri = url_for('authorize', provider=provider, _external=True)
    return client.authorize_redirect(redirect_uri)
@app.route('/authorize/<provider>')
def authorize(provider):
    client = oauth.create_client(provider)
    if client is None:
        # Unknown provider name in the callback URL
        return jsonify({'error': 'Unknown provider'}), 404
    token = client.authorize_access_token()

    if provider == 'google':
        # OpenID Connect - userinfo is automatically retrieved from id_token
        userinfo = token.get('userinfo')
        if userinfo:
            session['user'] = {
                'name': userinfo['name'],
                'email': userinfo['email'],
                'provider': provider
            }
    elif provider == 'github':
        # Use GitHub API to retrieve user information
        resp = client.get('user', token=token)
        resp.raise_for_status()
        userinfo = resp.json()
        session['user'] = {
            'name': userinfo['name'] or userinfo['login'],
            'email': userinfo.get('email'),
            'provider': provider
        }
    return redirect(url_for('index'))
@app.route('/logout')
def logout():
    session.pop('user', None)
    return redirect(url_for('index'))


@app.route('/api/profile')
def profile():
    user = session.get('user')
    if not user:
        return jsonify({'error': 'Not authenticated'}), 401
    return jsonify(user)


if __name__ == '__main__':
    app.run(debug=True, port=8080)
Django Integration and Custom Authorization Server Implementation
# django_auth_server.py - Django OAuth 2.0 authorization server
import hmac

from django.contrib.auth.models import User
from django.db import models
from authlib.integrations.django_oauth2 import (
    AuthorizationServer,
    ResourceProtector
)
from authlib.oauth2.rfc6749 import grants
from authlib.oidc.core import grants as oidc_grants, UserInfo


# Client model
class OAuth2Client(models.Model):
    client_id = models.CharField(max_length=48, unique=True, db_index=True)
    client_secret = models.CharField(max_length=120)
    client_name = models.CharField(max_length=120)
    redirect_uris = models.TextField(default='')
    default_scopes = models.TextField(default='')
    grant_types = models.TextField(default='')
    response_types = models.TextField(default='')

    def get_client_id(self):
        return self.client_id

    def get_default_redirect_uri(self):
        return self.redirect_uris.split()[0] if self.redirect_uris else ''

    def get_allowed_scope(self, scope):
        if not scope:
            return ''
        allowed = set(self.default_scopes.split())
        # Iterate the requested scopes in order so the result is deterministic
        return ' '.join([s for s in scope.split() if s in allowed])

    def check_redirect_uri(self, redirect_uri):
        return redirect_uri in self.redirect_uris.split()

    def has_client_secret(self):
        return bool(self.client_secret)

    def check_client_secret(self, client_secret):
        # Constant-time comparison to avoid leaking the secret through timing
        return hmac.compare_digest(
            self.client_secret.encode('utf-8'), client_secret.encode('utf-8')
        )

    def check_endpoint_auth_method(self, method, endpoint):
        # Authlib 1.x calls this method in place of the older
        # check_token_endpoint_auth_method(method)
        if endpoint == 'token':
            return method in ['client_secret_basic', 'client_secret_post']
        return True

    def check_response_type(self, response_type):
        return response_type in self.response_types.split()

    def check_grant_type(self, grant_type):
        return grant_type in self.grant_types.split()
# Authorization code model
class OAuth2AuthorizationCode(models.Model):
    code = models.CharField(max_length=120, unique=True)
    client_id = models.CharField(max_length=48, db_index=True)
    redirect_uri = models.TextField(default='')
    response_type = models.TextField(default='')
    scope = models.TextField(default='')
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    auth_time = models.IntegerField()
    nonce = models.CharField(max_length=120, default='')  # For OpenID Connect

    def is_expired(self):
        # Expires in 10 minutes
        import time
        return time.time() - self.auth_time > 600

    def get_redirect_uri(self):
        return self.redirect_uri

    def get_scope(self):
        return self.scope

    # The OpenID Connect extension reads these two values when it builds the id_token
    def get_auth_time(self):
        return self.auth_time

    def get_nonce(self):
        return self.nonce or None
# Token model
def now_timestamp():
    import time
    return int(time.time())


class OAuth2Token(models.Model):
    client_id = models.CharField(max_length=48, db_index=True)
    token_type = models.CharField(max_length=40, default='Bearer')
    access_token = models.CharField(max_length=255, unique=True, null=False)
    refresh_token = models.CharField(max_length=255, db_index=True)
    scope = models.TextField(default='')
    revoked = models.BooleanField(default=False)
    # Default to the creation time so the server can save a token without setting it
    issued_at = models.IntegerField(default=now_timestamp)
    expires_in = models.IntegerField()
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    def check_client(self, client):
        return self.client_id == client.get_client_id()

    def get_scope(self):
        return self.scope

    def get_expires_in(self):
        return self.expires_in

    def is_expired(self):
        import time
        return time.time() > self.issued_at + self.expires_in

    def is_revoked(self):
        return self.revoked
# Custom authorization code grant
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    TOKEN_ENDPOINT_AUTH_METHODS = [
        'client_secret_basic',
        'client_secret_post',
        'none'
    ]

    def save_authorization_code(self, code, request):
        import time
        nonce = request.data.get('nonce')
        auth_code = OAuth2AuthorizationCode(
            code=code,
            client_id=request.client.client_id,
            redirect_uri=request.redirect_uri,
            scope=request.scope,
            user=request.user,
            auth_time=int(time.time()),
            nonce=nonce or ''
        )
        auth_code.save()
        return auth_code

    def query_authorization_code(self, code, client):
        try:
            auth_code = OAuth2AuthorizationCode.objects.get(
                code=code, client_id=client.client_id
            )
            if auth_code.is_expired():
                return None
            return auth_code
        except OAuth2AuthorizationCode.DoesNotExist:
            return None

    def delete_authorization_code(self, authorization_code):
        authorization_code.delete()

    def authenticate_user(self, authorization_code):
        return authorization_code.user
# OpenID Connect Code Extension
class OpenIDCode(oidc_grants.OpenIDCode):
    def exists_nonce(self, nonce, request):
        # A nonce that was already stored for this client indicates a replay
        return OAuth2AuthorizationCode.objects.filter(
            client_id=request.client_id, nonce=nonce
        ).exists()

    def get_jwt_config(self, grant):
        # JWT configuration (use appropriate private key in production)
        return {
            'key': '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----''',
            'alg': 'RS256',
            'iss': 'https://your-auth-server.com',
            'exp': 3600
        }

    def generate_user_info(self, user, scope):
        user_info = UserInfo(sub=str(user.pk), name=user.get_full_name())
        if 'email' in scope:
            user_info['email'] = user.email
        if 'profile' in scope:
            user_info.update({
                'given_name': user.first_name,
                'family_name': user.last_name,
                'preferred_username': user.username
            })
        return user_info
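# Authorization server setup
# The Django integration is constructed with the client and token models
authorization_server = AuthorizationServer(OAuth2Client, OAuth2Token)

# Grant registration
authorization_server.register_grant(
    AuthorizationCodeGrant,
    [OpenIDCode(require_nonce=True)]
)

# Protector setup: register a bearer token validator so @require_oauth can look up tokens
from authlib.integrations.django_oauth2 import BearerTokenValidator

require_oauth = ResourceProtector()
require_oauth.register_token_validator(BearerTokenValidator(OAuth2Token))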
# views.py usage example
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from authlib.oauth2 import OAuth2Error


def authorize(request):
    # Authorization endpoint. The consent form is an ordinary Django form,
    # so this view keeps CSRF protection enabled.
    if request.method == 'GET':
        # Validate the request and display the authorization form
        try:
            grant = authorization_server.get_consent_grant(
                request, end_user=request.user
            )
        except OAuth2Error as error:
            return JsonResponse(dict(error.get_body()), status=error.status_code)
        context = {
            'grant': grant,
            'user': request.user
        }
        return render(request, 'oauth2/authorize.html', context)

    # Consent processing
    if request.POST.get('confirm'):
        # Granted: the approving user must be passed as grant_user
        return authorization_server.create_authorization_response(
            request, grant_user=request.user
        )
    # Denied by the resource owner
    return authorization_server.create_authorization_response(request, grant_user=None)
@csrf_exempt
def issue_token(request):
    # Token endpoint
    return authorization_server.create_token_response(request)


@require_oauth('profile')
def api_profile(request):
    # Protected resource
    user = request.oauth_token.user
    return JsonResponse({
        'sub': str(user.pk),
        'name': user.get_full_name(),
        'email': user.email
    })
HTTPX Client and JWT Operations
# jwt_operations.py - JWT operations and HTTPX client
import asyncio
import time

import httpx
from authlib.jose import jwt
from authlib.common.security import generate_token


class JWTManager:
    def __init__(self, private_key_pem, algorithm='RS256'):
        self.private_key = private_key_pem
        self.algorithm = algorithm

    def create_jwt_token(self, payload, headers=None):
        """Generate JWT token"""
        if headers is None:
            headers = {'alg': self.algorithm}
        token = jwt.encode(headers, payload, self.private_key)
        # encode() may return bytes; form-encoded requests need text
        return token.decode('ascii') if isinstance(token, bytes) else token

    def decode_jwt_token(self, token, public_key):
        """Decode JWT token and validate its claims"""
        try:
            claims = jwt.decode(token, public_key)
            # decode() checks the signature; validate() checks exp, nbf, and iat
            claims.validate()
            return claims
        except Exception as e:
            raise ValueError(f"JWT decode error: {e}") from e

    def create_client_assertion(self, client_id, token_endpoint):
        """Generate JWT assertion for client authentication"""
        now = int(time.time())
        payload = {
            'iss': client_id,
            'sub': client_id,
            'aud': token_endpoint,
            'iat': now,
            'exp': now + 300,  # Expires in 5 minutes
            'jti': generate_token()
        }
        return self.create_jwt_token(payload)
class AsyncOAuthClient:
    def __init__(self, client_id, private_key, token_endpoint):
        self.client_id = client_id
        self.private_key = private_key
        self.token_endpoint = token_endpoint
        self.jwt_manager = JWTManager(private_key)

    async def get_client_credentials_token(self, scope=None):
        """Get token with client credentials grant"""
        async with httpx.AsyncClient() as client:
            # Create client assertion
            client_assertion = self.jwt_manager.create_client_assertion(
                self.client_id,
                self.token_endpoint
            )
            data = {
                'grant_type': 'client_credentials',
                'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
                'client_assertion': client_assertion
            }
            if scope:
                data['scope'] = scope
            # httpx form-encodes `data` and sets the Content-Type header itself
            response = await client.post(self.token_endpoint, data=data)
            # Raise on any 4xx/5xx instead of silently returning None
            response.raise_for_status()
            return response.json()

    async def refresh_token(self, refresh_token):
        """Refresh access token with refresh token"""
        async with httpx.AsyncClient() as client:
            client_assertion = self.jwt_manager.create_client_assertion(
                self.client_id,
                self.token_endpoint
            )
            data = {
                'grant_type': 'refresh_token',
                'refresh_token': refresh_token,
                'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
                'client_assertion': client_assertion
            }
            response = await client.post(self.token_endpoint, data=data)
            response.raise_for_status()
            return response.json()
# Usage example
async def main():
    # Private key (properly managed in production)
    private_key = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----'''
    oauth_client = AsyncOAuthClient(
        client_id='your-client-id',
        private_key=private_key,
        token_endpoint='https://auth-server.com/oauth/token'
    )
    try:
        # Get token with client credentials grant
        token_response = await oauth_client.get_client_credentials_token(
            scope='read write'
        )
        print("Access Token:", token_response['access_token'])
        print("Token Type:", token_response['token_type'])
        print("Expires In:", token_response['expires_in'])

        # API call example
        access_token = token_response['access_token']
        async with httpx.AsyncClient() as client:
            headers = {'Authorization': f'Bearer {access_token}'}
            api_response = await client.get(
                'https://api.example.com/data',
                headers=headers
            )
            print("API Response:", api_response.json())
    except Exception as e:
        print(f"Error: {e}")


# Execute
if __name__ == '__main__':
    asyncio.run(main())
FastAPI Integration and Resource Protection
# fastapi_oauth.py - FastAPI OAuth 2.0 resource server
import time
from typing import Optional

import httpx
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from authlib.jose import JsonWebKey, JsonWebToken
from authlib.jose.errors import InvalidClaimError, ExpiredTokenError

app = FastAPI(title="OAuth 2.0 Protected API")

# Security scheme
security = HTTPBearer()


class JWTBearer:
    def __init__(self, jwks_uri: str, algorithms: Optional[list] = None):
        self.jwks_uri = jwks_uri
        self.algorithms = algorithms or ['RS256']
        # Only accept tokens signed with the configured algorithms
        self.jwt = JsonWebToken(self.algorithms)
        self.jwks_cache = {}
        self.cache_expiry = 0

    async def get_jwks(self):
        """Get JWKS (with caching)"""
        current_time = time.time()
        if current_time > self.cache_expiry:
            async with httpx.AsyncClient() as client:
                response = await client.get(self.jwks_uri)
                response.raise_for_status()
                self.jwks_cache = response.json()
                self.cache_expiry = current_time + 3600  # Cache for 1 hour
        return self.jwks_cache

    async def verify_token(self, token: str):
        """Verify JWT token"""
        try:
            # Get JWKS
            jwks_data = await self.get_jwks()
            jwks = JsonWebKey.import_key_set(jwks_data)
            # JWT decode and verification
            claims = self.jwt.decode(token, jwks)
            claims.validate()
            return claims
        except ExpiredTokenError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except InvalidClaimError as e:
            raise HTTPException(status_code=401, detail=f"Invalid token claims: {e}")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"Token verification failed: {e}")
# JWT Bearer instance
jwt_bearer = JWTBearer(jwks_uri="https://auth-server.com/.well-known/jwks.json")


async def get_current_user(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Get current user"""
    token = credentials.credentials
    claims = await jwt_bearer.verify_token(token)
    return claims


def require_scope(required_scope: str):
    """Dependency that requires specific scope"""
    async def scope_dependency(claims: dict = Depends(get_current_user)):
        token_scopes = claims.get('scope', '').split()
        if required_scope not in token_scopes:
            raise HTTPException(
                status_code=403,
                detail=f"Insufficient scope. Required: {required_scope}"
            )
        return claims
    return scope_dependency
@app.get("/")
async def read_root():
    return {"message": "OAuth 2.0 Protected API", "version": "1.0"}


@app.get("/public")
async def public_endpoint():
    """Public endpoint (no authentication required)"""
    return {"message": "This is a public endpoint"}


@app.get("/protected")
async def protected_endpoint(claims: dict = Depends(get_current_user)):
    """Protected endpoint (authentication required)"""
    return {
        "message": "This is a protected endpoint",
        "user": {
            "sub": claims.get("sub"),
            "iss": claims.get("iss"),
            "scopes": claims.get("scope", "").split()
        }
    }


@app.get("/profile")
async def get_profile(claims: dict = Depends(require_scope("profile"))):
    """Profile information (profile scope required)"""
    return {
        "sub": claims.get("sub"),
        "name": claims.get("name"),
        "email": claims.get("email"),
        "preferred_username": claims.get("preferred_username")
    }


@app.get("/admin")
async def admin_endpoint(claims: dict = Depends(require_scope("admin"))):
    """Admin endpoint (admin scope required)"""
    return {
        "message": "Admin access granted",
        "admin_data": {
            "user_count": 1000,
            "server_status": "healthy"
        }
    }


@app.post("/data")
async def create_data(
    data: dict,
    claims: dict = Depends(require_scope("write"))
):
    """Create data (write scope required)"""
    return {
        "message": "Data created successfully",
        "created_by": claims.get("sub"),
        "data": data
    }
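# An exception handler must return a Response object, not a plain dict
from fastapi.responses import JSONResponse


@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail, "status_code": exc.status_code},
        headers=exc.headers,
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)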
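When a JWKS document holds several keys, a verifier typically picks the one whose `kid` matches the `kid` in the token header. The sketch below shows that lookup with the standard library only; it is an illustration of the idea, not Authlib's key-set code, and `jwt_header` and `select_jwk` are hypothetical helpers. It performs no signature verification.

```python
import base64
import json


def jwt_header(token: str) -> dict:
    """Decode the (unverified) header segment of a compact JWT."""
    header_b64 = token.split(".", 1)[0]
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def select_jwk(jwks: dict, token: str) -> dict:
    """Return the JWK whose kid matches the token header, or raise LookupError."""
    kid = jwt_header(token).get("kid")
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    raise LookupError(f"no key with kid={kid!r}")
```

A `LookupError` here is a reasonable cue to refresh a cached JWKS once before rejecting the token, since the issuer may have rotated its keys.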
Error Handling and Testing
# test_oauth.py - OAuth 2.0 test implementation
import pytest
import httpx
from authlib.jose import jwt
from authlib.common.security import generate_token


class OAuthTestClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.Client(base_url=base_url)

    def test_authorization_code_flow(self, client_id, redirect_uri, scope):
        """Test authorization code flow"""
        # Step 1: Generate authorization URL
        auth_params = {
            'response_type': 'code',
            'client_id': client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'state': generate_token()
        }
        auth_response = self.client.get('/oauth/authorize', params=auth_params)
        assert auth_response.status_code == 200
        return auth_params['state']

    def test_token_exchange(self, code, client_id, client_secret, redirect_uri):
        """Test token exchange"""
        request_data = {
            'grant_type': 'authorization_code',
            'code': code,
            'client_id': client_id,
            'client_secret': client_secret,
            'redirect_uri': redirect_uri
        }
        token_response = self.client.post('/oauth/token', data=request_data)
        if token_response.status_code != 200:
            raise Exception(f"Token exchange failed: {token_response.text}")
        token_data = token_response.json()
        assert 'access_token' in token_data
        assert 'token_type' in token_data
        assert token_data['token_type'].lower() == 'bearer'
        return token_data

    def test_protected_resource(self, access_token, resource_path='/api/profile'):
        """Test protected resource access"""
        headers = {'Authorization': f'Bearer {access_token}'}
        response = self.client.get(resource_path, headers=headers)
        return response
class OAuth2ErrorHandler:
    """OAuth 2.0 error handling"""

    @staticmethod
    def handle_token_error(error_response):
        """Handle token errors"""
        if isinstance(error_response, dict):
            error_code = error_response.get('error')
            error_description = error_response.get('error_description')
            error_mapping = {
                'invalid_request': 'The request is invalid',
                'invalid_client': 'Client authentication failed',
                'invalid_grant': 'The authorization grant is invalid',
                'unauthorized_client': 'The client is not authorized',
                'unsupported_grant_type': 'Unsupported grant type',
                'invalid_scope': 'Invalid scope'
            }
            user_message = error_mapping.get(error_code, 'An unknown error occurred')
            return {
                'error': error_code,
                'description': error_description,
                'user_message': user_message
            }
        # Not a JSON error object
        return None

    @staticmethod
    def handle_jwt_error(jwt_token, public_key):
        """Handle JWT verification errors"""
        try:
            claims = jwt.decode(jwt_token, public_key)
            claims.validate()
            return {'valid': True, 'claims': claims}
        except Exception as e:
            error_type = type(e).__name__
            # Keys are exception class names from authlib.jose.errors
            error_messages = {
                'ExpiredTokenError': 'Token has expired',
                'BadSignatureError': 'Token signature is invalid',
                'InvalidClaimError': 'Token claims are invalid',
                'InvalidTokenError': 'Token format is invalid'
            }
            return {
                'valid': False,
                'error': error_type,
                'message': error_messages.get(error_type, 'Unknown JWT error'),
                'details': str(e)
            }
# pytest test example
# Requires an authorization server running at http://localhost:8000.
# The helper calls are synchronous, so no asyncio marker is needed.
def test_oauth_flow():
    """OAuth flow integration test"""
    test_client = OAuthTestClient('http://localhost:8000')

    # Test configuration
    client_id = 'test-client'
    client_secret = 'test-secret'
    redirect_uri = 'http://localhost:8080/callback'
    scope = 'read write profile'

    try:
        # Test authorization URL
        state = test_client.test_authorization_code_flow(
            client_id, redirect_uri, scope
        )
        # In actual tests, simulate browser operation here
        # Get authorization code (mock)
        mock_code = 'test-authorization-code'

        # Test token exchange
        token_data = test_client.test_token_exchange(
            mock_code, client_id, client_secret, redirect_uri
        )

        # Test protected resource access
        protected_response = test_client.test_protected_resource(
            token_data['access_token']
        )
        assert protected_response.status_code == 200
        profile_data = protected_response.json()
        assert 'sub' in profile_data
        print("OAuth flow test passed!")
    except Exception as e:
        pytest.fail(f"OAuth flow test failed: {e}")


if __name__ == '__main__':
    # Quick manual check of the error handler
    error_handler = OAuth2ErrorHandler()

    # Error handling test
    sample_error = {
        'error': 'invalid_client',
        'error_description': 'Client authentication failed'
    }
    handled_error = error_handler.handle_token_error(sample_error)
    print("Error handling test:", handled_error)