JWT and OAuth for Enterprise Screenshot API Access: SSO Integration Guide

2026-05-21 | Tags: [screenshot-api, security, jwt, oauth, enterprise, sso, authentication]

Enterprise buyers have a standard objection to API keys: "We already have an identity provider. Why should our developers manage separate credentials for your service?" It's a legitimate complaint. SSO integration means their existing access control — user provisioning, deprovisioning, role assignments — extends automatically to your API. When an employee leaves, their screenshot API access is revoked as part of the standard offboarding, not as a separate step that someone might forget.

The implementation path is narrower than it looks. You don't need to become an OAuth server. You need to accept JWTs from trusted identity providers (Google, Azure AD, Okta, Auth0) and map claims from those tokens to your own permission model.

The JWT Verification Flow

A JWT is a signed JSON payload. An enterprise user authenticates with their identity provider, receives a JWT, and sends it to your API. You verify the signature, check the claims, and map their identity to a local user record.
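To make the structure concrete, here is a small sketch that assembles a hypothetical token by hand and reads its header and payload back. The issuer, key ID, and claim values are invented for illustration, and the signature segment is a placeholder, so this shows the format only and performs no verification.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    # JWT segments are base64url-encoded with the trailing '=' padding removed
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    # Restore the padding that the JWT encoding strips
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def peek_jwt(token: str) -> tuple[dict, dict]:
    """Decode header and payload WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature_b64 = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


# Hypothetical values, for illustration only
sample_header = {"alg": "RS256", "kid": "example-key-1", "typ": "JWT"}
sample_payload = {
    "iss": "https://idp.example.com",
    "sub": "user-123",
    "aud": "api://your-api-identifier",
    "exp": 1900000000,
}
sample_token = ".".join([
    b64url_encode(json.dumps(sample_header).encode()),
    b64url_encode(json.dumps(sample_payload).encode()),
    b64url_encode(b"placeholder-signature"),  # not a real signature
])

header, payload = peek_jwt(sample_token)
```

Anything read this way is untrusted input until the signature and claims have been checked.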

import time
import json
import base64
import urllib.request
import urllib.parse
from functools import lru_cache
from typing import Optional
from dataclasses import dataclass

@dataclass
class JWTClaims:
    sub: str           # Subject — unique user ID from the IdP
    email: str
    iss: str           # Issuer — which IdP issued this token
    aud: str           # Audience — should match your API's identifier
    exp: int           # Expiry timestamp
    iat: int           # Issued-at timestamp
    roles: list[str]   # Custom claim for role mapping

@lru_cache(maxsize=32)
def get_jwks(jwks_uri: str) -> dict:
    """
    Fetch JSON Web Key Set from identity provider.
    Cached — IdPs publish stable keys; refresh on verification failure.
    """
    with urllib.request.urlopen(jwks_uri, timeout=5) as resp:
        return json.loads(resp.read())

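# Trusted issuer configuration.
# Issuers are matched as exact strings, so replace the {tenant} placeholder
# with the customer's real Azure AD tenant ID before use.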
TRUSTED_ISSUERS = {
    "https://accounts.google.com": {
        "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs",
        "audience": "your-client-id.apps.googleusercontent.com",
    },
    "https://login.microsoftonline.com/{tenant}/v2.0": {
        "jwks_uri": "https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys",
        "audience": "api://your-api-identifier",
    },
}

def verify_jwt(token: str) -> JWTClaims:
    """
    Verify a JWT from a trusted identity provider.
    Raises ValueError with specific reason on failure.
    """
    # Decode header to find key ID and algorithm
    parts = token.split('.')
    if len(parts) != 3:
        raise ValueError("Invalid JWT structure")

    try:
        header = json.loads(_base64url_decode(parts[0]))
        payload = json.loads(_base64url_decode(parts[1]))
    except Exception:
        raise ValueError("JWT decode failed")

    issuer = payload.get('iss', '')
    if issuer not in TRUSTED_ISSUERS:
        raise ValueError(f"Untrusted issuer: {issuer}")

    config = TRUSTED_ISSUERS[issuer]

    # Check expiry before fetching keys (cheap rejection)
    now = int(time.time())
    exp = payload.get('exp', 0)
    iat = payload.get('iat', 0)
    if now > exp:
        raise ValueError(f"Token expired {now - exp}s ago")
    if iat > now + 60:
        raise ValueError("Token issued in the future")

    # Check audience
    aud = payload.get('aud', '')
    expected_aud = config['audience']
    if isinstance(aud, list):
        if expected_aud not in aud:
            raise ValueError(f"Invalid audience: {aud}")
    elif aud != expected_aud:
        raise ValueError(f"Invalid audience: {aud}")

    # Verify signature using JWKS
    kid = header.get('kid')
    jwks = get_jwks(config['jwks_uri'])
    key = _find_key(jwks, kid)
    if not key:
        # Key not found — JWKS may have rotated, clear cache and retry
        get_jwks.cache_clear()
        jwks = get_jwks(config['jwks_uri'])
        key = _find_key(jwks, kid)
    if not key:
        raise ValueError(f"Key ID '{kid}' not found in JWKS")

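    # Never let the token choose its own algorithm: accept only an allow-list.
    # Defaulting a missing 'alg' or honouring 'none'/HS256 enables forgery.
    alg = header.get('alg')
    if alg not in ('RS256',):
        raise ValueError(f"Unsupported algorithm: {alg}")
    _verify_signature(token, key, alg)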

    return JWTClaims(
        sub=payload['sub'],
        email=payload.get('email', ''),
        iss=issuer,
        aud=expected_aud,
        exp=exp,
        iat=iat,
        roles=payload.get('roles', payload.get('groups', [])),
    )

def _base64url_decode(s: str) -> bytes:
    # JWT segments omit '=' padding; add back only what is missing (0 to 3 chars)
    padding = -len(s) % 4
    return base64.urlsafe_b64decode(s + '=' * padding)

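The JWKS cache with invalidation-on-miss is the key operational detail. Identity providers rotate their signing keys periodically. If you cache the JWKS indefinitely, a key rotation will cause verification to fail for every token signed with the new key until you restart. The pattern here (cache hit on the fast path, clear and retry on key-not-found) handles rotation without manual intervention. Two caveats apply to the code as written: get_jwks.cache_clear() drops the cached keys for every issuer, not just the one being checked, and any request carrying an unknown kid triggers a refetch, so a production deployment should rate-limit refreshes.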
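One way to keep refetches bounded is a per-URI cache with a time-to-live and a refresh cool-down. The sketch below is an assumption-laden illustration, not the post's implementation: the class name `JWKSCache`, its parameters, and the injected `fetch` and `clock` callables are all hypothetical, chosen so the behavior can be exercised without network access.

```python
import time
from typing import Callable, Optional


class JWKSCache:
    """Per-URI JWKS cache with a TTL and a cool-down on miss-triggered refetches."""

    def __init__(
        self,
        fetch: Callable[[str], dict],
        ttl: float = 3600.0,
        min_refresh_interval: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl
        self._min_refresh_interval = min_refresh_interval
        self._clock = clock
        self._entries: dict[str, tuple[float, dict]] = {}  # uri -> (fetched_at, jwks)

    @staticmethod
    def _lookup(jwks: dict, kid: str) -> Optional[dict]:
        for key in jwks.get("keys", []):
            if key.get("kid") == kid:
                return key
        return None

    def _refresh(self, uri: str) -> dict:
        jwks = self._fetch(uri)
        self._entries[uri] = (self._clock(), jwks)
        return jwks

    def get_key(self, uri: str, kid: str) -> Optional[dict]:
        now = self._clock()
        entry = self._entries.get(uri)
        if entry is None or now - entry[0] > self._ttl:
            # Nothing cached, or the cached copy is stale: fetch once
            fetched_at, jwks = now, self._refresh(uri)
        else:
            fetched_at, jwks = entry
        key = self._lookup(jwks, kid)
        # Unknown kid may mean rotation. Refetch, but only if the cached copy
        # is older than the cool-down, so bogus kids cannot force a fetch per request.
        if key is None and now - fetched_at >= self._min_refresh_interval:
            key = self._lookup(self._refresh(uri), kid)
        return key
```

A caller would construct it once, for example `JWKSCache(fetch=get_jwks_uncached)` where `get_jwks_uncached` is a hypothetical uncached variant of the fetch function above, and call `get_key(config['jwks_uri'], kid)` in place of the clear-and-retry logic.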

Mapping IdP Identity to Local Permissions

A verified JWT tells you who someone is. You still need to decide what they can do. The mapping layer translates IdP claims to your API's permission model:

def resolve_user_from_jwt(claims: JWTClaims) -> ApiKey:
    """
    Map JWT claims to an API user with appropriate permissions.
    Creates account on first login if enterprise SSO is configured.
    """
    # Look up enterprise SSO configuration for this issuer
    sso_config = db_get_sso_config_by_issuer(claims.iss)
    if not sso_config:
        raise PermissionError(f"No SSO configuration for issuer: {claims.iss}")

    # Find or create user
    user = db_get_user_by_sso_identity(claims.iss, claims.sub)
    if not user:
        # First login — provision account
        user = provision_sso_user(
            sso_config=sso_config,
            sub=claims.sub,
            email=claims.email,
        )

    # Map IdP roles to API tier, falling back to the customer's default tier
    tier = map_roles_to_tier(
        claims.roles,
        sso_config.role_mappings,
        default_tier=sso_config.default_tier,
    )

    # Build a transient ApiKey from the JWT claims
    # Not stored; it exists only for the duration of this request
    return ApiKey(
        key_id=f"jwt:{claims.sub}",
        key_hash="",  # No hash for JWT-authenticated requests
        user_id=user.user_id,
        name=f"SSO:{claims.email}",
        tier=tier,
        scopes=get_scopes_for_tier(tier),
        expires_at=None,  # JWT expiry handled separately
    )

def map_roles_to_tier(roles: list[str], role_mappings: dict,
                      default_tier: str = "free") -> str:
    """
    Map IdP roles/groups to API tier.
    role_mappings example:
      {"screenshot-enterprise": "enterprise", "screenshot-pro": "pro"}
    Returns default_tier (configured per enterprise customer) if no role matches.
    """
    # Priority: enterprise > pro > free
    priority = ["enterprise", "pro", "free"]
    assigned_tiers = set()

    for role in roles:
        if role in role_mappings:
            assigned_tiers.add(role_mappings[role])

    for tier in priority:
        if tier in assigned_tiers:
            return tier

    # sso_config is not in scope here, so the caller passes the default in
    return default_tier

The default_tier per SSO configuration is the field that enterprise sales conversations are built around. When an enterprise customer signs an agreement for 50 Pro seats, you set default_tier: pro in their SSO configuration. Every employee who authenticates through their IdP automatically gets Pro access. No key distribution, no individual provisioning, no forgetting to deprovision when someone leaves.
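As a worked example of that fallback, the sketch below models a hypothetical customer whose agreement grants Pro by default and Enterprise to one IdP group. The `SsoConfig` dataclass, the `resolve_tier` helper, and the issuer URL are assumptions made for illustration; the post does not show the real shape of its SSO configuration record.

```python
from dataclasses import dataclass, field

TIER_PRIORITY = ["enterprise", "pro", "free"]  # highest tier first


@dataclass
class SsoConfig:
    # Hypothetical shape of a per-customer SSO configuration record
    issuer: str
    default_tier: str = "free"
    role_mappings: dict = field(default_factory=dict)


def resolve_tier(roles: list[str], config: SsoConfig) -> str:
    """Pick the highest mapped tier, or the customer's default if none match."""
    mapped = {config.role_mappings[r] for r in roles if r in config.role_mappings}
    for tier in TIER_PRIORITY:
        if tier in mapped:
            return tier
    return config.default_tier


acme = SsoConfig(
    issuer="https://idp.acme.example",
    default_tier="pro",
    role_mappings={"screenshot-enterprise": "enterprise"},
)

everyone = resolve_tier(["engineering", "all-staff"], acme)
power_user = resolve_tier(["all-staff", "screenshot-enterprise"], acme)
```

Here an employee with no mapped group still lands on the contracted Pro tier, while membership in the mapped group raises them to Enterprise.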

The OAuth Authorization Code Flow for Web Clients

For browser-based access (the screenshot web tool, not the API), the full OAuth flow is appropriate:

import secrets as secrets_module

OAUTH_STATE_TTL = 600  # 10 minutes

def start_oauth_flow(provider: str, redirect_uri: str) -> str:
    """
    Begin OAuth authorization code flow.
    Returns the authorization URL to redirect the user to.
    """
    state = secrets_module.token_urlsafe(32)
    # Store state with TTL to prevent CSRF
    r.setex(f"oauth_state:{state}", OAUTH_STATE_TTL, json.dumps({
        "provider": provider,
        "redirect_uri": redirect_uri,
        "created_at": int(time.time()),
    }))

    provider_config = OAUTH_PROVIDERS[provider]
    params = urllib.parse.urlencode({
        "client_id": provider_config["client_id"],
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
        # Google-specific way to request a refresh token; Azure AD, Okta,
        # and Auth0 use the "offline_access" scope instead
        "access_type": "offline",
    })

    return f"{provider_config['auth_endpoint']}?{params}"

def complete_oauth_flow(code: str, state: str) -> tuple[str, JWTClaims]:
    """
    Exchange authorization code for tokens.
    Returns (session_token, verified_claims).
    """
    # Verify state to prevent CSRF
    stored = r.get(f"oauth_state:{state}")
    if not stored:
        raise ValueError("Invalid or expired OAuth state")
    r.delete(f"oauth_state:{state}")  # One-time use

    state_data = json.loads(stored)
    provider = state_data["provider"]
    provider_config = OAUTH_PROVIDERS[provider]

    # Exchange code for tokens
    token_response = _exchange_code_for_tokens(
        code=code,
        provider_config=provider_config,
        redirect_uri=state_data["redirect_uri"],
    )

    # Verify the ID token
    claims = verify_jwt(token_response["id_token"])

    # Create session
    session_token = secrets_module.token_urlsafe(32)
    r.setex(
        f"session:{session_token}",
        3600 * 24,  # 24-hour session
        json.dumps({
            "sub": claims.sub,
            "email": claims.email,
            "iss": claims.iss,
            "expires_at": int(time.time()) + 3600 * 24,
        })
    )

    return session_token, claims
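The `_exchange_code_for_tokens` helper is referenced above but not shown. A minimal sketch of the standard authorization-code exchange follows, assuming `provider_config` also carries `token_endpoint` and `client_secret` entries (both key names are assumptions) and that the provider accepts client credentials in the form body. Building the request is split out into `build_token_request`, a name introduced here, so it can be inspected without a network call.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(code: str, provider_config: dict,
                        redirect_uri: str) -> urllib.request.Request:
    """Build the POST that trades an authorization code for tokens."""
    body = urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "code": code,
        # Must match the redirect_uri sent in the authorization request
        "redirect_uri": redirect_uri,
        "client_id": provider_config["client_id"],
        "client_secret": provider_config["client_secret"],  # assumed key name
    }).encode("ascii")
    return urllib.request.Request(
        provider_config["token_endpoint"],  # assumed key name
        data=body,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
        method="POST",
    )


def _exchange_code_for_tokens(code: str, provider_config: dict,
                              redirect_uri: str) -> dict:
    """Send the request; the JSON response is expected to include 'id_token'."""
    request = build_token_request(code, provider_config, redirect_uri)
    with urllib.request.urlopen(request, timeout=5) as resp:
        return json.loads(resp.read())
```

The exchange happens server to server, so the client secret never reaches the browser.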

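The state parameter validation is the security-critical step. Without it, an attacker can start a flow with their own account, capture the resulting authorization code, and trick a victim's browser into completing the callback with it, which signs the victim into the attacker's account (login CSRF). State only prevents this when it ties the authorization request to the specific browser that initiated it. The Redis lookup above proves the state was issued by this server and is unused, so the value should also be bound to the browser, for example through a cookie set in start_oauth_flow and compared in complete_oauth_flow.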
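One way to bind state to a browser is to pair it with a random value that is stored server-side and also set as a cookie, then require both to match on the callback. The sketch below uses a plain dict in place of Redis, and the function names `issue_state` and `consume_state` are hypothetical. It illustrates the check only; TTL handling and cookie setting are left to the web framework.

```python
import hmac
import secrets


def issue_state(store: dict, provider: str) -> tuple[str, str]:
    """
    Create a state value plus a browser-binding value.
    The caller puts `state` in the authorization URL and sets `binding`
    as an HttpOnly, Secure, SameSite=Lax cookie on the same response.
    """
    state = secrets.token_urlsafe(32)
    binding = secrets.token_urlsafe(32)
    store[state] = {"provider": provider, "binding": binding}
    return state, binding


def consume_state(store: dict, state: str, cookie_value: str) -> dict:
    """Validate the callback's state against the browser's cookie (one-time use)."""
    record = store.pop(state, None)  # pop so a state can never be replayed
    if record is None:
        raise ValueError("Invalid or expired OAuth state")
    # Constant-time comparison on bytes; a state issued to another browser fails here
    if not cookie_value or not hmac.compare_digest(
        record["binding"].encode(), cookie_value.encode()
    ):
        raise ValueError("OAuth state does not belong to this browser")
    return record
```

With this in place, a callback URL crafted by an attacker fails in the victim's browser because the victim's cookie does not match the attacker's state.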

What This Unlocks

With JWT/OAuth in place, the enterprise sales pitch changes.

The implementation adds complexity, but complexity that replaces a category of operational burden for enterprise customers. That's the right trade.


Part of the security and authentication series. Previous: Webhook Security. This post completes the security/auth arc: key design → scoping → webhook signing → JWT/OAuth.