Caching Strategies for Screenshot APIs: Reduce API Calls by 80%
If you're making screenshot API calls in production, a significant fraction of them are probably redundant. The same URL requested multiple times in a day. The same product page captured for two different users within minutes. The same social preview generated for a link that hasn't changed in weeks.
A well-designed cache layer can eliminate 60-80% of API calls while sharply reducing response latency on cache hits. The engineering is not complex — the key decisions are cache key design, TTL strategy, and storage backend selection.
Cache Key Design
The cache key must uniquely identify a screenshot. Two calls should map to the same cache key if and only if they would produce an identical screenshot.
What affects the output: - URL (including query parameters, if they affect page content) - Viewport dimensions (width, height) - Full page vs. above-fold - Output format (PNG vs. WebP) - Device pixel ratio - Custom CSS injection - Wait strategy
What doesn't affect the output: - Timestamp (unless passed in the URL) - Requester identity - Request ID
import hashlib
import json
from typing import Any
def make_cache_key(url: str, params: dict[str, Any]) -> str:
    """
    Generate a stable cache key from a URL and screenshot parameters.

    Only parameters that affect the output image are included, so two
    requests that would render the same screenshot share one key.

    Returns a 32-hex-character truncated SHA-256 digest.
    """
    from urllib.parse import urlparse, urlunparse

    # Normalize the URL so trivially-different spellings share a key:
    # scheme and host are case-insensitive per RFC 3986, and a trailing
    # slash on the path does not change the page served by most sites.
    # (The original code documented the trailing-slash strip but never
    # performed it — "/page" and "/page/" got different keys.)
    parsed = urlparse(url)
    normalized_url = urlunparse(parsed._replace(
        scheme=parsed.scheme.lower(),
        netloc=parsed.netloc.lower(),
        path=parsed.path.rstrip("/"),
    ))

    # Only cache-relevant parameters.
    # NOTE(review): the intro lists the wait strategy as output-affecting,
    # but it is left out of the key here — confirm that is intended.
    cache_params = {
        "url": normalized_url,
        "width": params.get("width", 1280),
        "height": params.get("height"),
        "full_page": params.get("full_page", True),
        "format": params.get("format", "png"),
        "device_scale": params.get("device_scale", 1),
        "inject_css": params.get("inject_css"),
        "block_ads": params.get("block_ads", False),
    }
    # Drop None values so absent and explicitly-None parameters hash alike.
    cache_params = {k: v for k, v in cache_params.items() if v is not None}

    # Stable serialization: sort_keys makes the JSON order-independent.
    serialized = json.dumps(cache_params, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:32]
TTL Strategy
Different pages have different change frequencies. A hardcoded TTL misses most of the opportunity.
from urllib.parse import urlparse
def get_ttl_seconds(url: str, page_hint: str = "default") -> int:
    """
    Determine an appropriate cache TTL from URL patterns and content type.

    page_hint: optional explicit category label ("dashboard", "pricing",
    "product", "documentation", "static"). When it names a known category,
    that category's TTL is returned directly and URL matching is skipped;
    any other value (including the default) falls through to URL matching.
    (The parameter previously existed but was silently ignored.)

    Tune these ranges based on your actual use case:
    - Social previews: content rarely changes once published
    - Dashboards: update frequently, short TTL
    - Product pages: moderate change frequency
    - Documentation: slow-changing, long TTL
    """
    TTL_RULES = [
        # (patterns, ttl_seconds, label)
        # Very short: dashboards, real-time monitoring
        (["grafana", "metabase", "tableau", "dashboard", "analytics"], 300, "dashboard"),
        # Short: pricing, frequently updated pages
        (["pricing", "checkout", "cart", "stock", "inventory"], 900, "pricing"),
        # Medium: product pages, landing pages
        (["product", "item", "listing"], 3600, "product"),
        # Long: blog posts, documentation
        (["blog", "docs", "documentation", "article", "post"], 86400, "documentation"),
        # Very long: static assets, archived content
        (["archive", "static", "about"], 604800, "static"),
    ]
    # An explicit hint from the caller wins over URL heuristics.
    for _patterns, ttl, label in TTL_RULES:
        if page_hint == label:
            return ttl
    parsed = urlparse(url)
    path = parsed.path.lower()
    # Hostnames are case-insensitive — lowercase before substring matching
    # (the original missed e.g. "Grafana.example.com").
    netloc = parsed.netloc.lower()
    for patterns, ttl, _label in TTL_RULES:
        if any(p in path or p in netloc for p in patterns):
            return ttl
    # Default: 1 hour
    return 3600
