Caching Strategies for Screenshot APIs: Reduce API Calls by 80%

2026-04-16 | Tags: [tutorial, screenshot-api, caching, performance, python, redis]

If you're making screenshot API calls in production, a significant fraction of them are probably redundant. The same URL requested multiple times in a day. The same product page captured for two different users within minutes. The same social preview generated for a link that hasn't changed in weeks.

A well-designed cache layer can eliminate 60-80% of API calls while improving response latency for cached hits. The engineering is not complex — the key decisions are cache key design, TTL strategy, and storage backend selection.

Cache Key Design

The cache key must uniquely identify a screenshot. Two calls should produce the same cache hit if and only if they would produce an identical screenshot.

What affects the output:

- URL (including query parameters, if they affect page content)
- Viewport dimensions (width, height)
- Full page vs. above-fold
- Output format (PNG vs. WebP)
- Device pixel ratio
- Custom CSS injection
- Wait strategy

What doesn't affect the output:

- Timestamp (unless passed in the URL)
- Requester identity
- Request ID

import hashlib
import json
from typing import Any


def make_cache_key(url: str, params: dict[str, Any]) -> str:
    """
    Generate a stable cache key from URL and screenshot parameters.

    Only parameters that affect the rendered image are included, so two
    requests share a key exactly when they would produce the same
    screenshot. Requester identity, request IDs, and timestamps are
    deliberately excluded.

    Args:
        url: Target page URL; scheme and host are normalized to lowercase
            (path and query are left untouched — they may be case-sensitive).
        params: Screenshot options; keys not listed below are ignored.

    Returns:
        A 32-hex-character key (truncated SHA-256 of the normalized inputs).
    """
    # Normalize URL: lowercase scheme and host (both are case-insensitive per RFC 3986)
    from urllib.parse import urlparse, urlunparse
    parsed = urlparse(url)
    normalized_url = urlunparse(
        parsed._replace(scheme=parsed.scheme.lower(), netloc=parsed.netloc.lower())
    )

    # Only cache-relevant parameters
    cache_params = {
        "url": normalized_url,
        "width": params.get("width", 1280),
        "height": params.get("height"),
        "full_page": params.get("full_page", True),
        "format": params.get("format", "png"),
        "device_scale": params.get("device_scale", 1),
        "inject_css": params.get("inject_css"),
        "block_ads": params.get("block_ads", False),
        # Wait strategy changes WHEN the page is captured, hence the pixels:
        # "load" vs. "networkidle" can render very different content.
        "wait": params.get("wait", "networkidle"),
    }

    # Remove None values for consistency
    cache_params = {k: v for k, v in cache_params.items() if v is not None}

    # Stable JSON serialization → hash
    serialized = json.dumps(cache_params, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:32]

TTL Strategy

Different pages have different change frequencies. A hardcoded TTL misses most of the opportunity.

from urllib.parse import urlparse


def get_ttl_seconds(url: str, page_hint: str = "default") -> int:
    """
    Determine appropriate cache TTL based on URL patterns and content type.

    Args:
        url: Page URL; its path and host are matched against keyword rules.
        page_hint: Optional explicit category label ("dashboard", "pricing",
            "product", "documentation", "static"). A known label overrides
            the URL heuristics; "default" (or any unknown value) falls back
            to pattern matching.

    Returns:
        TTL in seconds. Tune these ranges based on your actual use case:
        - Social previews: content rarely changes once published
        - Dashboards: update frequently, short TTL
        - Product pages: moderate change frequency
        - Documentation: slow-changing, long TTL
    """
    TTL_RULES = [
        # (patterns, ttl_seconds, label)
        # Very short: dashboards, real-time monitoring
        (["grafana", "metabase", "tableau", "dashboard", "analytics"], 300, "dashboard"),
        # Short: pricing, frequently updated pages
        (["pricing", "checkout", "cart", "stock", "inventory"], 900, "pricing"),
        # Medium: product pages, landing pages
        (["product", "item", "listing"], 3600, "product"),
        # Long: blog posts, documentation
        (["blog", "docs", "documentation", "article", "post"], 86400, "documentation"),
        # Very long: static assets, archived content
        (["archive", "static", "about"], 604800, "static"),
    ]

    # An explicit caller-supplied category wins over URL guessing.
    # (Previously `page_hint` was accepted but never consulted.)
    for _, ttl, label in TTL_RULES:
        if page_hint == label:
            return ttl

    parsed = urlparse(url)
    path = parsed.path.lower()

    for patterns, ttl, _ in TTL_RULES:
        if any(p in path or p in parsed.netloc for p in patterns):
            return ttl

    # Default: 1 hour
    return 3600

