Using a Screenshot API in Google Cloud Functions: Serverless Visual Capture on GCP
Google Cloud Functions brings the same serverless economics as AWS Lambda — pay per invocation, scale to zero, no server management — with GCP-native integrations that make certain patterns more natural: Cloud Storage triggers, Pub/Sub fan-out, Cloud Scheduler cron, and Secret Manager for credentials. This guide covers the GCF-specific patterns for screenshot API integrations, with Terraform deployment throughout.
GCF vs Lambda: Key Differences for Screenshot Workflows
| Aspect | Google Cloud Functions | AWS Lambda |
|---|---|---|
| Max timeout | 60 minutes (2nd gen) | 15 minutes |
| Max memory | 32GB (2nd gen) | 10GB |
| Default timeout | 60 seconds | 3 seconds |
| Storage integration | Cloud Storage | S3 |
| Scheduling | Cloud Scheduler | EventBridge |
| Secrets | Secret Manager | SSM Parameter Store |
| Deployment | gcloud / Terraform | SAM / CDK |
| Cold start (Python) | ~400ms | ~250ms |
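GCF's 60-minute timeout limit is a meaningful advantage for batch screenshot workflows — long-running capture jobs that would require Step Functions on AWS can run as a single GCF invocation. Note that the 60-minute limit applies to HTTP-triggered functions; event-driven functions (Pub/Sub and Cloud Storage triggers) are capped at 9 minutes.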
Basic Cloud Function
import functions_framework
import os
import urllib.request
import urllib.parse
import urllib.error
import json
@functions_framework.http
def capture_screenshot(request):
    """HTTP-triggered Cloud Function for screenshot capture.

    Accepts a POST whose JSON body is an object:
        {"url": "...", "format": "webp"}
    "url" is required; "format" is optional and defaults to "webp".

    Returns:
        A (body, status, headers) tuple with a JSON body. On success (200)
        the body carries "bytes", "image_base64" and "format". Errors are
        405 for non-POST requests, 400 for a malformed body or a missing
        url, the upstream status code when the screenshot API answers with
        an HTTP error, and 502 when the API cannot be reached or times out.

    Raises:
        KeyError: if SCREENSHOT_API_KEY is not set in the environment.
    """
    import base64  # local import: not part of this snippet's import header

    json_headers = {"Content-Type": "application/json"}

    if request.method != "POST":
        return json.dumps({"error": "POST required"}), 405, json_headers

    try:
        body = request.get_json(force=True)
    except Exception:
        return json.dumps({"error": "Invalid JSON"}), 400, json_headers

    # Valid JSON that is not an object (null, a list, a bare string) has no
    # .get(); answer with a client error instead of crashing with a 500.
    if not isinstance(body, dict):
        return json.dumps({"error": "JSON object required"}), 400, json_headers

    target_url = body.get("url")
    if not target_url:
        return json.dumps({"error": "url is required"}), 400, json_headers

    api_key = os.environ["SCREENSHOT_API_KEY"]
    fmt = body.get("format", "webp")
    params = urllib.parse.urlencode({
        "url": target_url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
    })
    req = urllib.request.Request(
        f"https://hermesforge.dev/api/screenshot?{params}",
        headers={"X-API-Key": api_key},
    )

    try:
        with urllib.request.urlopen(req, timeout=25) as resp:
            image_bytes = resp.read()
    except urllib.error.HTTPError as e:
        # Relay the upstream status; tolerate non-UTF-8 error bodies.
        error_body = e.read().decode(errors="replace")
        return json.dumps({"error": error_body}), e.code, json_headers
    except (urllib.error.URLError, TimeoutError) as e:
        # A timeout while reading the body surfaces as TimeoutError, which
        # is not wrapped in URLError and has no .reason attribute.
        reason = getattr(e, "reason", e)
        return json.dumps({"error": str(reason)}), 502, json_headers

    return json.dumps({
        "bytes": len(image_bytes),
        "image_base64": base64.b64encode(image_bytes).decode(),
        "format": fmt,
    }), 200, json_headers
# requirements.txt
functions-framework==3.*
Timeout configuration: GCF 2nd gen defaults to 60 seconds. For screenshot-heavy workloads, set explicitly:
# --entry-point is required here: gcloud defaults the entry point to the
# function's resource name ("capture-screenshot"), which does not match the
# Python function name capture_screenshot defined in the source.
gcloud functions deploy capture-screenshot \
  --gen2 \
  --runtime python312 \
  --entry-point capture_screenshot \
  --trigger-http \
  --timeout 120s \
  --memory 256Mi \
  --set-env-vars SCREENSHOT_API_KEY=your-key-here
Cloud Storage Integration
For production use, store screenshots in Cloud Storage rather than returning base64 in the response:
import functions_framework
import os
import urllib.request
import urllib.parse
import urllib.error
import json
from datetime import datetime, UTC
from google.cloud import storage
# Module-level client: created once when the module is imported and shared by
# every function below.
gcs = storage.Client()
# Required configuration; a missing variable raises KeyError at import time.
BUCKET_NAME = os.environ["SCREENSHOT_BUCKET"]
API_KEY = os.environ["SCREENSHOT_API_KEY"]
def capture(url: str, fmt: str = "webp") -> bytes:
    """Fetch a full-page screenshot of *url* and return the raw image bytes.

    The request asks for a 1280x800 viewport with ad blocking enabled and is
    authenticated with the module-level API_KEY. Errors raised by urllib
    (HTTPError, URLError) propagate to the caller.
    """
    query = {
        "url": url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
        "viewport_width": "1280",
        "viewport_height": "800",
    }
    endpoint = f"https://hermesforge.dev/api/screenshot?{urllib.parse.urlencode(query)}"
    api_request = urllib.request.Request(endpoint, headers={"X-API-Key": API_KEY})
    with urllib.request.urlopen(api_request, timeout=25) as response:
        image = response.read()
    return image
def store(image_bytes: bytes, blob_name: str, fmt: str) -> str:
    """Upload the image to the screenshot bucket and return a signed GET URL.

    The object is written to BUCKET_NAME under *blob_name*. The content type
    is looked up from *fmt* ("webp", "png", "jpeg"); any other value falls
    back to "image/webp". The returned V4 signed URL expires after one hour.
    """
    from datetime import timedelta

    mime_by_format = {"webp": "image/webp", "png": "image/png", "jpeg": "image/jpeg"}
    mime_type = mime_by_format.get(fmt, "image/webp")

    target = gcs.bucket(BUCKET_NAME).blob(blob_name)
    target.upload_from_string(image_bytes, content_type=mime_type)

    # NOTE(review): credentials obtained from the metadata server carry no
    # private key, so signing may need service_account_email/access_token
    # passed explicitly when running on Cloud Functions — confirm.
    one_hour = timedelta(hours=1)
    return target.generate_signed_url(expiration=one_hour, method="GET", version="v4")
@functions_framework.http
def capture_and_store(request):
    """HTTP-triggered function: capture a screenshot and store it in GCS.

    Expects a JSON object body: {"url": "...", "format": "webp"}.
    "url" is required; "format" defaults to "webp".

    Returns:
        A (body, status, headers) tuple with a JSON body. On success (200)
        the body carries the gs:// path, a signed URL valid for 3600
        seconds, and the image size. Errors are 400 for a missing/invalid
        url or body, the upstream status code when the screenshot API
        answers with an HTTP error, and 502 when the API cannot be reached
        or times out.
    """
    json_headers = {"Content-Type": "application/json"}

    body = request.get_json(force=True) or {}
    # A JSON list or string is truthy but has no .get(); reject it cleanly.
    if not isinstance(body, dict):
        return json.dumps({"error": "JSON object required"}), 400, json_headers

    target_url = body.get("url")
    if not target_url or not isinstance(target_url, str):
        return json.dumps({"error": "url required"}), 400, json_headers

    fmt = body.get("format", "webp")
    ts = datetime.now(UTC).strftime("%Y/%m/%d/%H%M%S")
    # A URL without a scheme parses to an empty netloc; fall back to a
    # placeholder so the object is not named just ".<fmt>".
    safe_host = urllib.parse.urlparse(target_url).netloc.replace(".", "-") or "unknown-host"
    blob_name = f"screenshots/{ts}/{safe_host}.{fmt}"

    try:
        image_bytes = capture(target_url, fmt)
    except urllib.error.HTTPError as e:
        error_body = e.read().decode(errors="replace")
        return json.dumps({"error": error_body}), e.code, json_headers
    except (urllib.error.URLError, TimeoutError) as e:
        # Network failures and read timeouts are reported as a bad gateway,
        # matching capture_screenshot, instead of surfacing as a bare 500.
        reason = getattr(e, "reason", e)
        return json.dumps({"error": str(reason)}), 502, json_headers

    signed_url = store(image_bytes, blob_name, fmt)
    return json.dumps({
        "gcs_path": f"gs://{BUCKET_NAME}/{blob_name}",
        "signed_url": signed_url,
        "bytes": len(image_bytes),
        "expires_in": 3600,
    }), 200, json_headers
Secret Manager for API Keys
Storing API keys in plain environment variables works, but Secret Manager is the GCP-idiomatic approach for production credentials:
from google.cloud import secretmanager
def get_api_key() -> str:
    """Read the latest version of the "screenshot-api-key" secret.

    The project is taken from the GOOGLE_CLOUD_PROJECT environment variable
    (KeyError if unset). Returns the secret payload decoded as UTF-8 with
    surrounding whitespace stripped.
    """
    sm_client = secretmanager.SecretManagerServiceClient()
    # NOTE(review): confirm GOOGLE_CLOUD_PROJECT is actually set in the target
    # runtime; it may need to be configured explicitly.
    project = os.environ["GOOGLE_CLOUD_PROJECT"]
    version_path = f"projects/{project}/secrets/screenshot-api-key/versions/latest"
    secret = sm_client.access_secret_version(request={"name": version_path})
    raw_value = secret.payload.data
    return raw_value.decode("utf-8").strip()
# Module-level cache: filled on first use and kept for as long as the
# instance stays warm.
_API_KEY: str | None = None


def api_key() -> str:
    """Return the screenshot API key, calling get_api_key() only on first use."""
    global _API_KEY
    if _API_KEY is not None:
        return _API_KEY
    _API_KEY = get_api_key()
    return _API_KEY
Module-level caching matters: GCF reuses warm instances across invocations. A Secret Manager fetch on every invocation adds ~50ms. Caching the key in a module-level variable means that latency is paid only on the first invocation after a cold start.
Pub/Sub-Triggered Batch Processing
For batch screenshot jobs — processing a queue of URLs from a data pipeline, generating thumbnails for a content ingestion workflow — Pub/Sub provides natural fan-out:
import functions_framework
import base64
import json
import os
import urllib.request
import urllib.parse
import urllib.error
from google.cloud import storage
# Module-level client: created once when the module is imported and shared by
# every function below.
gcs = storage.Client()
# Required configuration; a missing variable raises KeyError at import time.
BUCKET_NAME = os.environ["SCREENSHOT_BUCKET"]
API_KEY = os.environ["SCREENSHOT_API_KEY"]
@functions_framework.cloud_event
def process_screenshot_request(cloud_event):
    """Pub/Sub-triggered function: capture one screenshot and store it in GCS.

    Message format (JSON, delivered base64-encoded):
        {"url": "https://...", "reference_id": "...", "format": "webp"}
    "reference_id" falls back to the Pub/Sub message id; "format" defaults
    to "webp". The image is written to batch/<reference_id>.<format>.

    Malformed messages are logged and acknowledged (by returning normally) so
    they are not redelivered forever. Capture failures are re-raised so the
    message is redelivered according to the subscription's retry policy.
    """
    message = cloud_event.data["message"]

    # Decode inside the try: binascii.Error, UnicodeDecodeError and
    # json.JSONDecodeError are all ValueError subclasses, so every kind of
    # undecodable payload is acknowledged rather than retried indefinitely.
    try:
        payload = json.loads(base64.b64decode(message.get("data", "")).decode())
    except ValueError as e:
        print(f"Invalid message JSON: {e}")
        return  # Ack the message to avoid infinite retry on bad messages
    if not isinstance(payload, dict):
        print(f"Invalid message JSON: expected an object, got {type(payload).__name__}")
        return

    # Look up the message id lazily and under both spellings so a payload
    # that already carries reference_id never fails on a missing key.
    reference_id = (
        payload.get("reference_id")
        or message.get("message_id")
        or message.get("messageId")
        or "unknown"
    )
    url = payload.get("url")
    fmt = payload.get("format", "webp")

    if not url:
        print(f"No URL in message {reference_id}, skipping")
        return

    try:
        image_bytes = _capture_with_retry(url, fmt)
    except Exception as e:
        print(f"FAIL {reference_id}: {e}")
        # Raising here causes the message to be redelivered (up to the subscription's retry policy)
        raise

    blob_name = f"batch/{reference_id}.{fmt}"
    blob = gcs.bucket(BUCKET_NAME).blob(blob_name)
    # Metadata assigned before the upload is sent with it, which saves the
    # separate patch() round trip. Custom metadata values must be strings.
    blob.metadata = {"source_url": str(url), "reference_id": str(reference_id)}
    # "jpg" is a common alias, but the registered MIME type is image/jpeg.
    content_type = {"jpg": "image/jpeg"}.get(fmt, f"image/{fmt}")
    blob.upload_from_string(image_bytes, content_type=content_type)

    print(f"OK {reference_id}: {len(image_bytes)} bytes → gs://{BUCKET_NAME}/{blob_name}")
def _capture_with_retry(url: str, fmt: str, max_attempts: int = 3) -> bytes:
    """Capture a screenshot, retrying transient failures with backoff.

    Retries on HTTP 429, HTTP 5xx, network errors and timeouts. Other HTTP
    errors (4xx) are treated as permanent and re-raised immediately.

    Args:
        url: Page to capture.
        fmt: Image format passed to the screenshot API.
        max_attempts: Total number of attempts before giving up.

    Returns:
        The raw image bytes.

    Raises:
        urllib.error.HTTPError: for a non-retryable HTTP status.
        RuntimeError: when every attempt failed; the last error is chained.
    """
    import time

    # Upper bound on any single wait so a large server-supplied Retry-After
    # cannot stall the function until its own timeout.
    max_delay = 30.0

    # The request URL is identical for every attempt, so build it once.
    params = urllib.parse.urlencode({
        "url": url, "format": fmt,
        "full_page": "true", "block_ads": "true",
    })
    endpoint = f"https://hermesforge.dev/api/screenshot?{params}"

    last_error = None
    for attempt in range(max_attempts):
        delay = float(2 ** attempt)
        try:
            req = urllib.request.Request(endpoint, headers={"X-API-Key": API_KEY})
            with urllib.request.urlopen(req, timeout=25) as resp:
                return resp.read()
        except urllib.error.HTTPError as e:
            if e.code != 429 and e.code < 500:
                raise
            last_error = e
            retry_after = e.headers.get("Retry-After") if e.headers else None
            if retry_after is not None:
                # Retry-After may be an HTTP-date instead of a number of
                # seconds; keep the exponential backoff in that case.
                try:
                    delay = max(0.0, float(retry_after))
                except ValueError:
                    pass
        except (urllib.error.URLError, TimeoutError) as e:
            # A timeout while reading the body is raised as TimeoutError and
            # is not wrapped in URLError.
            last_error = e
        # Waiting after the final attempt would only delay the failure.
        if attempt < max_attempts - 1:
            time.sleep(min(delay, max_delay))
    raise RuntimeError(f"All {max_attempts} attempts failed") from last_error
Configure the Pub/Sub subscription with a dead letter topic and max delivery attempts (typically 5) so persistently failing messages are isolated rather than retried indefinitely.
Cloud Scheduler for Daily Captures
@functions_framework.http
def daily_regulatory_capture(request):
    """Capture the fixed list of regulatory pages and archive them in GCS.

    Triggered by Cloud Scheduler (HTTP target); the request is not inspected.
    Each page is stored as regulatory/<YYYY-MM-DD>/<label>.webp in the
    screenshot bucket. A failure on one page is recorded and does not stop
    the remaining captures.

    Returns a (body, status, headers) tuple: 200 when every page succeeded,
    207 otherwise, with per-page results in the JSON body.
    """
    capture_date = datetime.now(UTC).strftime("%Y-%m-%d")

    pages = [
        {"url": "https://example.com/disclosures/best-execution", "label": "best-execution"},
        {"url": "https://example.com/disclosures/rts28", "label": "rts28"},
        {"url": "https://example.com/disclosures/order-handling", "label": "order-handling"},
    ]

    results = []
    for page in pages:
        label = page["label"]
        try:
            image_bytes = _capture_with_retry(page["url"], "webp")
            size = len(image_bytes)
            target = gcs.bucket(BUCKET_NAME).blob(f"regulatory/{capture_date}/{label}.webp")
            target.upload_from_string(image_bytes, content_type="image/webp")
            results.append({"label": label, "bytes": size, "status": "ok"})
            print(f"[OK] {label}: {size} bytes")
        except Exception as e:
            results.append({"label": label, "status": "error", "error": str(e)})
            print(f"[FAIL] {label}: {e}")

    total = len(pages)
    success_count = len([entry for entry in results if entry["status"] == "ok"])
    # 207 signals a partial (or total) failure while still returning details.
    status_code = 207 if success_count != total else 200

    summary = {
        "date": capture_date,
        "results": results,
        "success": success_count,
        "total": total,
    }
    return json.dumps(summary), status_code, {"Content-Type": "application/json"}
Cloud Scheduler config (Terraform below) sends an authenticated HTTP POST to the function URL every weekday at 13:30 UTC.
Terraform Deployment
# main.tf
# Pins the Google provider to the 5.x release series.
terraform {
required_providers {
google = { source = "hashicorp/google", version = "~> 5.0" }
}
}
# Default project and region for every resource below; both come from
# variables.tf.
provider "google" {
project = var.project_id
region = var.region
}
# Cloud Storage bucket for screenshots
# Named "<project_id>-screenshots". The lifecycle rule deletes every object
# once it is 90 days old; force_destroy = false keeps Terraform from deleting
# a bucket that still contains objects.
# NOTE(review): the 90-day rule applies to the whole bucket, including any
# regulatory/ captures — confirm that retention is acceptable.
resource "google_storage_bucket" "screenshots" {
name = "${var.project_id}-screenshots"
location = var.region
force_destroy = false
lifecycle_rule {
condition { age = 90 }
action { type = "Delete" }
}
uniform_bucket_level_access = true
}
# Secret Manager secret for API key
# The secret container uses automatic replication; the version below holds
# the actual key value taken from var.screenshot_api_key.
resource "google_secret_manager_secret" "screenshot_api_key" {
secret_id = "screenshot-api-key"
replication { auto {} }
}
# NOTE(review): secret_data is recorded in the Terraform state — make sure the
# state backend is access-controlled.
resource "google_secret_manager_secret_version" "screenshot_api_key" {
secret = google_secret_manager_secret.screenshot_api_key.id
secret_data = var.screenshot_api_key
}
# Cloud Storage bucket for function source
resource "google_storage_bucket" "function_source" {
name = "${var.project_id}-function-source"
location = var.region
}
# Uploads the local function.zip. The object name embeds the archive's MD5,
# so a changed zip produces a new object name.
resource "google_storage_bucket_object" "function_zip" {
name = "screenshot-function-${filemd5("${path.module}/function.zip")}.zip"
bucket = google_storage_bucket.function_source.name
source = "${path.module}/function.zip"
}
# Dedicated runtime identity for the function, so the permissions it needs
# are granted explicitly instead of relying on the default service account.
resource "google_service_account" "function" {
  account_id   = "screenshot-function"
  display_name = "Screenshot Function Runtime"
}

# The runtime identity must be able to read the secret that is mounted as
# SCREENSHOT_API_KEY; without this grant the secret cannot be resolved.
resource "google_secret_manager_secret_iam_member" "function_secret_access" {
  secret_id = google_secret_manager_secret.screenshot_api_key.id
  role      = "roles/secretmanager.secretAccessor"
  member    = "serviceAccount:${google_service_account.function.email}"
}

# Allows the function to create and overwrite screenshot objects.
resource "google_storage_bucket_iam_member" "function_bucket_access" {
  bucket = google_storage_bucket.screenshots.name
  role   = "roles/storage.objectAdmin"
  member = "serviceAccount:${google_service_account.function.email}"
}

# Cloud Function (2nd gen)
# Serves the capture_and_store entry point from the uploaded source archive.
resource "google_cloudfunctions2_function" "screenshot" {
  name     = "screenshot-capture"
  location = var.region

  build_config {
    runtime     = "python312"
    entry_point = "capture_and_store"
    source {
      storage_source {
        bucket = google_storage_bucket.function_source.name
        object = google_storage_bucket_object.function_zip.name
      }
    }
  }

  service_config {
    max_instance_count    = 10
    min_instance_count    = 0
    available_memory      = "256Mi"
    timeout_seconds       = 120
    service_account_email = google_service_account.function.email

    environment_variables = {
      SCREENSHOT_BUCKET = google_storage_bucket.screenshots.name
    }

    secret_environment_variables {
      key        = "SCREENSHOT_API_KEY"
      project_id = var.project_id
      secret     = google_secret_manager_secret.screenshot_api_key.secret_id
      version    = "latest"
    }
  }

  # "latest" only resolves once a secret version exists and the runtime
  # identity may read it, so create both before the function.
  depends_on = [
    google_secret_manager_secret_version.screenshot_api_key,
    google_secret_manager_secret_iam_member.function_secret_access,
  ]
}
# Cloud Function (2nd gen) that runs the scheduled regulatory capture.
# The scheduler must not target the "screenshot" function: its entry point
# (capture_and_store) requires a "url" in the request body and answers the
# scheduler's empty payload with HTTP 400.
resource "google_cloudfunctions2_function" "daily_capture" {
  name     = "daily-regulatory-capture"
  location = var.region

  build_config {
    runtime     = "python312"
    entry_point = "daily_regulatory_capture"
    source {
      storage_source {
        bucket = google_storage_bucket.function_source.name
        object = google_storage_bucket_object.function_zip.name
      }
    }
  }

  service_config {
    max_instance_count = 1
    min_instance_count = 0
    available_memory   = "256Mi"
    # Several pages are captured sequentially, each with retries, so allow
    # more time than a single-capture request needs.
    timeout_seconds = 300

    environment_variables = {
      SCREENSHOT_BUCKET = google_storage_bucket.screenshots.name
    }

    secret_environment_variables {
      key        = "SCREENSHOT_API_KEY"
      project_id = var.project_id
      secret     = google_secret_manager_secret.screenshot_api_key.secret_id
      version    = "latest"
    }
  }

  depends_on = [google_secret_manager_secret_version.screenshot_api_key]
}

# Cloud Scheduler job for daily captures
resource "google_cloud_scheduler_job" "daily_capture" {
  name      = "daily-regulatory-capture"
  schedule  = "30 13 * * 1-5" # 13:30 UTC, weekdays
  time_zone = "UTC"
  # Give the request slightly longer than the function's own timeout.
  attempt_deadline = "320s"

  http_target {
    uri         = google_cloudfunctions2_function.daily_capture.service_config[0].uri
    http_method = "POST"
    body        = base64encode(jsonencode({}))
    headers = {
      "Content-Type" = "application/json"
    }

    oidc_token {
      service_account_email = google_service_account.scheduler.email
      # The token audience must be the URL of the service being invoked.
      audience = google_cloudfunctions2_function.daily_capture.service_config[0].uri
    }
  }
}

# Identity that Cloud Scheduler uses to sign its requests.
resource "google_service_account" "scheduler" {
  account_id   = "screenshot-scheduler"
  display_name = "Screenshot Scheduler"
}

resource "google_cloudfunctions2_function_iam_member" "scheduler_invoker" {
  location       = google_cloudfunctions2_function.daily_capture.location
  cloud_function = google_cloudfunctions2_function.daily_capture.name
  role           = "roles/cloudfunctions.invoker"
  member         = "serviceAccount:${google_service_account.scheduler.email}"
}

# 2nd gen functions are served by a Cloud Run service of the same name;
# authenticated HTTP calls are authorized against that service.
resource "google_cloud_run_service_iam_member" "scheduler_run_invoker" {
  location = google_cloudfunctions2_function.daily_capture.location
  service  = google_cloudfunctions2_function.daily_capture.name
  role     = "roles/run.invoker"
  member   = "serviceAccount:${google_service_account.scheduler.email}"
}
# variables.tf
# GCP project that owns every resource; also used as the bucket name prefix.
variable "project_id" { type = string }
# Region for the buckets and the function; defaults to us-central1.
variable "region" { default = "us-central1" }
# Screenshot API key stored in Secret Manager; marked sensitive so Terraform
# redacts it from plan and apply output.
variable "screenshot_api_key" {
type = string
sensitive = true
}
Deploy with:
# Package the function
zip -r function.zip main.py requirements.txt
# Deploy
terraform init
# NOTE(review): a key passed with -var is kept in shell history; Terraform
# also reads it from the TF_VAR_screenshot_api_key environment variable.
terraform apply \
-var="project_id=$(gcloud config get-value project)" \
-var="screenshot_api_key=your-api-key-here"
Cloud Storage Event Trigger
For workflows where screenshot capture should react to file uploads — a content ingestion pipeline drops a JSON manifest, and the function processes each URL in the manifest:
@functions_framework.cloud_event
def process_url_manifest(cloud_event):
    """Capture every URL listed in a manifest uploaded to Cloud Storage.

    Triggered when an object is written to the bucket. Only objects under
    manifests/ with a .json suffix are processed. Manifest format:
        {"urls": [{"url": "...", "id": "..."}], "format": "webp"}

    Each screenshot is written to screenshots/<id>.<format> in the same
    bucket. A failing or malformed item is logged and skipped so the rest of
    the manifest is still processed; an unreadable manifest is logged and
    dropped.
    """
    data = cloud_event.data
    bucket_name = data["bucket"]
    blob_name = data["name"]

    if not (blob_name.startswith("manifests/") and blob_name.endswith(".json")):
        return  # Only process manifest files

    bucket = gcs.bucket(bucket_name)
    try:
        manifest = json.loads(bucket.blob(blob_name).download_as_text())
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; the same bad file would fail again on every retry.
        print(f"Invalid manifest {blob_name}: {e}")
        return
    if not isinstance(manifest, dict):
        print(f"Invalid manifest {blob_name}: expected a JSON object")
        return

    fmt = manifest.get("format", "webp")
    urls = manifest.get("urls") or []
    print(f"Processing manifest {blob_name}: {len(urls)} URLs")

    for index, item in enumerate(urls):
        # Resolve the log label up front: reading item['id'] inside the
        # except handler would raise again for an item without an id and
        # abort the whole manifest.
        label = item.get("id", f"#{index}") if isinstance(item, dict) else f"#{index}"
        try:
            image_bytes = _capture_with_retry(item["url"], fmt)
            output_blob_name = f"screenshots/{item['id']}.{fmt}"
            bucket.blob(output_blob_name).upload_from_string(
                image_bytes, content_type=f"image/{fmt}"
            )
            print(f"[OK] {label}: {len(image_bytes)} bytes")
        except Exception as e:
            print(f"[FAIL] {label}: {e}")
Cost Comparison
| Component | Cost |
|---|---|
| GCF invocation (2nd gen) | $0.40 per 1M requests |
| GCF compute (256MB, 25s avg) | ~$0.12 per 1000 screenshots |
| Cloud Storage write | $0.05 per 10k operations |
| Cloud Storage storage (90-day lifecycle) | ~$0.02 per GB/month |
| Screenshot API (Pro tier) | $9/30 days for 1,000 calls/day |
For 1000 screenshots/month: GCF compute ≈ $0.12, GCS ≈ negligible, API ≈ $9. Same economics as Lambda — the API pricing dominates at these volumes, making the choice between GCF and Lambda a matter of your existing GCP/AWS footprint rather than cost optimization.
Summary
GCF 2nd gen's longer timeout (60 minutes vs Lambda's 15), native Pub/Sub fan-out, and Terraform-idiomatic deployment make it a natural fit for screenshot workflows in GCP environments. The key patterns: use Secret Manager for the API key, Cloud Storage for screenshot output, Pub/Sub for batch fan-out with dead letter handling, Cloud Scheduler for periodic captures, and module-level caching for secrets to avoid per-invocation latency on warm instances.
Get a free API key at hermesforge.dev to start building.