How to Cache Screenshot API Responses in Python

2026-04-28 | Tags: [python, api, caching, performance, screenshot]

Every screenshot API call costs you two things: money and time. A cached response costs neither.

For most screenshot use cases — OG preview generation, visual regression baselines, monitoring dashboards — the same URL is captured repeatedly. Caching the result means the first capture is expensive; subsequent ones are free.

When Caching Makes Sense

Good candidates for caching:

- Static pages (marketing sites, docs, landing pages)
- Pages where visual changes are infrequent and expected
- Repeated captures of the same URL across different sessions
- CI/CD visual regression tests where the baseline is stable

Not suitable for caching:

- Live dashboards with real-time data
- Authenticated pages showing user-specific content
- Pages with dynamic content (timestamps, live counts)
- Cases where you specifically need the current visual state

The Layered Cache Architecture

Request → In-Memory Cache (fast) → File Cache (persistent) → API (expensive)

Two layers serve different purposes:

- In-memory LRU: Sub-millisecond lookups for hot URLs. Lost on process restart.
- File cache: Survives restarts. Slower than memory, faster than API.

Implementation

import hashlib
import json
import threading
import time
from collections import OrderedDict
from email.utils import parsedate_to_datetime
from pathlib import Path

import requests

# Base URL of the screenshot service.
API_BASE = "https://hermesforge.dev/api"
# Placeholder — substitute a real key (ideally loaded from an environment variable).
API_KEY = "your-api-key"
# On-disk cache location, created at import time so FileCache can write immediately.
CACHE_DIR = Path("screenshot_cache")
CACHE_DIR.mkdir(exist_ok=True)


class LRUCache:
    """Simple in-memory LRU cache with TTL."""

    def __init__(self, max_size: int = 100, ttl_seconds: int = 3600):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._cache: OrderedDict[str, tuple[float, bytes]] = OrderedDict()

    def _key(self, url: str, width: int, height: int) -> str:
        raw = f"{url}:{width}:{height}"
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def get(self, url: str, width: int, height: int) -> bytes | None:
        key = self._key(url, width, height)
        if key not in self._cache:
            return None
        timestamp, data = self._cache[key]
        if time.time() - timestamp > self.ttl:
            del self._cache[key]
            return None
        # Move to end (most recently used)
        self._cache.move_to_end(key)
        return data

    def set(self, url: str, width: int, height: int, data: bytes) -> None:
        key = self._key(url, width, height)
        self._cache[key] = (time.time(), data)
        self._cache.move_to_end(key)
        # Evict oldest if over capacity
        while len(self._cache) > self.max_size:
            self._cache.popitem(last=False)

    def stats(self) -> dict:
        now = time.time()
        live = sum(1 for ts, _ in self._cache.values() if now - ts <= self.ttl)
        return {"size": len(self._cache), "live_entries": live, "ttl_seconds": self.ttl}


class FileCache:
    """File-based cache that persists across process restarts."""

    def __init__(self, cache_dir: Path, ttl_seconds: int = 86400):
        self.cache_dir = cache_dir
        self.ttl = ttl_seconds
        self._index_path = cache_dir / "index.json"
        self._index = self._load_index()

    def _load_index(self) -> dict:
        if self._index_path.exists():
            try:
                return json.loads(self._index_path.read_text())
            except (json.JSONDecodeError, OSError):
                return {}
        return {}

    def _save_index(self) -> None:
        self._index_path.write_text(json.dumps(self._index, indent=2))

    def _cache_key(self, url: str, width: int, height: int) -> str:
        raw = f"{url}:{width}:{height}"
        return hashlib.sha256(raw.encode()).hexdigest()

    def get(self, url: str, width: int, height: int) -> bytes | None:
        key = self._cache_key(url, width, height)
        if key not in self._index:
            return None

        entry = self._index[key]
        if time.time() - entry["timestamp"] > self.ttl:
            # Expired — clean up
            cache_file = self.cache_dir / f"{key}.png"
            cache_file.unlink(missing_ok=True)
            del self._index[key]
            self._save_index()
            return None

        cache_file = self.cache_dir / f"{key}.png"
        if not cache_file.exists():
            del self._index[key]
            self._save_index()
            return None

        return cache_file.read_bytes()

    def set(self, url: str, width: int, height: int, data: bytes) -> None:
        key = self._cache_key(url, width, height)
        cache_file = self.cache_dir / f"{key}.png"
        cache_file.write_bytes(data)
        self._index[key] = {
            "url": url,
            "width": width,
            "height": height,
            "timestamp": time.time(),
        }
        self._save_index()

    def purge_expired(self) -> int:
        """Remove expired entries. Returns count removed."""
        now = time.time()
        expired = [
            k for k, v in self._index.items()
            if now - v["timestamp"] > self.ttl
        ]
        for key in expired:
            (self.cache_dir / f"{key}.png").unlink(missing_ok=True)
            del self._index[key]
        if expired:
            self._save_index()
        return len(expired)

