Screenshot API Error Handling: Patterns for Production-Grade Integrations

2026-05-13 | Tags: [screenshot-api, error-handling, python, javascript, best-practices]

The difference between a demo integration and a production integration is almost always error handling. A demo calls the API, gets a response, moves on. A production integration classifies errors by type, retries the transient ones, fails fast or falls back gracefully on the rest, logs structured data for observability, and never silently drops a failure that a human should know about.

This guide covers the error handling patterns that matter for screenshot API integrations, with runnable examples in Python and JavaScript.

Error Classification

Not all errors are equal. The first decision in any error handler is: is this error retriable?

| Error Type | HTTP Status | Retriable | Action |
| --- | --- | --- | --- |
| Rate limit exceeded | 429 | Yes | Backoff + retry, respect Retry-After |
| Server error | 500, 502, 503, 504 | Yes | Exponential backoff |
| Bad request | 400 | No | Fix the request |
| Unauthorized | 401 | No | Check API key |
| Forbidden | 403 | No | Check tier/permissions |
| Not found | 404 | No | Check endpoint URL |
| Timeout | (network) | Sometimes | Retry once with longer timeout |
| Connection error | (network) | Yes | Retry with backoff |

The rule: retry transient errors (rate limits, server errors, network failures). Do not retry client errors (bad request, auth failures). Treat timeouts as conditional — one retry at a higher timeout, then fail.

Python: Structured Error Handling

import urllib.request
import urllib.parse
import urllib.error
import time
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

# Module-level logger used by the capture, retry and fallback helpers below.
logger = logging.getLogger(__name__)


class ScreenshotErrorKind(Enum):
    """Coarse failure categories attached to every ScreenshotError.

    The string values are what the helpers emit in log records
    (``err.kind.value``).
    """

    RATE_LIMITED = "rate_limited"    # HTTP 429
    SERVER_ERROR = "server_error"    # HTTP status >= 500
    CLIENT_ERROR = "client_error"    # any other HTTP error status
    AUTH_ERROR = "auth_error"        # HTTP 401 / 403
    NETWORK_ERROR = "network_error"  # urllib URLError (no HTTP response)
    TIMEOUT = "timeout"              # request exceeded its timeout


@dataclass(eq=False)
class ScreenshotError(Exception):
    """Typed failure raised by the screenshot helpers.

    ``eq=False`` is deliberate: with the dataclass default (``eq=True``) a
    field-wise ``__eq__`` is generated and ``__hash__`` is set to ``None``,
    which makes the exception unhashable. Exceptions should compare and hash
    by identity, like every built-in exception does.
    """

    # Failure category; drives the retry decision together with ``retriable``.
    # Quoted so the class does not depend on where the enum is defined.
    kind: "ScreenshotErrorKind"
    # HTTP status of the response, or None when no response was received.
    status_code: Optional[int]
    # Human-readable description (may include the response body).
    message: str
    # Server-requested wait in seconds (from Retry-After), if any.
    retry_after: Optional[float] = None
    # True when the same request may succeed if sent again later.
    retriable: bool = False

    def __post_init__(self) -> None:
        # The generated __init__ never calls Exception.__init__, so without
        # this ``args`` stays empty when the error is built with keywords.
        super().__init__(self.message)

    def __str__(self):
        return f"ScreenshotError({self.kind.value}, status={self.status_code}): {self.message}"


def _parse_retry_after(raw: Optional[str]) -> Optional[float]:
    """Convert a Retry-After header value into a delay in seconds.

    Accepts both forms the header may take: a number of seconds, or an
    HTTP-date (converted to "seconds from now", never negative).

    Returns None when the header is absent, unparseable, negative, or not a
    finite number, so the caller falls back to its own backoff schedule.
    """
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if not raw:
        return None
    text = raw.strip()

    try:
        seconds = float(text)
    except ValueError:
        # Not numeric: try the HTTP-date form.
        try:
            when = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            return None
        if when.tzinfo is None:
            # A date without usable zone information is treated as UTC.
            when = when.replace(tzinfo=timezone.utc)
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())

    # float() also accepts "inf"/"nan"; sleeping on those would raise or hang.
    if not math.isfinite(seconds) or seconds < 0:
        return None
    return seconds


def _classify_http_error(exc: urllib.error.HTTPError) -> ScreenshotError:
    """Map HTTPError to a typed ScreenshotError.

    Classification by status code:
      * 429      -> RATE_LIMITED, retriable, carries the parsed Retry-After
      * 401, 403 -> AUTH_ERROR, not retriable
      * >= 500   -> SERVER_ERROR, retriable
      * other    -> CLIENT_ERROR, not retriable

    The response body is included in the message for diagnostics.
    """
    status = exc.code
    try:
        body = exc.read().decode("utf-8", errors="replace")
    except Exception:
        # Best effort only: the body is diagnostic text, never worth failing over.
        body = "(unreadable)"

    if status == 429:
        headers = exc.headers  # may be None on a hand-built HTTPError
        raw = headers.get("Retry-After") if headers is not None else None
        return ScreenshotError(
            kind=ScreenshotErrorKind.RATE_LIMITED,
            status_code=429,
            message=f"Rate limited. {body}",
            retry_after=_parse_retry_after(raw),
            retriable=True,
        )

    if status in (401, 403):
        return ScreenshotError(
            kind=ScreenshotErrorKind.AUTH_ERROR,
            status_code=status,
            message=f"Auth error: {body}",
            retriable=False,
        )

    if status >= 500:
        return ScreenshotError(
            kind=ScreenshotErrorKind.SERVER_ERROR,
            status_code=status,
            message=f"Server error {status}: {body}",
            retriable=True,
        )

    # Everything else (in practice 4xx client errors): retrying cannot help.
    return ScreenshotError(
        kind=ScreenshotErrorKind.CLIENT_ERROR,
        status_code=status,
        message=f"Client error {status}: {body}",
        retriable=False,
    )


def capture_screenshot(
    url: str,
    api_key: str,
    fmt: str = "webp",
    timeout: float = 25.0,
) -> bytes:
    """
    Capture a screenshot. Raises ScreenshotError on failure.
    Callers decide whether to retry — this function does not.

    Args:
        url: Page to capture.
        api_key: Sent in the X-API-Key header.
        fmt: Value of the ``format`` query parameter.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The raw response body (the image bytes).

    Raises:
        ScreenshotError: for HTTP error statuses (classified by
            ``_classify_http_error``), timeouts (kind TIMEOUT) and any other
            transport failure (kind NETWORK_ERROR).
    """
    import http.client
    import socket

    # socket.timeout is only an alias of TimeoutError on Python 3.10+.
    timeout_types = (TimeoutError, socket.timeout)

    def timed_out() -> ScreenshotError:
        return ScreenshotError(
            kind=ScreenshotErrorKind.TIMEOUT,
            status_code=None,
            message=f"Request timed out after {timeout}s",
            retriable=True,  # Retry once at higher timeout
        )

    params = urllib.parse.urlencode({
        "url": url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
    })
    req = urllib.request.Request(
        f"https://hermesforge.dev/api/screenshot?{params}",
        headers={"X-API-Key": api_key},
    )

    # Order matters: HTTPError is a URLError, and both are OSErrors.
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read()
    except urllib.error.HTTPError as e:
        raise _classify_http_error(e) from e
    except timeout_types as e:
        # Raised directly when the timeout hits while reading the response.
        raise timed_out() from e
    except urllib.error.URLError as e:
        # A timeout while connecting is wrapped: URLError(reason=timeout).
        if isinstance(e.reason, timeout_types):
            raise timed_out() from e
        raise ScreenshotError(
            kind=ScreenshotErrorKind.NETWORK_ERROR,
            status_code=None,
            message=str(e.reason),
            retriable=True,
        ) from e
    except (OSError, http.client.HTTPException) as e:
        # Failures while reading the body (connection reset, truncated
        # response) are not wrapped in URLError; type them so the retry
        # wrapper sees them.
        raise ScreenshotError(
            kind=ScreenshotErrorKind.NETWORK_ERROR,
            status_code=None,
            message=str(e) or type(e).__name__,
            retriable=True,
        ) from e

Python: Retry with Backoff

def capture_with_retry(
    url: str,
    api_key: str,
    fmt: str = "webp",
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    timeout: float = 25.0,
) -> bytes:
    """
    Retry-aware wrapper around capture_screenshot.
    Respects Retry-After on 429, exponential backoff for server/network errors.
    Does NOT retry client errors or auth failures.

    Args:
        url: Page to capture.
        api_key: API key forwarded to capture_screenshot.
        fmt: Image format forwarded to capture_screenshot.
        max_attempts: Total number of requests allowed (must be >= 1).
        base_delay: First backoff ceiling in seconds; doubles per attempt.
        max_delay: Upper bound, in seconds, on any single sleep.
        timeout: Per-request timeout for the first attempt, in seconds.

    Returns:
        The image bytes from the first successful attempt.

    Raises:
        ValueError: if ``max_attempts`` is less than 1.
        ScreenshotError: the non-retriable error, or the last retriable one
            once retrying stops.
    """
    import random

    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    last_error: Optional[ScreenshotError] = None
    attempts_made = 0
    timeout_retry_used = False
    stop_reason = "attempts_exhausted"

    for attempt in range(max_attempts):
        attempts_made = attempt + 1
        try:
            return capture_screenshot(url, api_key, fmt, timeout=timeout)

        except ScreenshotError as err:
            last_error = err

            if not err.retriable:
                logger.error(
                    "Non-retriable error",
                    extra={"kind": err.kind.value, "status": err.status_code, "url": url},
                )
                raise

            if attempt == max_attempts - 1:
                break  # Last attempt failed — raise below

            # Calculate delay
            if err.kind == ScreenshotErrorKind.TIMEOUT:
                # Timeouts get exactly one retry, at a longer timeout.
                if timeout_retry_used:
                    stop_reason = "timeout_retry_used"
                    break
                timeout_retry_used = True
                # Double up to 60s, but never shrink a caller-supplied timeout.
                timeout = max(timeout, min(timeout * 2, 60.0))
                delay = 0.0
            elif err.kind == ScreenshotErrorKind.RATE_LIMITED and err.retry_after is not None:
                if err.retry_after > max_delay:
                    # Retrying sooner would ignore the server; sleeping longer
                    # would exceed max_delay. Hand the decision to the caller.
                    stop_reason = "retry_after_exceeds_max_delay"
                    break
                delay = err.retry_after
            else:
                # Exponential backoff with jitter so that many clients failing
                # together do not all retry at the same instant.
                ceiling = min(base_delay * (2 ** attempt), max_delay)
                delay = random.uniform(ceiling / 2, ceiling)

            logger.warning(
                "Retriable error, backing off",
                extra={
                    "kind": err.kind.value,
                    "attempt": attempt + 1,
                    "delay": delay,
                    "url": url,
                },
            )
            if delay > 0:
                time.sleep(delay)

    # The loop ran at least once and did not return, so an error was recorded.
    assert last_error is not None
    logger.error(
        "All attempts exhausted",
        extra={
            "kind": last_error.kind.value,
            "attempts": attempts_made,
            "reason": stop_reason,
            "url": url,
        },
    )
    raise last_error

Python: Circuit Breaker

For high-volume workflows where the screenshot API may become temporarily unavailable, a circuit breaker prevents cascading failures by failing fast rather than queuing up retries:

import threading
from datetime import datetime, UTC

class CircuitBreaker:
    """
    Three-state circuit breaker: CLOSED (normal) → OPEN (failing fast) → HALF_OPEN (testing).

    All state changes happen under an internal lock.

    Args:
        failure_threshold: Failure count at which a CLOSED breaker opens.
        recovery_timeout: Seconds an OPEN breaker waits before allowing
            trial calls (HALF_OPEN).
        success_threshold: Successes needed in HALF_OPEN to close again.
    """
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self._state = self.CLOSED
        self._failure_count = 0
        self._success_count = 0
        # time.monotonic() reading taken when the breaker last opened.
        self._opened_at: Optional[float] = None
        self._lock = threading.Lock()

    @property
    def state(self) -> str:
        """Current state; moves OPEN → HALF_OPEN once recovery_timeout has elapsed."""
        with self._lock:
            if self._state == self.OPEN and self._opened_at is not None:
                # Monotonic clock: the interval must not be affected by
                # wall-clock adjustments.
                if time.monotonic() - self._opened_at >= self.recovery_timeout:
                    self._state = self.HALF_OPEN
                    self._success_count = 0
            return self._state

    def record_success(self):
        """Register a successful call."""
        with self._lock:
            if self._state == self.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = self.CLOSED
                    self._failure_count = 0
            elif self._state == self.CLOSED:
                # Successes slowly forgive earlier failures.
                self._failure_count = max(0, self._failure_count - 1)

    def record_failure(self):
        """Register a failed call; opens the breaker at the threshold or on any HALF_OPEN failure."""
        with self._lock:
            self._failure_count += 1
            if self._failure_count >= self.failure_threshold or self._state == self.HALF_OPEN:
                self._state = self.OPEN
                self._opened_at = time.monotonic()

    def __call__(self, func):
        """Decorator usage: @circuit_breaker

        The wrapper raises a non-retriable ScreenshotError without calling
        ``func`` while the breaker is OPEN. Otherwise it calls ``func``,
        records a success on return, and records a failure only for
        ScreenshotErrors marked retriable (the exception is always re-raised).
        """
        import functools

        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            if self.state == self.OPEN:
                raise ScreenshotError(
                    kind=ScreenshotErrorKind.SERVER_ERROR,
                    status_code=None,
                    message="Circuit breaker OPEN — screenshot API unavailable",
                    retriable=False,
                )
            try:
                result = func(*args, **kwargs)
                self.record_success()
                return result
            except ScreenshotError as err:
                if err.retriable:
                    self.record_failure()
                raise
        return wrapper


# Usage: a single shared breaker guards every call made through capture_protected.
screenshot_breaker = CircuitBreaker(recovery_timeout=60.0, failure_threshold=5)


@screenshot_breaker
def capture_protected(url: str, api_key: str) -> bytes:
    """Run capture_with_retry behind screenshot_breaker and return its result."""
    image = capture_with_retry(url, api_key)
    return image

Python: Fallback Strategies

When the screenshot API is unavailable, choose an appropriate fallback based on your use case:

from typing import Callable
import hashlib

def capture_with_fallback(
    url: str,
    api_key: str,
    fallback: Optional[Callable[[str], bytes | None]] = None,
) -> bytes | None:
    """
    Attempt capture; run fallback on failure.
    Returns None if both capture and fallback fail.

    Args:
        url: Page to capture.
        api_key: API key forwarded to capture_protected.
        fallback: Optional callable given ``url`` that returns substitute
            image bytes, or None when it has nothing to offer.

    Returns:
        The captured bytes, else the fallback's result, else None.
    """
    try:
        return capture_protected(url, api_key)
    except ScreenshotError as err:
        logger.warning(
            "Screenshot capture failed, using fallback",
            extra={"kind": err.kind.value, "url": url},
        )

    if fallback is None:
        return None

    try:
        return fallback(url)
    except Exception:
        # Boundary handler: a broken fallback must not turn a degraded
        # response into a crash. The traceback is logged, not swallowed.
        logger.exception("Screenshot fallback failed", extra={"url": url})
        return None


def cached_screenshot_fallback(cache_dir: str) -> Callable[[str], bytes | None]:
    """
    Return a fallback that serves the last cached screenshot for a URL.
    Useful for monitoring dashboards — stale data is better than no data.
    """
    import os
    def fallback(url: str) -> bytes | None:
        url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
        cache_path = os.path.join(cache_dir, f"{url_hash}.webp")
        if os.path.exists(cache_path):
            logger.info(f"Serving cached screenshot for {url}")
            with open(cache_path, "rb") as f:
                return f.read()
        return None
    return fallback


def placeholder_fallback(width: int = 1280, height: int = 800) -> Callable[[str], bytes]:
    """
    Build a fallback that renders a plain placeholder image.

    The returned callable ignores the URL's content and produces a
    ``width`` x ``height`` light-grey WEBP with a thin border and the text
    "Preview unavailable" anchored at the centre. Useful for OG image
    pipelines, where a grey image is better than a broken tag.
    Requires Pillow.
    """
    from PIL import Image, ImageDraw
    import io

    size = (width, height)
    centre = (width // 2, height // 2)
    border_box = [(0, 0), (width - 1, height - 1)]

    def fallback(url: str) -> bytes:
        canvas = Image.new("RGB", size, color=(245, 245, 245))
        pen = ImageDraw.Draw(canvas)
        pen.rectangle(border_box, outline=(200, 200, 200), width=2)
        pen.text(centre, "Preview unavailable", fill=(150, 150, 150), anchor="mm")
        with io.BytesIO() as out:
            canvas.save(out, format="WEBP")
            return out.getvalue()

    return fallback

JavaScript: Typed Error Handling

/**
 * Typed failure thrown by the screenshot helpers.
 *
 * kind       - one of the ErrorKind string values
 * statusCode - HTTP status, or null when no response was received
 * message    - human-readable description
 * options    - { retryAfter: seconds or null, retriable: boolean }
 */
class ScreenshotError extends Error {
  constructor(kind, statusCode, message, options = {}) {
    const { retryAfter = null, retriable = false } = options;
    super(message);
    Object.assign(this, {
      name: 'ScreenshotError',
      kind,
      statusCode,
      retryAfter,
      retriable,
    });
  }
}

// Failure categories stored on ScreenshotError.kind. The string values are
// what captureWithRetry prints in its console.warn retry message.
const ErrorKind = {
  RATE_LIMITED: 'rate_limited',   // HTTP 429
  SERVER_ERROR: 'server_error',   // HTTP status >= 500
  CLIENT_ERROR: 'client_error',   // any other non-OK HTTP status
  AUTH_ERROR: 'auth_error',       // HTTP 401 / 403
  NETWORK_ERROR: 'network_error', // fetch rejected for a reason other than abort
  TIMEOUT: 'timeout',             // request aborted by the timeout timer
};

/**
 * Capture a screenshot and resolve with the image as a Buffer.
 * Throws ScreenshotError on failure; never retries on its own.
 *
 * The timeout covers the whole exchange, including reading the body: the
 * abort timer is only cleared once the function settles.
 */
async function captureScreenshot(url, apiKey, { format = 'webp', timeoutMs = 25000 } = {}) {
  const params = new URLSearchParams({ url, format, full_page: 'true', block_ads: 'true' });

  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);

  try {
    const resp = await fetch(`https://hermesforge.dev/api/screenshot?${params}`, {
      headers: { 'X-API-Key': apiKey },
      signal: controller.signal,
    });

    if (!resp.ok) {
      const body = await resp.text().catch(() => '');
      if (resp.status === 429) {
        // Retry-After is either a number of seconds or an HTTP-date.
        const raw = (resp.headers.get('Retry-After') ?? '').trim();
        let retryAfter = null;
        if (/^\d+(\.\d+)?$/.test(raw)) {
          retryAfter = Number(raw);
        } else if (raw !== '') {
          const when = Date.parse(raw);
          if (!Number.isNaN(when)) retryAfter = Math.max(0, (when - Date.now()) / 1000);
        }
        throw new ScreenshotError(
          ErrorKind.RATE_LIMITED, 429,
          `Rate limited. ${body}`,
          { retryAfter, retriable: true }
        );
      }
      if (resp.status === 401 || resp.status === 403) {
        throw new ScreenshotError(ErrorKind.AUTH_ERROR, resp.status, `Auth error: ${body}`);
      }
      if (resp.status >= 500) {
        throw new ScreenshotError(
          ErrorKind.SERVER_ERROR, resp.status,
          `Server error ${resp.status}: ${body}`,
          { retriable: true }
        );
      }
      throw new ScreenshotError(ErrorKind.CLIENT_ERROR, resp.status, `Client error ${resp.status}: ${body}`);
    }

    return Buffer.from(await resp.arrayBuffer());
  } catch (err) {
    // Already classified above: pass through unchanged.
    if (err instanceof ScreenshotError) throw err;
    if (err && err.name === 'AbortError') {
      throw new ScreenshotError(
        ErrorKind.TIMEOUT, null,
        `Request timed out after ${timeoutMs}ms`,
        { retriable: true }
      );
    }
    // Connection failures, including ones that happen while reading the body.
    throw new ScreenshotError(
      ErrorKind.NETWORK_ERROR, null,
      err && err.message ? err.message : String(err),
      { retriable: true }
    );
  } finally {
    clearTimeout(timer);
  }
}


/**
 * Retry-aware wrapper around captureScreenshot.
 *
 * - Non-retriable ScreenshotErrors and foreign errors are rethrown at once.
 * - Timeouts get exactly one retry, at a doubled timeout (capped at 60s).
 * - 429 waits for Retry-After, unless that exceeds maxDelayMs, in which case
 *   the error is thrown so the caller can reschedule.
 * - Other retriable errors use exponential backoff with jitter.
 *
 * Throws RangeError when maxAttempts is not an integer >= 1.
 */
async function captureWithRetry(url, apiKey, {
  format = 'webp',
  maxAttempts = 3,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
  timeoutMs: initialTimeoutMs = 25000,
} = {}) {
  if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
    // Without this the loop never runs and `undefined` would be thrown.
    throw new RangeError(`maxAttempts must be an integer >= 1, got ${maxAttempts}`);
  }

  let lastError;
  let timeoutMs = initialTimeoutMs;
  let timeoutRetryUsed = false;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await captureScreenshot(url, apiKey, { format, timeoutMs });
    } catch (err) {
      if (!(err instanceof ScreenshotError)) throw err;
      lastError = err;

      if (!err.retriable) throw err;
      if (attempt === maxAttempts - 1) break;

      let delayMs;
      if (err.kind === ErrorKind.TIMEOUT) {
        if (timeoutRetryUsed) break; // one timeout retry only
        timeoutRetryUsed = true;
        // Double up to 60s, but never shrink a caller-supplied timeout.
        timeoutMs = Math.max(timeoutMs, Math.min(timeoutMs * 2, 60000));
        delayMs = 0;
      } else if (err.kind === ErrorKind.RATE_LIMITED && err.retryAfter != null) {
        delayMs = err.retryAfter * 1000;
        // Do not block longer than the caller allowed.
        if (delayMs > maxDelayMs) break;
      } else {
        // Jitter keeps many clients from retrying at the same instant.
        const ceilingMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
        delayMs = Math.round(ceilingMs / 2 + Math.random() * (ceilingMs / 2));
      }

      console.warn(`Screenshot attempt ${attempt + 1}/${maxAttempts} failed (${err.kind}), retrying in ${delayMs}ms`);
      if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs));
    }
  }

  throw lastError;
}

Structured Logging for Observability

Error handlers are only as useful as the data they emit. Structured logs let you query error rates by kind, URL, and time window:

import json
import sys
from datetime import datetime, UTC

def log_screenshot_event(
    event_type: str,  # "capture_ok" | "capture_error" | "retry"
    url: str,
    **extra,
):
    """Emit a structured JSON log line to stdout."""
    record = {
        "ts": datetime.now(UTC).isoformat(),
        "event": event_type,
        "url": url,
        **extra,
    }
    print(json.dumps(record), file=sys.stdout, flush=True)


# Usage in capture_with_retry:
# On success: log_screenshot_event("capture_ok", url, bytes=len(image_bytes), attempt=attempt+1)
# On retry: log_screenshot_event("retry", url, kind=err.kind.value, attempt=attempt+1, delay=delay)
# On final failure: log_screenshot_event("capture_error", url, kind=err.kind.value, attempts=max_attempts)

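Once these JSON lines are ingested by your log platform (CloudWatch, GCP Cloud Logging, Azure Monitor, or any SQL-queryable log store), you can query them. The example below is generic SQL; adapt it to your platform's query language: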

-- Error rate by kind (last 24h)
-- Reads a table named logs with columns ts, event and kind, i.e. the fields
-- of the JSON records emitted by log_screenshot_event.
-- NOTE(review): interval literal syntax varies between SQL dialects —
-- adjust the INTERVAL expression for your engine.
SELECT kind, COUNT(*) as count
FROM logs
WHERE event = 'capture_error'
  AND ts > NOW() - INTERVAL 24 HOURS
GROUP BY kind
ORDER BY count DESC;

Rate Limit Budget Tracking

If you are making many screenshot calls in a workflow, track your rate limit budget explicitly rather than relying on 429 responses:

import threading

class RateLimitBudget:
    """
    Fixed-window daily call counter for proactive rate limit management.

    Counts calls locally and refuses further ones once ``calls_per_day`` is
    reached, so the caller can stop before the API answers 429. The counter
    resets at the next local midnight.
    NOTE(review): confirm the API's quota resets at that time as well.

    Counter updates happen under an internal lock.
    """
    def __init__(self, calls_per_day: int):
        self.calls_per_day = calls_per_day
        self._calls_today = 0
        self._reset_at = self._next_midnight()
        self._lock = threading.Lock()

    @staticmethod
    def _next_midnight() -> float:
        """Return the POSIX timestamp of the next local midnight."""
        # Imported here so the class does not depend on a datetime import
        # made elsewhere in the file.
        from datetime import date, datetime, timedelta
        tomorrow = date.today() + timedelta(days=1)
        return datetime.combine(tomorrow, datetime.min.time()).timestamp()

    def _roll_window(self) -> None:
        """Start a new window if the current one has ended. Caller holds the lock."""
        if time.time() >= self._reset_at:
            self._calls_today = 0
            self._reset_at = self._next_midnight()

    def consume(self) -> bool:
        """Return True if a call is permitted, False if budget exhausted."""
        with self._lock:
            self._roll_window()
            if self._calls_today >= self.calls_per_day:
                return False
            self._calls_today += 1
            return True

    @property
    def remaining(self) -> int:
        """Calls still permitted in the current window."""
        with self._lock:
            # Roll over here too; otherwise the value stays at yesterday's
            # figure until the next consume().
            self._roll_window()
            return max(0, self.calls_per_day - self._calls_today)


# Pro tier: 1000 calls/day
budget = RateLimitBudget(calls_per_day=1000)


def capture_with_budget(url: str, api_key: str) -> bytes:
    """Capture via capture_with_retry if the local daily budget allows it.

    Raises a non-retriable RATE_LIMITED ScreenshotError, without calling the
    API, when the budget is used up.

    NOTE: the budget is consumed once per call to this function, while
    capture_with_retry may send several requests for that one call.
    """
    if budget.consume():
        return capture_with_retry(url, api_key)

    exhausted = ScreenshotError(
        kind=ScreenshotErrorKind.RATE_LIMITED,
        status_code=None,
        message=f"Local rate limit budget exhausted (0/{budget.calls_per_day} remaining today)",
        retriable=False,
    )
    raise exhausted

Summary

Production screenshot API error handling requires: (1) typed error classification — retriable vs non-retriable, with Retry-After respect for 429; (2) exponential backoff with jitter for server/network errors, timeout escalation for timeout errors; (3) circuit breaker to fail fast when the API is degraded; (4) meaningful fallbacks matched to the use case (cached screenshot for monitoring, placeholder for OG images, queue for batch); (5) structured logging with enough context to query error rates and debug failures. The patterns here apply regardless of language — the Python and JavaScript examples share the same underlying logic.

Get a free API key at hermesforge.dev to start building.