Screenshot API Error Handling: Patterns for Production-Grade Integrations
The difference between a demo integration and a production integration is almost always error handling. A demo calls the API, gets a response, moves on. A production integration classifies errors by type, retries the right ones, falls back gracefully on the wrong ones, logs structured data for observability, and never silently drops a failure that a human should know about.
This guide covers the error handling patterns that matter for screenshot API integrations, with runnable examples in Python and JavaScript.
Error Classification
Not all errors are equal. The first decision in any error handler is: is this error retriable?
| Error Type | HTTP Status | Retriable | Action |
|---|---|---|---|
| Rate limit exceeded | 429 | Yes | Backoff + retry, respect Retry-After |
| Server error | 500, 502, 503, 504 | Yes | Exponential backoff |
| Bad request | 400 | No | Fix the request |
| Unauthorized | 401 | No | Check API key |
| Forbidden | 403 | No | Check tier/permissions |
| Not found | 404 | No | Check endpoint URL |
| Timeout | (network) | Sometimes | Retry once with longer timeout |
| Connection error | (network) | Yes | Retry with backoff |
The rule: retry transient errors (rate limits, server errors, network failures). Do not retry client errors (bad request, auth failures). Treat timeouts as conditional — one retry at a higher timeout, then fail.
Python: Structured Error Handling
import urllib.request
import urllib.parse
import urllib.error
import time
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ScreenshotErrorKind(Enum):
    """Coarse failure categories for a single screenshot request.

    Each member's value is a stable snake_case string, suitable for use as
    the ``kind`` field of a structured log record.
    """

    # Categories derived from an HTTP status code.
    RATE_LIMITED = "rate_limited"
    SERVER_ERROR = "server_error"
    CLIENT_ERROR = "client_error"
    AUTH_ERROR = "auth_error"

    # Categories for failures with no HTTP status available.
    NETWORK_ERROR = "network_error"
    TIMEOUT = "timeout"
@dataclass(eq=False)
class ScreenshotError(Exception):
    """Typed failure raised by the screenshot helpers.

    Attributes:
        kind: Failure category.
        status_code: HTTP status of the failed response, or None when no
            response was received.
        message: Human-readable description.
        retry_after: Server-requested delay in seconds, when one was given.
        retriable: True when repeating the request may succeed.

    ``eq=False`` is deliberate: the default dataclass ``__eq__`` sets
    ``__hash__`` to None, which would make the exception unhashable.
    Exceptions are compared by identity, as usual.
    """

    kind: ScreenshotErrorKind
    status_code: Optional[int]
    message: str
    retry_after: Optional[float] = None
    retriable: bool = False

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ never calls Exception.__init__, so
        # ``args`` would stay empty when fields are passed by keyword.
        # Populating it in field order keeps pickling/copying working, since
        # those reconstruct the exception via ``cls(*self.args)``.
        super().__init__(
            self.kind,
            self.status_code,
            self.message,
            self.retry_after,
            self.retriable,
        )

    def __str__(self) -> str:
        return f"ScreenshotError({self.kind.value}, status={self.status_code}): {self.message}"
def _parse_retry_after(raw: Optional[str]) -> Optional[float]:
    """Convert a Retry-After header value into a non-negative delay in seconds."""
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if not raw:
        return None

    try:
        seconds = float(raw)
    except ValueError:
        # Not a number of seconds: the header may carry an HTTP-date instead.
        try:
            when = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return None  # Unparseable: caller falls back to its own backoff.
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        seconds = (when - datetime.now(timezone.utc)).total_seconds()

    # float() accepts "inf" and "nan"; neither is a usable sleep duration.
    if not math.isfinite(seconds):
        return None
    return max(0.0, seconds)


def _classify_http_error(exc: urllib.error.HTTPError) -> ScreenshotError:
    """Map HTTPError to a typed ScreenshotError.

    Args:
        exc: The HTTP error raised by urllib for a non-success response.

    Returns:
        A ScreenshotError (not raised here) whose ``kind`` and ``retriable``
        follow the status code: 429 -> RATE_LIMITED (retriable, with
        ``retry_after`` when the header is usable), 401/403 -> AUTH_ERROR,
        >= 500 -> SERVER_ERROR (retriable), anything else -> CLIENT_ERROR.
    """
    status = exc.code
    try:
        body = exc.read().decode("utf-8", errors="replace")
    except Exception:
        # Best effort only: the body is diagnostic text, never required.
        body = "(unreadable)"

    if status == 429:
        headers = exc.headers
        raw = headers.get("Retry-After") if headers is not None else None
        return ScreenshotError(
            kind=ScreenshotErrorKind.RATE_LIMITED,
            status_code=429,
            message=f"Rate limited. {body}",
            retry_after=_parse_retry_after(raw),
            retriable=True,
        )

    if status in (401, 403):
        return ScreenshotError(
            kind=ScreenshotErrorKind.AUTH_ERROR,
            status_code=status,
            message=f"Auth error: {body}",
            retriable=False,
        )

    if status >= 500:
        return ScreenshotError(
            kind=ScreenshotErrorKind.SERVER_ERROR,
            status_code=status,
            message=f"Server error {status}: {body}",
            retriable=True,
        )

    # 4xx client errors
    return ScreenshotError(
        kind=ScreenshotErrorKind.CLIENT_ERROR,
        status_code=status,
        message=f"Client error {status}: {body}",
        retriable=False,
    )
def capture_screenshot(
    url: str,
    api_key: str,
    fmt: str = "webp",
    timeout: float = 25.0,
) -> bytes:
    """
    Capture a screenshot. Raises ScreenshotError on failure.
    Callers decide whether to retry — this function does not.

    Args:
        url: Page to capture.
        api_key: Value sent in the X-API-Key header.
        fmt: Image format requested from the API.
        timeout: Socket timeout in seconds passed to urlopen.

    Returns:
        The raw response body.

    Raises:
        ScreenshotError: For HTTP error statuses (classified by
            _classify_http_error), timeouts (TIMEOUT), and every other
            transport-level failure (NETWORK_ERROR).
    """
    # Local import: only needed for the HTTPException catch below.
    import http.client

    def timed_out() -> ScreenshotError:
        return ScreenshotError(
            kind=ScreenshotErrorKind.TIMEOUT,
            status_code=None,
            message=f"Request timed out after {timeout}s",
            retriable=True,  # Retry once at higher timeout
        )

    params = urllib.parse.urlencode({
        "url": url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
    })
    req = urllib.request.Request(
        f"https://hermesforge.dev/api/screenshot?{params}",
        headers={"X-API-Key": api_key},
    )

    # Handler order matters: HTTPError is a URLError, and both URLError and
    # TimeoutError are OSError subclasses, so the most specific come first.
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read()
    except urllib.error.HTTPError as e:
        raise _classify_http_error(e) from e
    except TimeoutError as e:
        # Raised directly when the timeout hits while reading the response.
        raise timed_out() from e
    except urllib.error.URLError as e:
        # A timeout while connecting is wrapped by urllib as the URLError's
        # reason; classify it as TIMEOUT so callers can escalate the timeout.
        if isinstance(e.reason, TimeoutError):
            raise timed_out() from e
        raise ScreenshotError(
            kind=ScreenshotErrorKind.NETWORK_ERROR,
            status_code=None,
            message=str(e.reason),
            retriable=True,
        ) from e
    except (OSError, http.client.HTTPException) as e:
        # Connection resets, truncated reads and similar failures are not
        # wrapped in URLError; convert them so only ScreenshotError escapes.
        raise ScreenshotError(
            kind=ScreenshotErrorKind.NETWORK_ERROR,
            status_code=None,
            message=str(e) or type(e).__name__,
            retriable=True,
        ) from e
Python: Retry with Backoff
def capture_with_retry(
    url: str,
    api_key: str,
    fmt: str = "webp",
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> bytes:
    """
    Retry-aware wrapper around capture_screenshot.
    Respects Retry-After on 429, exponential backoff for server/network errors.
    Does NOT retry client errors or auth failures.
    A timeout is retried at most once, immediately, with a doubled timeout
    (capped at 60s); a second timeout fails the call.

    Args:
        url: Page to capture.
        api_key: API key forwarded to capture_screenshot.
        fmt: Image format forwarded to capture_screenshot.
        max_attempts: Upper bound on requests made; must be >= 1.
        base_delay: First backoff delay in seconds; doubles per attempt.
        max_delay: Cap on the exponential backoff delay in seconds.

    Returns:
        The image bytes from the first successful attempt.

    Raises:
        ValueError: If max_attempts is less than 1.
        ScreenshotError: The last error seen, once it is non-retriable,
            attempts are exhausted, or a second timeout occurs.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    timeout = 25.0
    timeout_retried = False

    for attempt in range(max_attempts):
        try:
            return capture_screenshot(url, api_key, fmt, timeout=timeout)
        except ScreenshotError as err:
            if not err.retriable:
                logger.error(
                    "Non-retriable error",
                    extra={"kind": err.kind.value, "status": err.status_code, "url": url},
                )
                raise

            is_timeout = err.kind == ScreenshotErrorKind.TIMEOUT
            out_of_attempts = attempt == max_attempts - 1
            # Timeouts get exactly one escalated retry, whatever budget remains.
            if out_of_attempts or (is_timeout and timeout_retried):
                logger.error(
                    "All attempts exhausted",
                    extra={"kind": err.kind.value, "attempts": attempt + 1, "url": url},
                )
                raise

            # Calculate delay
            if err.kind == ScreenshotErrorKind.RATE_LIMITED and err.retry_after is not None:
                delay = err.retry_after
            elif is_timeout:
                # One retry at 2x timeout
                timeout_retried = True
                timeout = min(timeout * 2, 60.0)
                delay = 0
            else:
                delay = min(base_delay * (2 ** attempt), max_delay)

            logger.warning(
                "Retriable error, backing off",
                extra={
                    "kind": err.kind.value,
                    "attempt": attempt + 1,
                    "delay": delay,
                    "url": url,
                },
            )
            if delay > 0:
                time.sleep(delay)

    # Every iteration either returns or raises on its final pass.
    raise RuntimeError("unreachable: retry loop ended without a result")
Python: Circuit Breaker
For high-volume workflows where the screenshot API may become temporarily unavailable, a circuit breaker prevents cascading failures by failing fast rather than queuing up retries:
import threading
from datetime import datetime, UTC
class CircuitBreaker:
    """
    Three-state circuit breaker: CLOSED (normal) → OPEN (failing fast) → HALF_OPEN (testing).
    State and counters are guarded by a lock.

    Args:
        failure_threshold: Failure count at which a CLOSED breaker opens.
        recovery_timeout: Seconds an OPEN breaker waits before moving to
            HALF_OPEN.
        success_threshold: Successes required in HALF_OPEN to close again.
    """

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._state = self.CLOSED
        self._failure_count = 0
        self._success_count = 0
        # Monotonic timestamp of the last transition to OPEN. A monotonic
        # clock is used because only elapsed time matters, and wall-clock
        # adjustments must not shorten or extend the recovery window.
        self._opened_at: Optional[float] = None
        self._lock = threading.Lock()

    @property
    def state(self) -> str:
        """Current state; an OPEN breaker past its recovery timeout becomes HALF_OPEN."""
        with self._lock:
            if (
                self._state == self.OPEN
                and self._opened_at is not None
                and time.monotonic() - self._opened_at >= self.recovery_timeout
            ):
                self._state = self.HALF_OPEN
                self._success_count = 0
            return self._state

    def record_success(self) -> None:
        """Count a success: closes a HALF_OPEN breaker once enough accumulate."""
        with self._lock:
            if self._state == self.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = self.CLOSED
                    self._failure_count = 0
            elif self._state == self.CLOSED:
                # Each success forgives one earlier failure.
                self._failure_count = max(0, self._failure_count - 1)

    def record_failure(self) -> None:
        """Count a failure: opens at the threshold, or on any failure while HALF_OPEN."""
        with self._lock:
            self._failure_count += 1
            if self._failure_count >= self.failure_threshold or self._state == self.HALF_OPEN:
                self._state = self.OPEN
                self._opened_at = time.monotonic()

    def __call__(self, func):
        """Decorator usage: @circuit_breaker

        The wrapper raises a non-retriable ScreenshotError without calling
        ``func`` while the breaker is OPEN. Otherwise it calls ``func``,
        records success, and records a failure only for retriable
        ScreenshotErrors, which are then re-raised.
        """
        import functools

        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            if self.state == self.OPEN:
                raise ScreenshotError(
                    kind=ScreenshotErrorKind.SERVER_ERROR,
                    status_code=None,
                    message="Circuit breaker OPEN — screenshot API unavailable",
                    retriable=False,
                )
            try:
                result = func(*args, **kwargs)
                self.record_success()
                return result
            except ScreenshotError as err:
                if err.retriable:
                    self.record_failure()
                raise

        return wrapper
# Usage: a single module-level breaker shared by every protected capture.
screenshot_breaker = CircuitBreaker(recovery_timeout=60.0, failure_threshold=5)


@screenshot_breaker
def capture_protected(url: str, api_key: str) -> bytes:
    """Run capture_with_retry behind the shared screenshot_breaker."""
    return capture_with_retry(url=url, api_key=api_key)
Python: Fallback Strategies
When the screenshot API is unavailable, choose an appropriate fallback based on your use case:
from typing import Callable
import hashlib
def capture_with_fallback(
    url: str,
    api_key: str,
    fallback: Callable[[str], bytes | None] | None = None,
) -> bytes | None:
    """
    Attempt capture; run fallback on failure.

    Args:
        url: Page to capture.
        api_key: API key forwarded to capture_protected.
        fallback: Optional callable invoked with ``url`` when capture raises
            ScreenshotError.

    Returns:
        The captured bytes, otherwise whatever the fallback returns. None
        when capture fails and there is no fallback, or the fallback itself
        returns None.

    Exceptions other than ScreenshotError, and any exception raised by the
    fallback, propagate to the caller.
    """
    try:
        return capture_protected(url, api_key)
    except ScreenshotError as err:
        logger.warning(
            "Screenshot capture failed, using fallback",
            extra={"kind": err.kind.value, "url": url},
        )

    # Explicit None check: a callable object may legitimately be falsy.
    if fallback is not None:
        return fallback(url)
    return None
def cached_screenshot_fallback(cache_dir: str) -> Callable[[str], bytes | None]:
"""
Return a fallback that serves the last cached screenshot for a URL.
Useful for monitoring dashboards — stale data is better than no data.
"""
import os
def fallback(url: str) -> bytes | None:
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
cache_path = os.path.join(cache_dir, f"{url_hash}.webp")
if os.path.exists(cache_path):
logger.info(f"Serving cached screenshot for {url}")
with open(cache_path, "rb") as f:
return f.read()
return None
return fallback
def placeholder_fallback(width: int = 1280, height: int = 800) -> Callable[[str], bytes]:
    """
    Build a fallback that renders a minimal placeholder image.

    Intended for OG image pipelines, where a plain grey image is preferable
    to a broken tag. Requires Pillow, which is imported when this factory is
    called.

    Args:
        width: Placeholder width in pixels.
        height: Placeholder height in pixels.

    Returns:
        A callable that ignores its URL argument and returns WEBP bytes.
    """
    import io
    from PIL import Image, ImageDraw

    size = (width, height)
    centre = (width // 2, height // 2)
    frame = [(0, 0), (width - 1, height - 1)]

    def fallback(url: str) -> bytes:
        # Light grey canvas with a slightly darker border.
        canvas = Image.new("RGB", size, color=(245, 245, 245))
        pen = ImageDraw.Draw(canvas)
        pen.rectangle(frame, outline=(200, 200, 200), width=2)
        # "mm" anchors the text by its middle at the centre point.
        pen.text(centre, "Preview unavailable", fill=(150, 150, 150), anchor="mm")

        encoded = io.BytesIO()
        canvas.save(encoded, format="WEBP")
        return encoded.getvalue()

    return fallback
JavaScript: Typed Error Handling
/**
 * Typed failure thrown by the screenshot helpers.
 *
 * @param {string} kind - Failure category.
 * @param {?number} statusCode - HTTP status, or null when there was no response.
 * @param {string} message - Human-readable description.
 * @param {{retryAfter?: ?number, retriable?: boolean}} [options]
 *   retryAfter: server-requested delay in seconds (default null);
 *   retriable: whether repeating the request may succeed (default false).
 */
class ScreenshotError extends Error {
  constructor(kind, statusCode, message, options = {}) {
    const { retryAfter = null, retriable = false } = options;
    super(message);
    Object.assign(this, {
      name: 'ScreenshotError',
      kind,
      statusCode,
      retryAfter,
      retriable,
    });
  }
}
/**
 * Failure categories used as ScreenshotError `kind` values.
 * Frozen so a stray assignment cannot silently alter a category string
 * that other code compares against.
 */
const ErrorKind = Object.freeze({
  RATE_LIMITED: 'rate_limited',
  SERVER_ERROR: 'server_error',
  CLIENT_ERROR: 'client_error',
  AUTH_ERROR: 'auth_error',
  NETWORK_ERROR: 'network_error',
  TIMEOUT: 'timeout',
});
/**
 * Capture a screenshot with a single request; callers decide whether to retry.
 *
 * The abort timer stays armed until the response body has been fully read,
 * so a stalled body download is reported as a timeout instead of hanging.
 *
 * @param {string} url - Page to capture.
 * @param {string} apiKey - Value sent in the X-API-Key header.
 * @param {{format?: string, timeoutMs?: number}} [options]
 * @returns {Promise<Buffer>} The image bytes.
 * @throws {ScreenshotError} For HTTP error statuses, timeouts, and network failures.
 */
async function captureScreenshot(url, apiKey, { format = 'webp', timeoutMs = 25000 } = {}) {
  const params = new URLSearchParams({ url, format, full_page: 'true', block_ads: 'true' });
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);

  try {
    const resp = await fetch(`https://hermesforge.dev/api/screenshot?${params}`, {
      headers: { 'X-API-Key': apiKey },
      signal: controller.signal,
    });

    if (resp.ok) {
      return Buffer.from(await resp.arrayBuffer());
    }

    // The error body is diagnostic only; an unreadable body becomes ''.
    const body = await resp.text().catch(() => '');

    if (resp.status === 429) {
      const retryAfter = parseFloat(resp.headers.get('Retry-After') ?? '0') || null;
      throw new ScreenshotError(
        ErrorKind.RATE_LIMITED, 429,
        `Rate limited. ${body}`,
        { retryAfter, retriable: true }
      );
    }
    if (resp.status === 401 || resp.status === 403) {
      throw new ScreenshotError(ErrorKind.AUTH_ERROR, resp.status, `Auth error: ${body}`);
    }
    if (resp.status >= 500) {
      throw new ScreenshotError(
        ErrorKind.SERVER_ERROR, resp.status,
        `Server error ${resp.status}: ${body}`,
        { retriable: true }
      );
    }
    throw new ScreenshotError(ErrorKind.CLIENT_ERROR, resp.status, `Client error ${resp.status}: ${body}`);
  } catch (err) {
    // Errors classified above pass through untouched.
    if (err instanceof ScreenshotError) throw err;

    if (err?.name === 'AbortError') {
      throw new ScreenshotError(
        ErrorKind.TIMEOUT, null,
        `Request timed out after ${timeoutMs}ms`,
        { retriable: true }
      );
    }
    // Anything else from fetch or the body read is a transport failure.
    throw new ScreenshotError(
      ErrorKind.NETWORK_ERROR, null,
      err?.message ?? String(err),
      { retriable: true }
    );
  } finally {
    clearTimeout(timer);
  }
}
/**
 * Retry-aware wrapper around captureScreenshot.
 *
 * Honors Retry-After on rate limits, uses exponential backoff for other
 * retriable errors, and never retries non-retriable errors. A timeout is
 * retried at most once, immediately, with a doubled timeout (capped at
 * 60s); a second timeout fails the call.
 *
 * @param {string} url - Page to capture.
 * @param {string} apiKey - API key forwarded to captureScreenshot.
 * @param {{format?: string, maxAttempts?: number, baseDelayMs?: number, maxDelayMs?: number}} [options]
 * @returns {Promise<Buffer>} The image bytes from the first successful attempt.
 * @throws {RangeError} If maxAttempts is not at least 1.
 * @throws {ScreenshotError} The last error once retrying stops.
 */
async function captureWithRetry(url, apiKey, {
  format = 'webp',
  maxAttempts = 3,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
} = {}) {
  // Written as a negated >= so that NaN is rejected too.
  if (!(maxAttempts >= 1)) {
    throw new RangeError('maxAttempts must be at least 1');
  }

  let timeoutMs = 25000;
  let timeoutRetried = false;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await captureScreenshot(url, apiKey, { format, timeoutMs });
    } catch (err) {
      if (!(err instanceof ScreenshotError)) throw err;
      if (!err.retriable) throw err;

      const isTimeout = err.kind === ErrorKind.TIMEOUT;
      const outOfAttempts = attempt + 1 >= maxAttempts;
      // Timeouts get exactly one escalated retry, whatever budget remains.
      if (outOfAttempts || (isTimeout && timeoutRetried)) throw err;

      let delayMs;
      if (err.kind === ErrorKind.RATE_LIMITED && err.retryAfter != null) {
        delayMs = err.retryAfter * 1000;
      } else if (isTimeout) {
        timeoutRetried = true;
        timeoutMs = Math.min(timeoutMs * 2, 60000);
        delayMs = 0;
      } else {
        delayMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
      }

      console.warn(`Screenshot attempt ${attempt + 1}/${maxAttempts} failed (${err.kind}), retrying in ${delayMs}ms`);
      if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs));
    }
  }

  // Every iteration either returns or throws on its final pass.
  throw new Error('unreachable: retry loop ended without a result');
}
Structured Logging for Observability
Error handlers are only as useful as the data they emit. Structured logs let you query error rates by kind, URL, and time window:
import json
import sys
from datetime import datetime, UTC
def log_screenshot_event(
event_type: str, # "capture_ok" | "capture_error" | "retry"
url: str,
**extra,
):
"""Emit a structured JSON log line to stdout."""
record = {
"ts": datetime.now(UTC).isoformat(),
"event": event_type,
"url": url,
**extra,
}
print(json.dumps(record), file=sys.stdout, flush=True)
# Usage in capture_with_retry:
# On success: log_screenshot_event("capture_ok", url, bytes=len(image_bytes), attempt=attempt+1)
# On retry: log_screenshot_event("retry", url, kind=err.kind.value, attempt=attempt+1, delay=delay)
# On final failure: log_screenshot_event("capture_error", url, kind=err.kind.value, attempts=max_attempts)
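Once these logs are ingested into CloudWatch, GCP Cloud Logging, or Azure Monitor, you can query them. Each platform has its own query language, so treat the SQL below as illustrative: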
-- Error rate by kind (last 24h): counts 'capture_error' events per kind,
-- most frequent first. The table name and interval syntax are illustrative;
-- adjust them to your log store's query dialect.
SELECT kind, COUNT(*) as count
FROM logs
WHERE event = 'capture_error'
AND ts > NOW() - INTERVAL 24 HOURS
GROUP BY kind
ORDER BY count DESC;
Rate Limit Budget Tracking
If you are making many screenshot calls in a workflow, track your rate limit budget explicitly rather than relying on 429 responses:
import threading
class RateLimitBudget:
    """
    Fixed-window daily call counter for proactive rate limit management.
    Helps avoid 429 responses by tracking usage locally.

    The counter resets at the next local midnight (computed from
    ``date.today()``). NOTE(review): confirm this matches when the API's own
    daily quota resets.

    Args:
        calls_per_day: Number of calls permitted per window.
    """

    def __init__(self, calls_per_day: int):
        self.calls_per_day = calls_per_day
        self._calls_today = 0
        self._reset_at = self._next_midnight()
        self._lock = threading.Lock()

    @staticmethod
    def _next_midnight() -> float:
        """Return the POSIX timestamp of the next local midnight."""
        from datetime import date, datetime, timedelta

        tomorrow = date.today() + timedelta(days=1)
        return datetime.combine(tomorrow, datetime.min.time()).timestamp()

    def _roll_window_locked(self) -> None:
        """Start a fresh window if the reset time has passed. Caller holds the lock."""
        if time.time() >= self._reset_at:
            self._calls_today = 0
            self._reset_at = self._next_midnight()

    def consume(self) -> bool:
        """Return True if a call is permitted, False if budget exhausted."""
        with self._lock:
            self._roll_window_locked()
            if self._calls_today >= self.calls_per_day:
                return False
            self._calls_today += 1
            return True

    @property
    def remaining(self) -> int:
        """Calls left in the current window, never negative."""
        with self._lock:
            # Roll the window here too; otherwise the previous day's
            # exhausted count would be reported until the next consume().
            self._roll_window_locked()
            return max(0, self.calls_per_day - self._calls_today)
# Pro tier: 1000 calls/day
budget = RateLimitBudget(calls_per_day=1000)


def capture_with_budget(url: str, api_key: str) -> bytes:
    """Capture via capture_with_retry if the local daily budget allows it.

    One unit of the module-level ``budget`` is consumed per call to this
    function, before capture_with_retry is invoked.
    NOTE(review): capture_with_retry may issue several requests for that one
    unit — confirm whether retries count against the API quota.

    Raises:
        ScreenshotError: RATE_LIMITED and non-retriable when the local
            budget is exhausted; otherwise whatever capture_with_retry raises.
    """
    if budget.consume():
        return capture_with_retry(url, api_key)

    raise ScreenshotError(
        kind=ScreenshotErrorKind.RATE_LIMITED,
        status_code=None,
        message=f"Local rate limit budget exhausted (0/{budget.calls_per_day} remaining today)",
        retriable=False,
    )
Summary
Production screenshot API error handling requires: (1) typed error classification — retriable vs non-retriable, honoring Retry-After on 429; (2) exponential backoff for server/network errors (add random jitter when many clients may retry at the same moment — the examples above omit it for brevity) and timeout escalation for timeout errors; (3) a circuit breaker to fail fast when the API is degraded; (4) meaningful fallbacks matched to the use case (cached screenshot for monitoring, placeholder for OG images, queue for batch); (5) structured logging with enough context to query error rates and debug failures. The patterns here apply regardless of language — the Python and JavaScript examples share the same underlying logic.
Get a free API key at hermesforge.dev to start building.