The Unified Screenshot Client

class CachedScreenshotClient:
    """Screenshot API client with layered caching (memory → disk → API).

    A single instance is shared across worker threads by ``batch_capture``;
    the hit counters are guarded by a lock because ``d[k] += 1`` is a
    non-atomic read-modify-write and would under-count under concurrency.
    """

    def __init__(
        self,
        api_key: str,
        cache_dir: Path = CACHE_DIR,
        memory_ttl: int = 3600,    # 1 hour in-memory
        file_ttl: int = 86400,     # 24 hours on disk
        memory_size: int = 100,
    ):
        self.api_key = api_key
        self.memory_cache = LRUCache(max_size=memory_size, ttl_seconds=memory_ttl)
        self.file_cache = FileCache(cache_dir=cache_dir, ttl_seconds=file_ttl)
        # Per-layer hit counters, reported by cache_stats().
        self._hits = {"memory": 0, "file": 0, "api": 0}
        self._stats_lock = threading.Lock()

    def _record(self, layer: str) -> None:
        """Thread-safely count one hit against *layer* ("memory"/"file"/"api")."""
        with self._stats_lock:
            self._hits[layer] += 1

    def capture(
        self,
        url: str,
        width: int = 1280,
        height: int = 800,
        force_refresh: bool = False,
    ) -> bytes:
        """
        Capture a screenshot with caching.

        Args:
            url: Page URL to capture
            width: Viewport width in pixels
            height: Viewport height in pixels
            force_refresh: Bypass cache and fetch fresh

        Returns:
            Screenshot bytes (PNG)
        """
        if not force_refresh:
            # L1: memory cache
            cached = self.memory_cache.get(url, width, height)
            if cached is not None:
                self._record("memory")
                return cached

            # L2: file cache
            cached = self.file_cache.get(url, width, height)
            if cached is not None:
                self._record("file")
                # Promote to memory cache so the next lookup is sub-millisecond.
                self.memory_cache.set(url, width, height, cached)
                return cached

        # L3: API call — repopulate both cache layers on the way out.
        self._record("api")
        data = self._fetch_from_api(url, width, height)
        self.memory_cache.set(url, width, height, data)
        self.file_cache.set(url, width, height, data)
        return data

    def _fetch_from_api(self, url: str, width: int, height: int) -> bytes:
        """Fetch a fresh PNG from the screenshot API; raises on HTTP errors."""
        response = requests.get(
            f"{API_BASE}/screenshot",
            params={"url": url, "width": width, "height": height, "format": "png"},
            headers={"X-API-Key": self.api_key},
            timeout=30,
        )
        response.raise_for_status()
        return response.content

    def cache_stats(self) -> dict:
        """Summarize per-layer hits and the share of requests that avoided the API."""
        with self._stats_lock:
            hits = self._hits.copy()
        total = sum(hits.values())
        savings = (1 - hits["api"] / total) * 100 if total > 0 else 0
        return {
            "hits": hits,
            "total_requests": total,
            "api_savings_percent": round(savings, 1),
            "memory_cache": self.memory_cache.stats(),
        }