Local Disk Cache
Simplest implementation. Good for single-server deployments and development:
import os
import time
from pathlib import Path
import requests
SCREENSHOT_API_KEY = "your-api-key"
SCREENSHOT_API_URL = "https://hermesforge.dev/api/screenshot"
class DiskCache:
def __init__(self, cache_dir: str = "./screenshot_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _paths(self, key: str) -> tuple[Path, Path]:
"""Return (image_path, meta_path) for a cache key."""
return (
self.cache_dir / f"{key}.png",
self.cache_dir / f"{key}.meta",
)
def get(self, key: str) -> bytes | None:
"""Return cached image bytes, or None if expired/missing."""
img_path, meta_path = self._paths(key)
if not img_path.exists() or not meta_path.exists():
return None
meta = json.loads(meta_path.read_text())
if time.time() > meta["expires_at"]:
img_path.unlink(missing_ok=True)
meta_path.unlink(missing_ok=True)
return None
return img_path.read_bytes()
def set(self, key: str, data: bytes, ttl: int):
"""Store image bytes with TTL."""
img_path, meta_path = self._paths(key)
img_path.write_bytes(data)
meta_path.write_text(json.dumps({
"cached_at": time.time(),
"expires_at": time.time() + ttl,
"ttl": ttl,
"size_bytes": len(data),
}))
def invalidate(self, key: str):
"""Remove a specific cache entry."""
for path in self._paths(key):
path.unlink(missing_ok=True)
def purge_expired(self):
"""Remove all expired cache entries. Run periodically."""
now = time.time()
removed = 0
for meta_path in self.cache_dir.glob("*.meta"):
try:
meta = json.loads(meta_path.read_text())
if now > meta["expires_at"]:
meta_path.unlink(missing_ok=True)
img_path = meta_path.with_suffix(".png")
img_path.unlink(missing_ok=True)
removed += 1
except Exception:
pass
return removed
Redis Cache
Better for multi-server deployments and when you want automatic expiration:
import redis
class RedisCache:
    """
    Redis-backed screenshot cache for multi-server deployments.

    Expiration is delegated to Redis via SETEX; all keys live under a
    common prefix so they can be scanned as a namespace.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379/0", key_prefix: str = "screenshot:"):
        self.client = redis.from_url(redis_url)
        self.prefix = key_prefix

    def _key(self, cache_key: str) -> str:
        """Namespace a cache key so SCAN can target only our entries."""
        return f"{self.prefix}{cache_key}"

    def get(self, key: str) -> bytes | None:
        """Return cached image bytes, or None if missing/expired."""
        data = self.client.get(self._key(key))
        return data  # Redis returns None if key doesn't exist or has expired

    def set(self, key: str, data: bytes, ttl: int):
        """Store image bytes with TTL. Redis handles expiration automatically."""
        self.client.setex(self._key(key), ttl, data)

    def invalidate(self, key: str):
        """Remove a single cache entry."""
        self.client.delete(self._key(key))

    def invalidate_pattern(self, url_prefix: str):
        """
        Drop EVERY cached screenshot under this cache's key prefix.

        NOTE(review): despite the parameter name, entries cannot be
        filtered by URL — keys are one-way hashes, so selective
        invalidation would need a secondary URL→key index. `url_prefix`
        is currently ignored and this is a full flush of the screenshot
        namespace. Expensive on large keyspaces — use sparingly.

        Returns the number of keys deleted.
        """
        pattern = f"{self.prefix}*"
        cursor = 0
        deleted = 0
        while True:
            cursor, keys = self.client.scan(cursor, match=pattern, count=100)
            if keys:
                # One DELETE per SCAN page instead of one round trip per
                # key — same keys removed, far less network chatter.
                deleted += self.client.delete(*keys)
            if cursor == 0:
                break
        return deleted
The Caching Screenshot Client
Wrap the API call with cache lookup and storage:
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CachedScreenshotClient:
    """
    Screenshot API client with a pluggable cache layer.

    Serves repeat requests from the configured cache (DiskCache or
    RedisCache — any object with get/set is accepted) and tracks hit/miss
    counters so the hit rate can be monitored in production.
    """

    def __init__(
        self,
        api_key: str,
        cache: DiskCache | RedisCache,
        api_url: str = "https://hermesforge.dev/api/screenshot",
    ):
        self.api_key = api_key
        self.cache = cache
        self.api_url = api_url
        self._hits = 0
        self._misses = 0

    def screenshot(
        self,
        url: str,
        format: str = "png",
        width: int = 1280,
        full_page: bool = True,
        wait: str = "networkidle",
        block_ads: bool = True,
        inject_css: Optional[str] = None,
        ttl: Optional[int] = None,
        force_refresh: bool = False,
    ) -> bytes:
        """
        Capture a screenshot, serving from cache when available.

        force_refresh=True: bypass cache lookup, always capture fresh
        (the fresh result is still written back to the cache).
        ttl=None: auto-detect TTL from the URL pattern via get_ttl_seconds().

        Returns raw image bytes. Raises requests.HTTPError on API failure.
        """
        params = {
            "url": url,
            "format": format,
            "width": width,
            "full_page": full_page,
            "wait": wait,
            "block_ads": block_ads,
            "inject_css": inject_css,
        }
        cache_key = make_cache_key(url, params)
        effective_ttl = ttl if ttl is not None else get_ttl_seconds(url)
        # Cache lookup
        if not force_refresh:
            cached = self.cache.get(cache_key)
            if cached is not None:
                self._hits += 1
                # Lazy %-args: no string formatting unless DEBUG is enabled.
                logger.debug("Cache HIT: %s (key=%s...)", url, cache_key[:8])
                return cached
        # Cache miss — call API
        self._misses += 1
        logger.debug("Cache MISS: %s — calling API", url)
        # Booleans go over the wire as "true"/"false"; unset params are dropped.
        api_params = {
            k: (str(v).lower() if isinstance(v, bool) else v)
            for k, v in params.items()
            if v is not None
        }
        response = requests.get(
            self.api_url,
            params=api_params,
            headers={"X-API-Key": self.api_key},
            timeout=45,
        )
        response.raise_for_status()
        image_data = response.content
        self.cache.set(cache_key, image_data, effective_ttl)
        logger.debug("Cached: %s TTL=%ss", url, effective_ttl)
        return image_data

    @property
    def cache_hit_rate(self) -> float:
        """Fraction of screenshot() calls served from cache (0.0-1.0)."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

    def stats(self) -> dict:
        """Hit/miss counters plus the hit rate as a percentage."""
        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate_pct": round(self.cache_hit_rate * 100, 1),
        }
