JWT and OAuth for Enterprise Screenshot API Access: SSO Integration Guide
Enterprise buyers have a standard objection to API keys: "We already have an identity provider. Why should our developers manage separate credentials for your service?" It's a legitimate complaint. SSO integration means their existing access control — user provisioning, deprovisioning, role assignments — extends automatically to your API. When an employee leaves, their screenshot API access is revoked as part of the standard offboarding, not as a separate step that someone might forget.
The implementation path is narrower than it looks. You don't need to become an OAuth server. You need to accept JWTs from trusted identity providers (Google, Azure AD, Okta, Auth0) and map claims from those tokens to your own permission model.
The JWT Verification Flow
A JWT is a signed JSON payload. An enterprise user authenticates with their identity provider, receives a JWT, and sends it to your API. You verify the signature, check the claims, and map their identity to a local user record.
import time
import json
import base64
import urllib.request
import urllib.parse
from functools import lru_cache
from typing import Optional
from dataclasses import dataclass
@dataclass
class JWTClaims:
    """Claims carried out of verify_jwt for a token that passed its checks."""

    # Subject: the unique user ID assigned by the IdP.
    sub: str
    email: str
    # Issuer: which IdP issued this token.
    iss: str
    # Audience: should match this API's identifier.
    aud: str
    # Expiry timestamp.
    exp: int
    # Issued-at timestamp.
    iat: int
    # Custom claim used for role mapping.
    roles: list[str]
@lru_cache(maxsize=32)
def get_jwks(jwks_uri: str) -> dict:
    """
    Download and parse the JSON Web Key Set published at ``jwks_uri``.

    Results are memoized per URI by ``lru_cache`` and stay cached until
    ``get_jwks.cache_clear()`` is called (verify_jwt does this when a key ID
    is not found in the cached set).
    """
    response = urllib.request.urlopen(jwks_uri, timeout=5)
    with response:
        raw_body = response.read()
    return json.loads(raw_body)
# Trusted issuer configuration
# Keyed by the exact `iss` claim value: verify_jwt rejects any token whose
# issuer is not a key of this dict. Each entry supplies the JWKS endpoint used
# to look up the signing key and the audience the token's `aud` must match.
TRUSTED_ISSUERS = {
    "https://accounts.google.com": {
        "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs",
        # Presumably a placeholder client ID - replace with the real value.
        "audience": "your-client-id.apps.googleusercontent.com",
    },
    # NOTE(review): "{tenant}" looks like a literal placeholder and nothing
    # visible here substitutes it. Because the issuer lookup is an exact
    # match, this entry cannot match a real token until the tenant ID is
    # filled in for both the key and the jwks_uri - confirm.
    "https://login.microsoftonline.com/{tenant}/v2.0": {
        "jwks_uri": "https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys",
        "audience": "api://your-api-identifier",
    },
}
# Asymmetric JWS algorithms accepted for tokens checked against a JWKS.
# "none" and the HMAC family (HS*) are deliberately absent: the header is
# attacker-controlled, and honoring those values is the classic algorithm
# confusion attack against public-key verification.
_ALLOWED_JWT_ALGS = frozenset({
    "RS256", "RS384", "RS512",
    "PS256", "PS384", "PS512",
    "ES256", "ES384", "ES512",
    "EdDSA",
})


def verify_jwt(token: str) -> JWTClaims:
    """
    Verify a JWT from a trusted identity provider.

    Checks, in order: three-part structure, decodable header/payload,
    permitted signing algorithm, trusted issuer, expiry / issued-at,
    audience, presence of ``sub``, and finally the signature against the
    issuer's JWKS.

    Returns the verified claims as a JWTClaims.
    Raises ValueError with specific reason on failure.
    """
    parts = token.split('.')
    if len(parts) != 3:
        raise ValueError("Invalid JWT structure")

    # Decode header (key ID, algorithm) and payload (claims).
    try:
        header = json.loads(_base64url_decode(parts[0]))
        payload = json.loads(_base64url_decode(parts[1]))
    except Exception as exc:
        raise ValueError("JWT decode failed") from exc
    # Valid JSON that is not an object (e.g. a list) would break .get() below.
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise ValueError("JWT decode failed")

    # Never let the (unsigned) header pick an arbitrary algorithm.
    alg = header.get('alg', 'RS256')
    if not isinstance(alg, str) or alg not in _ALLOWED_JWT_ALGS:
        raise ValueError(f"Unsupported JWT algorithm: {alg!r}")

    issuer = payload.get('iss', '')
    if not isinstance(issuer, str) or issuer not in TRUSTED_ISSUERS:
        raise ValueError(f"Untrusted issuer: {issuer}")
    config = TRUSTED_ISSUERS[issuer]

    # Check expiry before fetching keys (cheap rejection)
    now = int(time.time())
    exp = payload.get('exp', 0)
    iat = payload.get('iat', 0)
    for claim_name, claim_value in (("exp", exp), ("iat", iat)):
        # A string or null timestamp would otherwise raise TypeError below.
        if isinstance(claim_value, bool) or not isinstance(claim_value, (int, float)):
            raise ValueError(f"Invalid {claim_name} claim")
    if now > exp:
        raise ValueError(f"Token expired {now - exp}s ago")
    if iat > now + 60:  # tolerate 60s of clock skew
        raise ValueError("Token issued in the future")

    # Check audience (the claim may be a single value or a list)
    aud = payload.get('aud', '')
    expected_aud = config['audience']
    if isinstance(aud, list):
        if expected_aud not in aud:
            raise ValueError(f"Invalid audience: {aud}")
    elif aud != expected_aud:
        raise ValueError(f"Invalid audience: {aud}")

    # The subject is required to identify the user; fail with the documented
    # exception type instead of a KeyError at construction time.
    sub = payload.get('sub')
    if not isinstance(sub, str) or not sub:
        raise ValueError("Missing sub claim")

    # Verify signature using JWKS
    kid = header.get('kid')
    jwks = get_jwks(config['jwks_uri'])
    key = _find_key(jwks, kid)
    if not key:
        # Key not found - JWKS may have rotated, clear cache and retry.
        # NOTE(review): cache_clear() drops the cached JWKS of every issuer,
        # and this path is reachable before the signature is checked, so
        # tokens with made-up key IDs can force refetches; consider rate
        # limiting refreshes.
        get_jwks.cache_clear()
        jwks = get_jwks(config['jwks_uri'])
        key = _find_key(jwks, kid)
    if not key:
        raise ValueError(f"Key ID '{kid}' not found in JWKS")
    _verify_signature(token, key, alg)

    # Role claims are sometimes a single string or null; always hand
    # downstream code a list.
    raw_roles = payload.get('roles', payload.get('groups', []))
    if isinstance(raw_roles, str):
        roles = [raw_roles]
    elif isinstance(raw_roles, (list, tuple)):
        roles = list(raw_roles)
    else:
        roles = []

    return JWTClaims(
        sub=sub,
        email=payload.get('email', ''),
        iss=issuer,
        aud=expected_aud,
        exp=exp,
        iat=iat,
        roles=roles,
    )
def _base64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
return base64.urlsafe_b64decode(s + '=' * padding)
The JWKS cache with invalidation-on-miss is the key operational detail. Identity providers rotate their signing keys periodically. If you cache the JWKS indefinitely, a key rotation will cause all token verifications to fail until you restart. The pattern here — cache hit fast path, cache-clear-and-retry on key-not-found — handles rotation gracefully without requiring manual intervention.
Mapping IdP Identity to Local Permissions
A verified JWT tells you who someone is. You still need to decide what they can do. The mapping layer translates IdP claims to your API's permission model:
def resolve_user_from_jwt(claims: JWTClaims) -> ApiKey:
    """
    Turn verified JWT claims into a request-scoped ApiKey.

    Looks up the SSO configuration for the token's issuer, finds the local
    user for (issuer, subject) or provisions one on first login, and derives
    the tier and scopes from the token's roles.

    Raises PermissionError when no SSO configuration exists for the issuer.
    """
    issuer = claims.iss

    sso_config = db_get_sso_config_by_issuer(issuer)
    if not sso_config:
        raise PermissionError(f"No SSO configuration for issuer: {issuer}")

    # Existing user if there is one; otherwise provision on first login.
    user = db_get_user_by_sso_identity(issuer, claims.sub) or provision_sso_user(
        sso_config=sso_config,
        sub=claims.sub,
        email=claims.email,
    )

    # IdP roles decide the API tier.
    tier = map_roles_to_tier(claims.roles, sso_config.role_mappings)

    # The key is transient: built from the claims for this request only and
    # never stored.
    key_fields = {
        "key_id": f"jwt:{claims.sub}",
        "key_hash": "",  # JWT-authenticated requests have no key hash
        "user_id": user.user_id,
        "name": f"SSO:{claims.email}",
        "tier": tier,
        "scopes": get_scopes_for_tier(tier),
        "expires_at": None,  # JWT expiry handled separately
    }
    return ApiKey(**key_fields)
def map_roles_to_tier(
    roles: list[str],
    role_mappings: dict,
    default_tier: str = "free",
) -> str:
    """
    Map IdP roles/groups to API tier.

    role_mappings example:
        {"screenshot-enterprise": "enterprise", "screenshot-pro": "pro"}

    When several roles match, the highest tier wins
    (enterprise > pro > free). Mapped tiers outside that list are ignored.

    Returns ``default_tier`` ('free' unless the caller passes the customer's
    configured default) if no matching role.
    """
    # A single role delivered as a bare string must not be iterated
    # character by character; a null claim is treated as "no roles".
    if isinstance(roles, str):
        roles = [roles]
    assigned_tiers = {
        role_mappings[role] for role in (roles or ()) if role in role_mappings
    }

    # Priority: enterprise > pro > free
    for tier in ("enterprise", "pro", "free"):
        if tier in assigned_tiers:
            return tier

    # The original fell back to `sso_config.default_tier`, but `sso_config`
    # is not in scope here and raised NameError; callers now pass the
    # per-customer default explicitly.
    return default_tier
The default_tier per SSO configuration is the field that enterprise sales conversations are built around. When an enterprise customer signs an agreement for 50 Pro seats, you set default_tier: pro in their SSO configuration. Every employee who authenticates through their IdP automatically gets Pro access. No key distribution, no individual provisioning, no forgetting to deprovision when someone leaves.
The OAuth Authorization Code Flow for Web Clients
For browser-based access (the screenshot web tool, not the API), the full OAuth flow is appropriate:
import secrets as secrets_module
# Expiry passed to r.setex for the pending "oauth_state:<state>" entry.
OAUTH_STATE_TTL = 600 # 10 minutes
def start_oauth_flow(provider: str, redirect_uri: str) -> str:
    """
    Begin OAuth authorization code flow.

    Generates a random ``state`` value, stores it (with the provider and
    redirect URI) under ``oauth_state:<state>`` for OAUTH_STATE_TTL, and
    builds the provider's authorization URL.

    Returns the authorization URL to redirect the user to.
    Raises KeyError if ``provider`` is not configured in OAUTH_PROVIDERS or
    its configuration lacks ``client_id`` / ``auth_endpoint``.
    """
    # Resolve the provider configuration first so an unknown or incomplete
    # provider fails before any state entry is written to the store.
    provider_config = OAUTH_PROVIDERS[provider]
    client_id = provider_config["client_id"]
    auth_endpoint = provider_config['auth_endpoint']

    # NOTE(review): redirect_uri is stored and forwarded as given; confirm
    # callers restrict it to registered URIs.
    state = secrets_module.token_urlsafe(32)

    # Store state with TTL to prevent CSRF
    r.setex(f"oauth_state:{state}", OAUTH_STATE_TTL, json.dumps({
        "provider": provider,
        "redirect_uri": redirect_uri,
        "created_at": int(time.time()),
    }))

    params = urllib.parse.urlencode({
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
        "access_type": "offline",  # Request refresh token
    })
    return f"{auth_endpoint}?{params}"
def complete_oauth_flow(code: str, state: str) -> tuple[str, JWTClaims]:
    """
    Finish the authorization code flow started by start_oauth_flow.

    Consumes the stored state entry, exchanges ``code`` for tokens, verifies
    the returned ID token, and stores a 24-hour session keyed by a fresh
    random token.

    Returns (session_token, verified_claims).
    Raises ValueError when the state is unknown or has expired.
    """
    # CSRF check: the state must be one we stored and not yet consumed.
    # NOTE(review): get followed by delete is two separate calls; confirm
    # whether concurrent callbacks with the same state need an atomic fetch.
    state_key = f"oauth_state:{state}"
    stored = r.get(state_key)
    if not stored:
        raise ValueError("Invalid or expired OAuth state")
    r.delete(state_key)  # One-time use

    state_data = json.loads(stored)
    provider_config = OAUTH_PROVIDERS[state_data["provider"]]

    # Swap the authorization code for tokens at the provider.
    token_response = _exchange_code_for_tokens(
        code=code,
        provider_config=provider_config,
        redirect_uri=state_data["redirect_uri"],
    )

    # NOTE(review): the ID token is accepted from any trusted issuer; it is
    # not compared against the provider recorded in the state - confirm.
    claims = verify_jwt(token_response["id_token"])

    # Create the session record.
    session_token = secrets_module.token_urlsafe(32)
    session_ttl = 3600 * 24  # 24-hour session
    session_record = {
        "sub": claims.sub,
        "email": claims.email,
        "iss": claims.iss,
        "expires_at": int(time.time()) + session_ttl,
    }
    r.setex(f"session:{session_token}", session_ttl, json.dumps(session_record))

    return session_token, claims
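The state parameter validation is the security-critical step. Without it, an attacker can start an authorization flow with their own account, capture the resulting callback URL, and trick a victim's browser into completing it — silently signing the victim into the attacker's account (login CSRF). The state ties the authorization response to the request that initiated it. For that guarantee to hold per browser, the state must also be bound to the browser that started the flow (for example, by setting it in a cookie and comparing it on callback); a server-side lookup alone only proves that your service issued the state.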
What This Unlocks
With JWT/OAuth in place, the enterprise sales pitch changes:
- No credential distribution: New employees get API access by being added to the relevant IdP group
- Automatic deprovisioning: Offboarding revokes access without any API-specific steps
- Audit trail: Your logs show jwt:sub123 (or the resolved email) instead of an opaque key ID
- Role-based access: Contractors get free tier, employees get pro, power users get enterprise — all managed in the customer's existing directory
The implementation adds complexity, but complexity that replaces a category of operational burden for enterprise customers. That's the right trade.
Part of the security and authentication series. Previous: Webhook Security. This post completes the security/auth arc: key design → scoping → webhook signing → JWT/OAuth.