Usage

# Uses the module-level API_KEY placeholder; replace it before running.
client = CachedScreenshotClient(api_key=API_KEY)

# First capture — hits API
img1 = client.capture("https://example.com/pricing")

# Second capture — hits memory cache
img2 = client.capture("https://example.com/pricing")

# Force fresh capture (e.g., after a deploy)
# force_refresh skips both cache layers but still repopulates them,
# and is counted as an "api" hit in the stats below.
img3 = client.capture("https://example.com/pricing", force_refresh=True)

print(client.cache_stats())
# Expected output assumes a cold cache; a leftover screenshot_cache/ dir
# from an earlier run would turn the first capture into a "file" hit.
# {
#   "hits": {"memory": 1, "file": 0, "api": 2},
#   "total_requests": 3,
#   "api_savings_percent": 33.3,
#   "memory_cache": {"size": 1, "live_entries": 1, "ttl_seconds": 3600}
# }

Batch Capture with Caching

Combining caching with concurrent requests maximizes efficiency:

from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_capture(
    client: CachedScreenshotClient,
    urls: list[str],
    width: int = 1280,
    height: int = 800,
    max_workers: int = 5,
) -> dict[str, bytes | Exception]:
    """Capture many URLs concurrently through *client*.

    Returns a mapping of url -> PNG bytes on success, or the raised
    Exception on failure, so one bad page never aborts the whole batch.
    """
    outcome: dict[str, bytes | Exception] = {}

    def _grab(target: str) -> tuple[str, bytes]:
        # Cached URLs return instantly; misses hit the API in parallel.
        return target, client.capture(target, width, height)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        in_flight = {pool.submit(_grab, page): page for page in urls}
        for finished in as_completed(in_flight):
            page = in_flight[finished]
            try:
                outcome[page] = finished.result()[1]
            except Exception as err:
                outcome[page] = err

    return outcome


# Capture a list of pages — cached URLs are free, uncached hit the API in parallel
pages = [
    "https://example.com",
    "https://example.com/pricing",
    "https://example.com/docs",
    "https://example.com/blog",
]
screenshots = batch_capture(client, pages)
print(client.cache_stats())

Cache Invalidation Strategy

The hardest problem in caching is knowing when to invalidate.

Time-based (what we've built): Sufficient for most use cases. Set TTL based on how frequently the page changes.

Content-based: Fetch a lightweight signal first (e.g., HTTP Last-Modified or ETag header), compare to cached metadata. Only re-capture if the content has changed.

def should_refresh(url: str, cached_at: float) -> bool:
    """Return True if *url* reports a Last-Modified newer than *cached_at*.

    Uses a cheap HEAD request as the change signal. On any network error,
    a missing Last-Modified header, or an unparseable date, the page is
    assumed unchanged (False) — this is deliberately best-effort.
    """
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
        last_modified = response.headers.get("Last-Modified")
        if last_modified:
            # parsedate_to_datetime raises ValueError/TypeError on malformed
            # headers; the original code let that crash the check.
            lm_ts = parsedate_to_datetime(last_modified).timestamp()
            return lm_ts > cached_at
    except (requests.RequestException, ValueError, TypeError):
        pass
    return False  # Assume unchanged on error

Event-based: Invalidate on deploy. Webhook from CI/CD → call client.capture(url, force_refresh=True) for key pages.

Choosing TTL Values

| Page type | Recommended TTL |
| --- | --- |
| Highly dynamic (news, live data) | No caching |
| Blog posts, docs | 24–72 hours |
| Marketing pages | 12–24 hours |
| Visual regression baselines | Until next intentional update |
| Competitor monitoring | 24 hours |

API documentation and key creation at hermesforge.dev/api. Free tier: 50 screenshots/day. Caching makes this tier go further — cache hit rate of 80% means 50 API calls service 250 total requests.