Cache Invalidation on Deploy
When you deploy a new version of your app, cached screenshots of the old UI are stale. Trigger cache invalidation as part of your deploy:
def invalidate_on_deploy(base_url: str, paths: list[str], cache: RedisCache):
    """
    Invalidate cached screenshots for specific paths after a deployment.

    Call from your CI/CD pipeline after a successful deploy.

    The cache key depends on every cache-relevant parameter, so the
    block_ads variants must be enumerated as well: CachedScreenshotClient
    defaults to block_ads=True, while make_cache_key defaults it to False —
    the previous version only computed the False-variant keys and therefore
    never invalidated the client's own cached entries.
    """
    for path in paths:
        url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
        # Compute cache keys for common viewport configurations
        for width in [1280, 375, 768]:  # desktop, mobile, tablet
            for full_page in [True, False]:
                for block_ads in [True, False]:
                    key = make_cache_key(url, {
                        "width": width,
                        "full_page": full_page,
                        "block_ads": block_ads,
                    })
                    cache.invalidate(key)
                    logger.info(
                        "Invalidated: %s (w=%s, full_page=%s, block_ads=%s)",
                        url, width, full_page, block_ads,
                    )
# In your deploy script:
# invalidate_on_deploy(
# "https://your-app.com",
# ["/pricing", "/features", "/dashboard"],
# redis_cache
# )
Measuring Cache Effectiveness
Track hit rates to validate your TTL strategy:
import time
def benchmark_cache_effectiveness(
    urls: list[str],
    client: CachedScreenshotClient,
    repeat: int = 3,
) -> dict:
    """
    Measure cache hit rate and latency across a set of URLs.

    Each URL is captured `repeat` times: the first attempt is expected to
    miss the cache, every subsequent attempt to hit it.
    """
    samples = []
    for target in urls:
        for attempt in range(1, repeat + 1):
            started = time.perf_counter()
            client.screenshot(target, width=1280)
            samples.append({
                "url": target,
                "attempt": attempt,
                "latency_ms": round((time.perf_counter() - started) * 1000, 1),
                "expected": "miss" if attempt == 1 else "hit",
            })

    def mean_latency(kind: str) -> float:
        # max(..., 1) guards against division by zero when a category is empty.
        latencies = [s["latency_ms"] for s in samples if s["expected"] == kind]
        return sum(latencies) / max(len(latencies), 1)

    avg_hit = mean_latency("hit")
    avg_miss = mean_latency("miss")
    stats = client.stats()
    return {
        "hit_rate_pct": stats["hit_rate_pct"],
        "api_calls_made": stats["misses"],
        "calls_saved": stats["hits"],
        "avg_hit_latency_ms": round(avg_hit, 1),
        "avg_miss_latency_ms": round(avg_miss, 1),
        "speedup_factor": round(avg_miss / avg_hit, 1) if avg_hit > 0 else None,
    }
Practical Impact
For a typical production workload where the same pages are captured multiple times per day:
| Use Case | Without Cache | With Cache (1h TTL) | Savings |
|---|---|---|---|
| Social preview service (1000 link shares/day, 200 unique URLs) | 1000 calls/day | ~200 calls/day | 80% |
| CI/CD visual tests (50 PRs/day, 20 pages each, 3 test runs) | 3000 calls/day | ~350 calls/day | 88% |
| Competitor monitoring (10 pages, checked 24× daily) | 240 calls/day | 10 calls/day | 96% |
| Dashboard reports (5 dashboards, 8 email recipients) | 40 calls/day | 5 calls/day | 87% |
At these savings rates, a Starter tier (200/day) can support workloads that would otherwise require the Business tier (5000/day).
hermesforge.dev — screenshot API. Free: 10/day. Starter: $4/30 days (200/day). Pro: $9 (1000/day). Business: $29 (5000/day).