How to Cache Screenshot API Responses in Python
Every screenshot API call costs you two things: money and time. A cached response costs neither.
For most screenshot use cases — OG preview generation, visual regression baselines, monitoring dashboards — the same URL is captured repeatedly. Caching the result means the first capture is expensive; subsequent ones are free.
When Caching Makes Sense
Good candidates for caching: - Static pages (marketing sites, docs, landing pages) - Pages where visual changes are infrequent and expected - Repeated captures of the same URL across different sessions - CI/CD visual regression tests where the baseline is stable
Not suitable for caching: - Live dashboards with real-time data - Authenticated pages showing user-specific content - Pages with dynamic content (timestamps, live counts) - Cases where you specifically need the current visual state
The Layered Cache Architecture
Request → In-Memory Cache (fast) → File Cache (persistent) → API (expensive)
Two layers serve different purposes: - In-memory LRU: Sub-millisecond lookups for hot URLs. Lost on process restart. - File cache: Survives restarts. Slower than memory, faster than API.
Implementation
import hashlib
import json
import threading
import time
from collections import OrderedDict
from pathlib import Path

import requests
API_BASE = "https://hermesforge.dev/api"
API_KEY = "your-api-key"
CACHE_DIR = Path("screenshot_cache")
CACHE_DIR.mkdir(exist_ok=True)
class LRUCache:
"""Simple in-memory LRU cache with TTL."""
def __init__(self, max_size: int = 100, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl = ttl_seconds
self._cache: OrderedDict[str, tuple[float, bytes]] = OrderedDict()
def _key(self, url: str, width: int, height: int) -> str:
raw = f"{url}:{width}:{height}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def get(self, url: str, width: int, height: int) -> bytes | None:
key = self._key(url, width, height)
if key not in self._cache:
return None
timestamp, data = self._cache[key]
if time.time() - timestamp > self.ttl:
del self._cache[key]
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
return data
def set(self, url: str, width: int, height: int, data: bytes) -> None:
key = self._key(url, width, height)
self._cache[key] = (time.time(), data)
self._cache.move_to_end(key)
# Evict oldest if over capacity
while len(self._cache) > self.max_size:
self._cache.popitem(last=False)
def stats(self) -> dict:
now = time.time()
live = sum(1 for ts, _ in self._cache.values() if now - ts <= self.ttl)
return {"size": len(self._cache), "live_entries": live, "ttl_seconds": self.ttl}
class FileCache:
"""File-based cache that persists across process restarts."""
def __init__(self, cache_dir: Path, ttl_seconds: int = 86400):
self.cache_dir = cache_dir
self.ttl = ttl_seconds
self._index_path = cache_dir / "index.json"
self._index = self._load_index()
def _load_index(self) -> dict:
if self._index_path.exists():
try:
return json.loads(self._index_path.read_text())
except (json.JSONDecodeError, OSError):
return {}
return {}
def _save_index(self) -> None:
self._index_path.write_text(json.dumps(self._index, indent=2))
def _cache_key(self, url: str, width: int, height: int) -> str:
raw = f"{url}:{width}:{height}"
return hashlib.sha256(raw.encode()).hexdigest()
def get(self, url: str, width: int, height: int) -> bytes | None:
key = self._cache_key(url, width, height)
if key not in self._index:
return None
entry = self._index[key]
if time.time() - entry["timestamp"] > self.ttl:
# Expired — clean up
cache_file = self.cache_dir / f"{key}.png"
cache_file.unlink(missing_ok=True)
del self._index[key]
self._save_index()
return None
cache_file = self.cache_dir / f"{key}.png"
if not cache_file.exists():
del self._index[key]
self._save_index()
return None
return cache_file.read_bytes()
def set(self, url: str, width: int, height: int, data: bytes) -> None:
key = self._cache_key(url, width, height)
cache_file = self.cache_dir / f"{key}.png"
cache_file.write_bytes(data)
self._index[key] = {
"url": url,
"width": width,
"height": height,
"timestamp": time.time(),
}
self._save_index()
def purge_expired(self) -> int:
"""Remove expired entries. Returns count removed."""
now = time.time()
expired = [
k for k, v in self._index.items()
if now - v["timestamp"] > self.ttl
]
for key in expired:
(self.cache_dir / f"{key}.png").unlink(missing_ok=True)
del self._index[key]
if expired:
self._save_index()
return len(expired)
The Unified Screenshot Client
class CachedScreenshotClient:
"""Screenshot API client with layered caching."""
def __init__(
self,
api_key: str,
cache_dir: Path = CACHE_DIR,
memory_ttl: int = 3600, # 1 hour in-memory
file_ttl: int = 86400, # 24 hours on disk
memory_size: int = 100,
):
self.api_key = api_key
self.memory_cache = LRUCache(max_size=memory_size, ttl_seconds=memory_ttl)
self.file_cache = FileCache(cache_dir=cache_dir, ttl_seconds=file_ttl)
self._hits = {"memory": 0, "file": 0, "api": 0}
def capture(
self,
url: str,
width: int = 1280,
height: int = 800,
force_refresh: bool = False,
) -> bytes:
"""
Capture a screenshot with caching.
Args:
url: Page URL to capture
width: Viewport width in pixels
height: Viewport height in pixels
force_refresh: Bypass cache and fetch fresh
Returns:
Screenshot bytes (PNG)
"""
if not force_refresh:
# L1: memory cache
cached = self.memory_cache.get(url, width, height)
if cached is not None:
self._hits["memory"] += 1
return cached
# L2: file cache
cached = self.file_cache.get(url, width, height)
if cached is not None:
self._hits["file"] += 1
# Promote to memory cache
self.memory_cache.set(url, width, height, cached)
return cached
# L3: API call
self._hits["api"] += 1
data = self._fetch_from_api(url, width, height)
self.memory_cache.set(url, width, height, data)
self.file_cache.set(url, width, height, data)
return data
def _fetch_from_api(self, url: str, width: int, height: int) -> bytes:
response = requests.get(
f"{API_BASE}/screenshot",
params={"url": url, "width": width, "height": height, "format": "png"},
headers={"X-API-Key": self.api_key},
timeout=30,
)
response.raise_for_status()
return response.content
def cache_stats(self) -> dict:
total = sum(self._hits.values())
return {
"hits": self._hits.copy(),
"total_requests": total,
"api_savings_percent": round(
(1 - self._hits["api"] / total) * 100 if total > 0 else 0, 1
),
"memory_cache": self.memory_cache.stats(),
}
Usage
client = CachedScreenshotClient(api_key=API_KEY)
# First capture — hits API
img1 = client.capture("https://example.com/pricing")
# Second capture — hits memory cache
img2 = client.capture("https://example.com/pricing")
# Force fresh capture (e.g., after a deploy)
img3 = client.capture("https://example.com/pricing", force_refresh=True)
print(client.cache_stats())
# {
# "hits": {"memory": 1, "file": 0, "api": 2},
# "total_requests": 3,
# "api_savings_percent": 33.3,
# "memory_cache": {"size": 1, "live_entries": 1, "ttl_seconds": 3600}
# }
Batch Capture with Caching
Combining caching with concurrent requests maximizes efficiency:
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_capture(
client: CachedScreenshotClient,
urls: list[str],
width: int = 1280,
height: int = 800,
max_workers: int = 5,
) -> dict[str, bytes | Exception]:
results = {}
def capture_one(url):
return url, client.capture(url, width, height)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(capture_one, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
_, data = future.result()
results[url] = data
except Exception as e:
results[url] = e
return results
# Capture a list of pages — cached URLs are free, uncached hit the API in parallel
pages = [
"https://example.com",
"https://example.com/pricing",
"https://example.com/docs",
"https://example.com/blog",
]
screenshots = batch_capture(client, pages)
print(client.cache_stats())
Cache Invalidation Strategy
The hardest problem in caching is knowing when to invalidate.
Time-based (what we've built): Sufficient for most use cases. Set TTL based on how frequently the page changes.
Content-based: Fetch a lightweight signal first (e.g., HTTP Last-Modified or ETag header), compare to cached metadata. Only re-capture if the content has changed.
def should_refresh(url: str, cached_at: float) -> bool:
"""Check if page has been modified since last capture."""
try:
response = requests.head(url, timeout=5, allow_redirects=True)
last_modified = response.headers.get("Last-Modified")
if last_modified:
from email.utils import parsedate_to_datetime
lm_ts = parsedate_to_datetime(last_modified).timestamp()
return lm_ts > cached_at
except requests.RequestException:
pass
return False # Assume unchanged on error
Event-based: Invalidate on deploy. Webhook from CI/CD → call client.capture(url, force_refresh=True) for key pages.
Choosing TTL Values
| Page type | Recommended TTL |
|---|---|
| Highly dynamic (news, live data) | No caching |
| Blog posts, docs | 24–72 hours |
| Marketing pages | 12–24 hours |
| Visual regression baselines | Until next intentional update |
| Competitor monitoring | 24 hours |
API documentation and key creation at hermesforge.dev/api. Free tier: 50 screenshots/day. Caching makes this tier go further — a cache hit rate of 80% means 50 API calls can service 250 total requests.