Local Disk Cache

Simplest implementation. Good for single-server deployments and development:

import os
import time
from pathlib import Path
import requests

SCREENSHOT_API_KEY = "your-api-key"
SCREENSHOT_API_URL = "https://hermesforge.dev/api/screenshot"


class DiskCache:
    def __init__(self, cache_dir: str = "./screenshot_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _paths(self, key: str) -> tuple[Path, Path]:
        """Return (image_path, meta_path) for a cache key."""
        return (
            self.cache_dir / f"{key}.png",
            self.cache_dir / f"{key}.meta",
        )

    def get(self, key: str) -> bytes | None:
        """Return cached image bytes, or None if expired/missing."""
        img_path, meta_path = self._paths(key)
        if not img_path.exists() or not meta_path.exists():
            return None

        meta = json.loads(meta_path.read_text())
        if time.time() > meta["expires_at"]:
            img_path.unlink(missing_ok=True)
            meta_path.unlink(missing_ok=True)
            return None

        return img_path.read_bytes()

    def set(self, key: str, data: bytes, ttl: int):
        """Store image bytes with TTL."""
        img_path, meta_path = self._paths(key)
        img_path.write_bytes(data)
        meta_path.write_text(json.dumps({
            "cached_at": time.time(),
            "expires_at": time.time() + ttl,
            "ttl": ttl,
            "size_bytes": len(data),
        }))

    def invalidate(self, key: str):
        """Remove a specific cache entry."""
        for path in self._paths(key):
            path.unlink(missing_ok=True)

    def purge_expired(self):
        """Remove all expired cache entries. Run periodically."""
        now = time.time()
        removed = 0
        for meta_path in self.cache_dir.glob("*.meta"):
            try:
                meta = json.loads(meta_path.read_text())
                if now > meta["expires_at"]:
                    meta_path.unlink(missing_ok=True)
                    img_path = meta_path.with_suffix(".png")
                    img_path.unlink(missing_ok=True)
                    removed += 1
            except Exception:
                pass
        return removed

Redis Cache

Better for multi-server deployments and when you want automatic expiration:

import redis


class RedisCache:
    """
    Redis-backed screenshot cache for multi-server deployments.

    Redis expires keys natively via SETEX, so there is no purge step.
    All keys are namespaced under ``key_prefix``.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379/0", key_prefix: str = "screenshot:"):
        self.client = redis.from_url(redis_url)
        self.prefix = key_prefix

    def _key(self, cache_key: str) -> str:
        """Apply the namespace prefix to a cache key."""
        return f"{self.prefix}{cache_key}"

    def get(self, key: str) -> bytes | None:
        """Return cached image bytes, or None if missing/expired."""
        data = self.client.get(self._key(key))
        return data  # Redis returns None if key doesn't exist or has expired

    def set(self, key: str, data: bytes, ttl: int):
        """Store image bytes with TTL. Redis handles expiration automatically."""
        self.client.setex(self._key(key), ttl, data)

    def invalidate(self, key: str):
        """Remove a single cache entry."""
        self.client.delete(self._key(key))

    def invalidate_pattern(self, url_prefix: str):
        """
        Clear the ENTIRE screenshot namespace.

        Cache keys are SHA-256 hashes, so matching on `url_prefix` is not
        possible without a secondary URL→key index; until one exists this
        method ignores `url_prefix` and deletes every key under the prefix.
        Useful as a blunt cache bust after a deployment.
        Expensive on large keyspaces — use sparingly.

        Returns the number of keys deleted.
        """
        pattern = f"{self.prefix}*"
        cursor = 0
        deleted = 0
        while True:
            # SCAN iterates incrementally without blocking Redis like KEYS would.
            cursor, keys = self.client.scan(cursor, match=pattern, count=100)
            if keys:
                # Batch: one DEL round-trip per SCAN page instead of per key.
                deleted += self.client.delete(*keys)
            if cursor == 0:
                break
        return deleted

The Caching Screenshot Client

Wrap the API call with cache lookup and storage:

from typing import Optional
import logging

logger = logging.getLogger(__name__)


class CachedScreenshotClient:
    """
    Screenshot API client with a read-through cache.

    Cache keys come from ``make_cache_key()``; TTLs from
    ``get_ttl_seconds()`` unless overridden per call. Cache backend
    failures (disk full, Redis down) are logged and degraded to cache
    misses so a broken cache never fails a screenshot request.
    """

    def __init__(
        self,
        api_key: str,
        cache: DiskCache | RedisCache,
        api_url: str = "https://hermesforge.dev/api/screenshot",
    ):
        self.api_key = api_key
        self.cache = cache
        self.api_url = api_url
        # Running hit/miss counters for stats()/cache_hit_rate.
        self._hits = 0
        self._misses = 0

    def screenshot(
        self,
        url: str,
        format: str = "png",
        width: int = 1280,
        full_page: bool = True,
        wait: str = "networkidle",
        block_ads: bool = True,
        inject_css: Optional[str] = None,
        ttl: Optional[int] = None,
        force_refresh: bool = False,
    ) -> bytes:
        """
        Capture a screenshot, serving from cache when available.

        force_refresh=True: bypass cache, always capture fresh screenshot.
        ttl=None: auto-detect TTL from URL pattern.

        Raises:
            requests.HTTPError: when the API responds with a 4xx/5xx status.
        """
        params = {
            "url": url,
            "format": format,
            "width": width,
            "full_page": full_page,
            "wait": wait,
            "block_ads": block_ads,
            "inject_css": inject_css,
        }

        cache_key = make_cache_key(url, params)
        effective_ttl = ttl if ttl is not None else get_ttl_seconds(url)

        # Cache lookup — a failing backend is treated as a miss, not an error.
        if not force_refresh:
            try:
                cached = self.cache.get(cache_key)
            except Exception:
                logger.warning("Cache get failed for %s; treating as miss", url, exc_info=True)
                cached = None
            if cached is not None:
                self._hits += 1
                logger.debug("Cache HIT: %s (key=%s...)", url, cache_key[:8])
                return cached

        # Cache miss — call API
        self._misses += 1
        logger.debug("Cache MISS: %s — calling API", url)

        # Booleans become "true"/"false" query-string values; None params are dropped.
        api_params = {k: str(v).lower() if isinstance(v, bool) else v
                      for k, v in params.items() if v is not None}

        response = requests.get(
            self.api_url,
            params=api_params,
            headers={"X-API-Key": self.api_key},
            timeout=45,
        )
        response.raise_for_status()

        image_data = response.content
        # A failed cache write must not discard a successfully captured image.
        try:
            self.cache.set(cache_key, image_data, effective_ttl)
            logger.debug("Cached: %s TTL=%ss", url, effective_ttl)
        except Exception:
            logger.warning("Cache set failed for %s", url, exc_info=True)

        return image_data

    @property
    def cache_hit_rate(self) -> float:
        """Fraction of requests served from cache (0.0 when no requests yet)."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

    def stats(self) -> dict:
        """Return hit/miss counters and the hit rate as a percentage."""
        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate_pct": round(self.cache_hit_rate * 100, 1),
        }

