Automating E-Commerce Product Images with the Screenshot API
Product images are the highest-converting element on any product page, but maintaining a screenshot-based catalog — where images reflect the actual rendered page rather than a raw asset — is operationally expensive at scale. The Screenshot API reduces this to a single HTTP call per product: load the product URL, wait for images and price data to render, capture the result.
This is particularly useful for:

- Marketplaces that aggregate products from multiple vendors (capture the vendor's page directly)
- Price comparison tools that need a visual representation of the current offer
- Catalog archiving for auditing how products looked at a point in time
- Preview generation for email campaigns and social sharing
Core Capture Function
import requests
import os
# Read at import time so a missing key fails fast, before any capture runs.
HERMES_API_KEY = os.environ["HERMES_API_KEY"]
def capture_product_page(
    url: str,
    width: int = 1280,
    wait_ms: int = 2000,
    clip_height: int | None = None,
) -> bytes:
    """
    Capture a product detail page as WebP via the Screenshot API.

    Args:
        url: Product page URL to render.
        width: Viewport width in pixels.
        wait_ms: Delay before capture so JS-rendered prices, inventory
            badges, and image carousels have time to appear.
        clip_height: If set, capture only the top N pixels
            (above-the-fold product card); otherwise capture the full page.

    Returns:
        Raw WebP image bytes.

    Raises:
        requests.HTTPError: On a non-2xx response from the API.
    """
    params = {
        "url": url,
        "format": "webp",
        "width": width,
        # Full-page capture only when no clip is requested.
        "full_page": clip_height is None,
        "wait": wait_ms,
    }
    # Explicit None check keeps full_page/clip_height consistent even for a
    # degenerate clip_height of 0, which the previous `if clip_height:` skipped.
    if clip_height is not None:
        params["clip_height"] = clip_height
    resp = requests.get(
        "https://hermesforge.dev/api/screenshot",
        headers={"X-API-Key": HERMES_API_KEY},
        params=params,
        timeout=90,
    )
    resp.raise_for_status()
    return resp.content
Shopify: Bulk Product Capture
Shopify's product URLs follow a predictable structure. Pull the product list from the Admin API and capture each:
import shopify
import boto3
import time
from datetime import datetime, timezone
from pathlib import Path
s3 = boto3.client("s3")  # Shared S3 client used by all archiving helpers below
CATALOG_BUCKET = os.environ["CATALOG_BUCKET"]  # Destination bucket for screenshots
SHOP_URL = os.environ["SHOPIFY_SHOP_URL"]  # e.g. mystore.myshopify.com
ACCESS_TOKEN = os.environ["SHOPIFY_ACCESS_TOKEN"]  # Shopify Admin API access token
def get_shopify_products(limit: int = 250) -> list[dict]:
    """Fetch every product from the Shopify Admin API, following pagination."""
    shopify.ShopifyResource.activate_session(
        shopify.Session(SHOP_URL, "2024-01", ACCESS_TOKEN)
    )
    all_products: list[dict] = []
    current = shopify.Product.find(limit=limit)
    while True:
        all_products.extend(item.to_dict() for item in current)
        if not current.has_next_page():
            break
        current = current.next_page()
    return all_products
def product_url(shop_domain: str, handle: str) -> str:
    """Build the public storefront URL for a product handle."""
    return "https://{}/products/{}".format(shop_domain, handle)
def archive_product_screenshot(product: dict, shop_domain: str) -> str | None:
    """
    Capture one product's storefront page and archive it to S3.

    Args:
        product: Shopify product dict (must contain "handle" and "id").
        shop_domain: Public storefront domain, e.g. "mystore.myshopify.com".

    Returns:
        The s3:// URI of the stored screenshot, or None if the capture failed.
    """
    handle = product["handle"]
    product_id = product["id"]
    url = product_url(shop_domain, handle)
    try:
        image_data = capture_product_page(url, width=1440, wait_ms=2500)
    except requests.RequestException as e:
        # RequestException also covers timeouts and connection errors, not
        # just HTTP status errors — a flaky network must not crash the caller.
        print(f"Capture failed for {handle}: {e}")
        return None
    # Take a single timestamp so the S3 key prefix and the object metadata
    # always agree (two separate now() calls could straddle midnight).
    captured_at = datetime.now(timezone.utc)
    key = f"products/{product_id}/{captured_at.strftime('%Y/%m/%d')}/screenshot.webp"
    s3.put_object(
        Bucket=CATALOG_BUCKET,
        Key=key,
        Body=image_data,
        ContentType="image/webp",
        Metadata={
            "product_id": str(product_id),
            "handle": handle,
            "captured_at": captured_at.isoformat(),
        },
    )
    return f"s3://{CATALOG_BUCKET}/{key}"
def bulk_capture_shopify(shop_domain: str, concurrency: int = 3) -> dict:
    """
    Capture screenshots for every product in the Shopify store.

    Args:
        shop_domain: Public storefront domain.
        concurrency: Number of capture workers running in parallel.

    Returns:
        {"captured": [handle, ...], "failed": [handle, ...]}
    """
    import concurrent.futures
    products = get_shopify_products()
    results: dict[str, list] = {"captured": [], "failed": []}
    def process(product: dict) -> tuple[str, str | None]:
        path = archive_product_screenshot(product, shop_domain)
        return product["handle"], path
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as ex:
        futures = {ex.submit(process, p): p for p in products}
        for future in concurrent.futures.as_completed(futures):
            try:
                handle, path = future.result()
            except Exception as e:
                # One unexpected worker error must not abort the whole run —
                # record the product as failed and keep draining futures.
                handle = futures[future]["handle"]
                print(f"Unexpected error for {handle}: {e}")
                path = None
            if path:
                results["captured"].append(handle)
            else:
                results["failed"].append(handle)
            time.sleep(0.3)  # Gentle pacing between result handling steps
    print(f"Captured: {len(results['captured'])}, Failed: {len(results['failed'])}")
    return results
Shopify: Variant Screenshots
Products with color or size variants may render differently. Capture each variant by appending the variant ID to the URL:
def capture_product_variants(product: dict, shop_domain: str) -> list[dict]:
    """
    Capture an above-the-fold screenshot for each variant of a product.

    Shopify renders the selected variant when ?variant=ID is appended.
    A failed variant is logged and skipped rather than aborting the loop.
    """
    handle = product["handle"]
    captures: list[dict] = []
    for variant in product.get("variants", []):
        vid = variant["id"]
        target = f"https://{shop_domain}/products/{handle}?variant={vid}"
        try:
            shot = capture_product_page(
                target,
                width=1440,
                wait_ms=2500,
                clip_height=900,  # Above-the-fold product card
            )
        except Exception as e:
            print(f"Variant {vid} failed: {e}")
            continue
        captures.append({
            "variant_id": vid,
            "title": variant.get("title"),
            "image_data": shot,
            "captured_at": datetime.now(timezone.utc).isoformat(),
        })
    return captures
WooCommerce: Bulk Capture
WooCommerce exposes products via its REST API:
from woocommerce import API as WooAPI
# WooCommerce REST client; all credentials come from the environment.
woo = WooAPI(
    url=os.environ["WOO_STORE_URL"],
    consumer_key=os.environ["WOO_CONSUMER_KEY"],
    consumer_secret=os.environ["WOO_CONSUMER_SECRET"],
    version="wc/v3",
    timeout=30,
)
def get_woo_products(per_page: int = 100) -> list[dict]:
    """
    Fetch all WooCommerce products, paginated.

    Args:
        per_page: Page size requested from the REST API.

    Returns:
        List of raw product dicts.

    Raises:
        requests.HTTPError: If the store returns an error status. Without
            this check an error payload (a dict) would be extend()-ed into
            the product list one key at a time.
    """
    products: list[dict] = []
    page = 1
    while True:
        resp = woo.get("products", params={"per_page": per_page, "page": page})
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            break
        products.extend(batch)
        page += 1
    return products
def capture_woo_product(product: dict) -> bytes | None:
    """Capture a WooCommerce product page; None if no permalink or on error."""
    link = product.get("permalink")
    if not link:
        return None
    try:
        return capture_product_page(link, width=1440, wait_ms=2000)
    except Exception as e:
        print(f"Failed {product.get('slug', product.get('id'))}: {e}")
        return None
def bulk_capture_woo(output_dir: str, concurrency: int = 3) -> dict:
    """Capture and save screenshots for all WooCommerce products."""
    import concurrent.futures
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    results: dict[str, list] = {"success": [], "failed": []}

    def process(product: dict) -> tuple[int, bool]:
        pid = product["id"]
        image = capture_woo_product(product)
        if not image:
            return pid, False
        slug = product.get("slug", str(pid))
        (out / f"{slug}.webp").write_bytes(image)
        return pid, True

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = {pool.submit(process, p): p for p in get_woo_products()}
        for done in concurrent.futures.as_completed(pending):
            pid, ok = done.result()
            bucket = results["success"] if ok else results["failed"]
            bucket.append(pid)
            time.sleep(0.3)
    return results
Webhook-Triggered Capture on Product Update
Capture a fresh screenshot whenever a product is updated:
from flask import Flask, request
app = Flask(__name__)
# Shared secret used to verify Shopify webhook HMAC signatures.
SHOPIFY_WEBHOOK_SECRET = os.environ["SHOPIFY_WEBHOOK_SECRET"]
@app.route("/webhooks/shopify/products/update", methods=["POST"])
def shopify_product_updated():
    """Shopify products/update webhook: verify the HMAC, then queue a re-capture."""
    import hmac
    import hashlib
    import base64
    # Verify webhook signature: HMAC-SHA256 over the raw request body,
    # base64-encoded, must match the X-Shopify-Hmac-Sha256 header.
    digest = hmac.new(
        SHOPIFY_WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256,
    ).digest()
    computed = base64.b64encode(digest).decode()
    received = request.headers.get("X-Shopify-Hmac-Sha256", "")
    # compare_digest: constant-time comparison, avoids timing side channels.
    if not hmac.compare_digest(computed, received):
        return "", 401
    product = request.json
    handle = product.get("handle")
    shop = request.headers.get("X-Shopify-Shop-Domain")
    if handle and shop:
        # Queue async to avoid blocking Shopify's 5s webhook timeout
        capture_product_job.delay(handle, shop)
    # Ack with 200 even when handle/shop are missing so the sender stops retrying.
    return "", 200
# Celery task
from celery import Celery
# Redis-backed Celery app; capture jobs are queued from the webhook handler.
celery = Celery("tasks", broker=os.environ["REDIS_URL"])
@celery.task(
    # autoretry_for is required for max_retries/default_retry_delay to have
    # any effect: without it (or an explicit self.retry() call) a failed
    # task is never retried.
    autoretry_for=(requests.RequestException,),
    max_retries=3,
    default_retry_delay=30,
)
def capture_product_job(handle: str, shop_domain: str):
    """Re-capture one product page and archive the screenshot to S3."""
    url = f"https://{shop_domain}/products/{handle}"
    image_data = capture_product_page(url, width=1440, wait_ms=2500)
    # Store in S3, keyed by handle + timestamp so capture history is preserved
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    key = f"products/{handle}/{ts}.webp"
    s3.put_object(
        Bucket=CATALOG_BUCKET,
        Key=key,
        Body=image_data,
        ContentType="image/webp",
    )
    print(f"Updated screenshot: {handle} → s3://{CATALOG_BUCKET}/{key}")
Price Comparison: Capturing Competitor Product Pages
For price comparison tools, capture the competitor's product page alongside your own:
from dataclasses import dataclass
@dataclass
class ProductComparison:
    """One product matched against competitor listings for visual comparison."""
    product_name: str  # Human-readable name; slugified for output filenames
    your_url: str  # Your own product page
    competitor_urls: list[str]  # Competitor pages selling the same product
def capture_comparison(comparison: ProductComparison, output_dir: str):
    """Capture your product page and all competitor pages."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    slug = comparison.product_name.lower().replace(" ", "-")
    # Our own page first; a failure here propagates to the caller.
    own_shot = capture_product_page(comparison.your_url, width=1440, wait_ms=2000)
    (out / f"{slug}-yours.webp").write_bytes(own_shot)
    # Competitor pages are best-effort: log and move on if one fails.
    for idx, target in enumerate(comparison.competitor_urls, 1):
        try:
            shot = capture_product_page(target, width=1440, wait_ms=2500)
            (out / f"{slug}-competitor-{idx}.webp").write_bytes(shot)
            time.sleep(1)  # Polite interval between external requests
        except Exception as e:
            print(f"Competitor {idx} capture failed: {e}")
CDN Delivery for Email Campaigns
Product screenshots used in email campaigns should be served from a CDN with stable URLs:
def get_or_create_product_image(
    product_url: str,
    product_id: str,
    force: bool = False,
) -> str:
    """
    Return a CDN URL for a product screenshot.

    Reuses the existing S3 object when present; captures a fresh
    screenshot only when the object is missing or force=True.
    """
    CDN_BASE = os.environ["CDN_BASE"]
    key = f"products/{product_id}/latest.webp"
    cdn_url = f"{CDN_BASE}/{key}"
    if not force:
        try:
            s3.head_object(Bucket=CATALOG_BUCKET, Key=key)
        except s3.exceptions.ClientError:
            pass  # Not found — fall through and capture it
        else:
            return cdn_url
    screenshot = capture_product_page(product_url, width=1440, wait_ms=2000)
    s3.put_object(
        Bucket=CATALOG_BUCKET,
        Key=key,
        Body=screenshot,
        ContentType="image/webp",
        CacheControl="public, max-age=86400",
    )
    return cdn_url
Wait Times by Platform
| Platform | Recommended wait | Reason |
|---|---|---|
| Shopify | 2000–2500ms | Dynamic pricing apps, inventory badges |
| WooCommerce | 1500–2000ms | Page builders (Elementor, Divi) take time |
| Magento | 3000ms | Heavy JS bundles, lazy-loaded images |
| BigCommerce | 2000ms | Faceted navigation, variant switchers |
| Custom React storefronts | 2500ms | SSR hydration + data fetch |
For pages with image carousels, wait until the first carousel image loads — typically 2000ms is sufficient. If captures show loading spinners, increase by 500ms increments.
Catalog Sync: Full Refresh Schedule
Run a full catalog refresh nightly during low-traffic hours:
import schedule
import time
def nightly_catalog_refresh():
    """Full catalog screenshot refresh for Shopify store."""
    shop_domain = os.environ["SHOPIFY_SHOP_DOMAIN"]
    print(f"Starting nightly catalog refresh for {shop_domain}")
    outcome = bulk_capture_shopify(shop_domain, concurrency=2)
    captured, failed = outcome["captured"], outcome["failed"]
    print(f"Refresh complete: {len(captured)} captured, {len(failed)} failed")
# Schedule for 02:00 UTC nightly
# NOTE(review): the `schedule` library fires at 02:00 in the host's LOCAL
# time — this is 02:00 UTC only if the host clock is set to UTC; confirm.
schedule.every().day.at("02:00").do(nightly_catalog_refresh)
if __name__ == "__main__":
    # Simple scheduler loop: check once a minute for due jobs.
    while True:
        schedule.run_pending()
        time.sleep(60)
Free API key at hermesforge.dev. 50 captures/day, no credit card required.