Cache Invalidation on Deploy

When you deploy a new version of your app, cached screenshots of the old UI are stale. Trigger cache invalidation as part of your deploy:

def invalidate_on_deploy(base_url: str, paths: list[str], cache: RedisCache):
    """
    Invalidate cached screenshots for specific paths after a deployment.
    Call from your CI/CD pipeline after successful deploy.

    Keys are recomputed for the common viewport configurations AND both
    block_ads values: CachedScreenshotClient.screenshot defaults
    block_ads=True while make_cache_key defaults it to False, so only
    covering both values makes the recomputed keys match what the client
    actually stored. (Previously the keys never matched and this was a
    silent no-op.)
    """
    for path in paths:
        url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
        # Compute the cache key for common viewport configurations
        for width in [1280, 375, 768]:  # desktop, mobile, tablet
            for full_page in [True, False]:
                for block_ads in [True, False]:
                    key = make_cache_key(url, {
                        "width": width,
                        "full_page": full_page,
                        "block_ads": block_ads,
                    })
                    cache.invalidate(key)
                logger.info(f"Invalidated: {url} (w={width}, full_page={full_page})")


# In your deploy script:
# invalidate_on_deploy(
#     "https://your-app.com",
#     ["/pricing", "/features", "/dashboard"],
#     redis_cache
# )

Measuring Cache Effectiveness

Track hit rates to validate your TTL strategy:

import time

def benchmark_cache_effectiveness(
    urls: list[str],
    client: CachedScreenshotClient,
    repeat: int = 3,
) -> dict:
    """
    Measure cache hit rate and latency over a set of URLs.

    Each URL is captured `repeat` times: the first attempt is expected
    to miss the cache, every subsequent attempt to hit it.
    """
    samples = []

    for url in urls:
        for attempt in range(1, repeat + 1):
            t0 = time.perf_counter()
            client.screenshot(url, width=1280)
            samples.append({
                "url": url,
                "attempt": attempt,
                "latency_ms": round((time.perf_counter() - t0) * 1000, 1),
                "expected": "hit" if attempt > 1 else "miss",
            })

    def _mean_latency(kind: str) -> float:
        # Average latency across samples of one kind; guards empty slices.
        latencies = [s["latency_ms"] for s in samples if s["expected"] == kind]
        return sum(latencies) / max(len(latencies), 1)

    hit_avg = _mean_latency("hit")
    miss_avg = _mean_latency("miss")
    counters = client.stats()

    return {
        "hit_rate_pct": counters["hit_rate_pct"],
        "api_calls_made": counters["misses"],
        "calls_saved": counters["hits"],
        "avg_hit_latency_ms": round(hit_avg, 1),
        "avg_miss_latency_ms": round(miss_avg, 1),
        "speedup_factor": round(miss_avg / hit_avg, 1) if hit_avg > 0 else None,
    }

Practical Impact

For a typical production workload where the same pages are captured multiple times per day:

| Use Case | Without Cache | With Cache (1h TTL) | Savings |
| --- | --- | --- | --- |
| Social preview service (1000 link shares/day, 200 unique URLs) | 1000 calls/day | ~200 calls/day | 80% |
| CI/CD visual tests (50 PRs/day, 20 pages each, 3 test runs) | 3000 calls/day | ~350 calls/day | 88% |
| Competitor monitoring (10 pages, checked 24× daily) | 240 calls/day | 10 calls/day | 96% |
| Dashboard reports (5 dashboards, 8 email recipients) | 40 calls/day | 5 calls/day | 87% |

At these savings rates, a Starter tier (200/day) can support workloads that would otherwise require the Business tier (5000/day).


hermesforge.dev — screenshot API. Free: 10/day. Starter: $4/30 days (200/day). Pro: $9 (1000/day). Business: $29 (5000/day).