Using a Screenshot API in Google Cloud Functions: Serverless Visual Capture on GCP

2026-05-21 | Tags: [screenshot-api, google-cloud-functions, serverless, python, gcp]

Google Cloud Functions brings the same serverless economics as AWS Lambda — pay per invocation, scale to zero, no server management — with GCP-native integrations that make certain patterns more natural: Cloud Storage triggers, Pub/Sub fan-out, Cloud Scheduler cron, and Secret Manager for credentials. This guide covers the GCF-specific patterns for screenshot API integrations, with Terraform deployment throughout.

GCF vs Lambda: Key Differences for Screenshot Workflows

Aspect Google Cloud Functions AWS Lambda
Max timeout 60 minutes (2nd gen) 15 minutes
Max memory 32GB (2nd gen) 10GB
Default timeout 60 seconds 3 seconds
Storage integration Cloud Storage S3
Scheduling Cloud Scheduler EventBridge
Secrets Secret Manager SSM Parameter Store
Deployment gcloud / Terraform SAM / CDK
Cold start (Python) ~400ms ~250ms

GCF's 60-minute timeout limit is a meaningful advantage for batch screenshot workflows — long-running capture jobs that would require Step Functions on AWS can run as a single GCF invocation.

Basic Cloud Function

import functions_framework
import os
import urllib.request
import urllib.parse
import urllib.error
import json

@functions_framework.http
def capture_screenshot(request):
    """
    HTTP-triggered Cloud Function for screenshot capture.
    Accepts JSON body: {"url": "...", "format": "webp"}
    """
    if request.method != "POST":
        return json.dumps({"error": "POST required"}), 405, {"Content-Type": "application/json"}

    try:
        body = request.get_json(force=True)
    except Exception:
        return json.dumps({"error": "Invalid JSON"}), 400, {"Content-Type": "application/json"}

    target_url = body.get("url")
    if not target_url:
        return json.dumps({"error": "url is required"}), 400, {"Content-Type": "application/json"}

    api_key = os.environ["SCREENSHOT_API_KEY"]
    fmt = body.get("format", "webp")

    params = urllib.parse.urlencode({
        "url": target_url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
    })

    req = urllib.request.Request(
        f"https://hermesforge.dev/api/screenshot?{params}",
        headers={"X-API-Key": api_key},
    )

    try:
        with urllib.request.urlopen(req, timeout=25) as resp:
            image_bytes = resp.read()
    except urllib.error.HTTPError as e:
        error_body = e.read().decode()
        return json.dumps({"error": error_body}), e.code, {"Content-Type": "application/json"}
    except urllib.error.URLError as e:
        return json.dumps({"error": str(e.reason)}), 502, {"Content-Type": "application/json"}

    import base64
    return json.dumps({
        "bytes": len(image_bytes),
        "image_base64": base64.b64encode(image_bytes).decode(),
        "format": fmt,
    }), 200, {"Content-Type": "application/json"}
# requirements.txt
functions-framework==3.*

Timeout configuration: GCF 2nd gen defaults to 60 seconds. For screenshot-heavy workloads, set explicitly:

gcloud functions deploy capture-screenshot \
  --gen2 \
  --runtime python312 \
  --trigger-http \
  --timeout 120s \
  --memory 256Mi \
  --set-env-vars SCREENSHOT_API_KEY=your-key-here

Cloud Storage Integration

For production use, store screenshots in Cloud Storage rather than returning base64 in the response:

import functions_framework
import os
import urllib.request
import urllib.parse
import urllib.error
import json
from datetime import datetime, UTC
from google.cloud import storage

gcs = storage.Client()
BUCKET_NAME = os.environ["SCREENSHOT_BUCKET"]
API_KEY = os.environ["SCREENSHOT_API_KEY"]


def capture(url: str, fmt: str = "webp") -> bytes:
    params = urllib.parse.urlencode({
        "url": url,
        "format": fmt,
        "full_page": "true",
        "block_ads": "true",
        "viewport_width": "1280",
        "viewport_height": "800",
    })
    req = urllib.request.Request(
        f"https://hermesforge.dev/api/screenshot?{params}",
        headers={"X-API-Key": API_KEY},
    )
    with urllib.request.urlopen(req, timeout=25) as resp:
        return resp.read()


def store(image_bytes: bytes, blob_name: str, fmt: str) -> str:
    """Store in GCS, return signed URL valid for 1 hour."""
    content_types = {"webp": "image/webp", "png": "image/png", "jpeg": "image/jpeg"}
    bucket = gcs.bucket(BUCKET_NAME)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(
        image_bytes,
        content_type=content_types.get(fmt, "image/webp"),
    )
    from datetime import timedelta
    return blob.generate_signed_url(
        expiration=timedelta(hours=1),
        method="GET",
        version="v4",
    )


@functions_framework.http
def capture_and_store(request):
    body = request.get_json(force=True) or {}
    target_url = body.get("url")
    if not target_url:
        return json.dumps({"error": "url required"}), 400, {"Content-Type": "application/json"}

    fmt = body.get("format", "webp")
    ts = datetime.now(UTC).strftime("%Y/%m/%d/%H%M%S")
    safe_host = urllib.parse.urlparse(target_url).netloc.replace(".", "-")
    blob_name = f"screenshots/{ts}/{safe_host}.{fmt}"

    try:
        image_bytes = capture(target_url, fmt)
    except urllib.error.HTTPError as e:
        return json.dumps({"error": e.read().decode()}), e.code, {"Content-Type": "application/json"}

    signed_url = store(image_bytes, blob_name, fmt)

    return json.dumps({
        "gcs_path": f"gs://{BUCKET_NAME}/{blob_name}",
        "signed_url": signed_url,
        "bytes": len(image_bytes),
        "expires_in": 3600,
    }), 200, {"Content-Type": "application/json"}

Secret Manager for API Keys

Hardcoding API keys in environment variables works but Secret Manager is the GCP-idiomatic approach for production credentials:

from google.cloud import secretmanager


def get_api_key() -> str:
    """Fetch screenshot API key from Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
    name = f"projects/{project_id}/secrets/screenshot-api-key/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("utf-8").strip()


# Cache at module level — survives warm invocations
_API_KEY: str | None = None


def api_key() -> str:
    global _API_KEY
    if _API_KEY is None:
        _API_KEY = get_api_key()
    return _API_KEY

Module-level caching matters: GCF reuses warm instances across invocations. A Secret Manager fetch on every invocation adds ~50ms. Caching it at module initialization means the latency hits only on cold starts.

Pub/Sub-Triggered Batch Processing

For batch screenshot jobs — processing a queue of URLs from a data pipeline, generating thumbnails for a content ingestion workflow — Pub/Sub provides natural fan-out:

import functions_framework
import base64
import json
import os
import urllib.request
import urllib.parse
import urllib.error
from google.cloud import storage

gcs = storage.Client()
BUCKET_NAME = os.environ["SCREENSHOT_BUCKET"]
API_KEY = os.environ["SCREENSHOT_API_KEY"]


@functions_framework.cloud_event
def process_screenshot_request(cloud_event):
    """
    Pub/Sub-triggered function. Message format:
    {"url": "https://...", "reference_id": "...", "format": "webp"}
    """
    message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    try:
        payload = json.loads(message_data)
    except json.JSONDecodeError as e:
        print(f"Invalid message JSON: {e}")
        return  # Ack the message to avoid infinite retry on bad messages

    url = payload.get("url")
    reference_id = payload.get("reference_id", cloud_event.data["message"]["message_id"])
    fmt = payload.get("format", "webp")

    if not url:
        print(f"No URL in message {reference_id}, skipping")
        return

    try:
        image_bytes = _capture_with_retry(url, fmt)
    except Exception as e:
        print(f"FAIL {reference_id}: {e}")
        # Raising here causes the message to be redelivered (up to the subscription's retry policy)
        raise

    blob_name = f"batch/{reference_id}.{fmt}"
    bucket = gcs.bucket(BUCKET_NAME)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(
        image_bytes,
        content_type=f"image/{fmt}",
    )
    blob.metadata = {"source_url": url, "reference_id": reference_id}
    blob.patch()

    print(f"OK {reference_id}: {len(image_bytes)} bytes → gs://{BUCKET_NAME}/{blob_name}")


def _capture_with_retry(url: str, fmt: str, max_attempts: int = 3) -> bytes:
    import time
    last_error = None
    for attempt in range(max_attempts):
        try:
            params = urllib.parse.urlencode({
                "url": url, "format": fmt,
                "full_page": "true", "block_ads": "true",
            })
            req = urllib.request.Request(
                f"https://hermesforge.dev/api/screenshot?{params}",
                headers={"X-API-Key": API_KEY},
            )
            with urllib.request.urlopen(req, timeout=25) as resp:
                return resp.read()
        except urllib.error.HTTPError as e:
            if e.code == 429 or e.code >= 500:
                retry_after = float(e.headers.get("Retry-After", 2 ** attempt))
                time.sleep(retry_after)
                last_error = e
            else:
                raise
        except urllib.error.URLError as e:
            time.sleep(2 ** attempt)
            last_error = e
    raise RuntimeError(f"All {max_attempts} attempts failed") from last_error

Configure the Pub/Sub subscription with a dead letter topic and max delivery attempts (typically 5) so persistently failing messages are isolated rather than retried indefinitely.

Cloud Scheduler for Daily Captures

@functions_framework.http
def daily_regulatory_capture(request):
    """
    Triggered by Cloud Scheduler (HTTP target).
    Captures a fixed set of regulatory pages daily.
    """
    capture_date = datetime.now(UTC).strftime("%Y-%m-%d")
    pages = [
        {"url": "https://example.com/disclosures/best-execution", "label": "best-execution"},
        {"url": "https://example.com/disclosures/rts28", "label": "rts28"},
        {"url": "https://example.com/disclosures/order-handling", "label": "order-handling"},
    ]

    results = []
    for page in pages:
        try:
            image_bytes = _capture_with_retry(page["url"], "webp")
            blob_name = f"regulatory/{capture_date}/{page['label']}.webp"
            bucket = gcs.bucket(BUCKET_NAME)
            bucket.blob(blob_name).upload_from_string(
                image_bytes, content_type="image/webp"
            )
            results.append({"label": page["label"], "bytes": len(image_bytes), "status": "ok"})
            print(f"[OK] {page['label']}: {len(image_bytes)} bytes")
        except Exception as e:
            results.append({"label": page["label"], "status": "error", "error": str(e)})
            print(f"[FAIL] {page['label']}: {e}")

    success_count = sum(1 for r in results if r["status"] == "ok")
    status_code = 200 if success_count == len(pages) else 207

    return json.dumps({
        "date": capture_date,
        "results": results,
        "success": success_count,
        "total": len(pages),
    }), status_code, {"Content-Type": "application/json"}

Cloud Scheduler config (Terraform below) sends an authenticated HTTP POST to the function URL daily at 13:30 UTC.

Terraform Deployment

# main.tf

terraform {
  required_providers {
    google = { source = "hashicorp/google", version = "~> 5.0" }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
}

# Cloud Storage bucket for screenshots
resource "google_storage_bucket" "screenshots" {
  name          = "${var.project_id}-screenshots"
  location      = var.region
  force_destroy = false

  lifecycle_rule {
    condition { age = 90 }
    action { type = "Delete" }
  }

  uniform_bucket_level_access = true
}

# Secret Manager secret for API key
resource "google_secret_manager_secret" "screenshot_api_key" {
  secret_id = "screenshot-api-key"
  replication { auto {} }
}

resource "google_secret_manager_secret_version" "screenshot_api_key" {
  secret      = google_secret_manager_secret.screenshot_api_key.id
  secret_data = var.screenshot_api_key
}

# Cloud Storage bucket for function source
resource "google_storage_bucket" "function_source" {
  name     = "${var.project_id}-function-source"
  location = var.region
}

resource "google_storage_bucket_object" "function_zip" {
  name   = "screenshot-function-${filemd5("${path.module}/function.zip")}.zip"
  bucket = google_storage_bucket.function_source.name
  source = "${path.module}/function.zip"
}

# Cloud Function (2nd gen)
resource "google_cloudfunctions2_function" "screenshot" {
  name     = "screenshot-capture"
  location = var.region

  build_config {
    runtime     = "python312"
    entry_point = "capture_and_store"
    source {
      storage_source {
        bucket = google_storage_bucket.function_source.name
        object = google_storage_bucket_object.function_zip.name
      }
    }
  }

  service_config {
    max_instance_count = 10
    min_instance_count = 0
    available_memory   = "256Mi"
    timeout_seconds    = 120
    environment_variables = {
      SCREENSHOT_BUCKET = google_storage_bucket.screenshots.name
    }
    secret_environment_variables {
      key        = "SCREENSHOT_API_KEY"
      project_id = var.project_id
      secret     = google_secret_manager_secret.screenshot_api_key.secret_id
      version    = "latest"
    }
  }
}

# Cloud Scheduler job for daily captures
resource "google_cloud_scheduler_job" "daily_capture" {
  name      = "daily-regulatory-capture"
  schedule  = "30 13 * * 1-5"  # 13:30 UTC, weekdays
  time_zone = "UTC"

  http_target {
    uri         = google_cloudfunctions2_function.screenshot.service_config[0].uri
    http_method = "POST"
    body        = base64encode(jsonencode({}))

    oidc_token {
      service_account_email = google_service_account.scheduler.email
    }
  }
}

resource "google_service_account" "scheduler" {
  account_id   = "screenshot-scheduler"
  display_name = "Screenshot Scheduler"
}

resource "google_cloudfunctions2_function_iam_member" "scheduler_invoker" {
  location       = google_cloudfunctions2_function.screenshot.location
  cloud_function = google_cloudfunctions2_function.screenshot.name
  role           = "roles/cloudfunctions.invoker"
  member         = "serviceAccount:${google_service_account.scheduler.email}"
}

# variables.tf
variable "project_id" { type = string }
variable "region" { default = "us-central1" }
variable "screenshot_api_key" {
  type      = string
  sensitive = true
}

Deploy with:

# Package the function
zip -r function.zip main.py requirements.txt

# Deploy
terraform init
terraform apply \
  -var="project_id=$(gcloud config get-value project)" \
  -var="screenshot_api_key=your-api-key-here"

Cloud Storage Event Trigger

For workflows where screenshot capture should react to file uploads — a content ingestion pipeline drops a JSON manifest, and the function processes each URL in the manifest:

@functions_framework.cloud_event
def process_url_manifest(cloud_event):
    """
    Triggered when a manifest file is uploaded to Cloud Storage.
    Manifest format: {"urls": [{"url": "...", "id": "..."}], "format": "webp"}
    """
    data = cloud_event.data
    bucket_name = data["bucket"]
    blob_name = data["name"]

    if not blob_name.endswith(".json") or not blob_name.startswith("manifests/"):
        return  # Only process manifest files

    bucket = gcs.bucket(bucket_name)
    manifest_blob = bucket.blob(blob_name)
    manifest = json.loads(manifest_blob.download_as_text())

    fmt = manifest.get("format", "webp")
    urls = manifest.get("urls", [])

    print(f"Processing manifest {blob_name}: {len(urls)} URLs")

    for item in urls:
        try:
            image_bytes = _capture_with_retry(item["url"], fmt)
            output_blob_name = f"screenshots/{item['id']}.{fmt}"
            bucket.blob(output_blob_name).upload_from_string(
                image_bytes, content_type=f"image/{fmt}"
            )
            print(f"[OK] {item['id']}: {len(image_bytes)} bytes")
        except Exception as e:
            print(f"[FAIL] {item['id']}: {e}")

Cost Comparison

Component Cost
GCF invocation (2nd gen) $0.40 per 1M requests
GCF compute (256MB, 25s avg) ~$0.12 per 1000 screenshots
Cloud Storage write $0.05 per 10k operations
Cloud Storage storage (90-day lifecycle) ~$0.02 per GB/month
Screenshot API (Pro tier) $9/30 days for 1,000 calls/day

For 1000 screenshots/month: GCF compute ≈ $0.12, GCS ≈ negligible, API ≈ $9. Same economics as Lambda — the API pricing dominates at these volumes, making the choice between GCF and Lambda a matter of your existing GCP/AWS footprint rather than cost optimization.

Summary

GCF 2nd gen's longer timeout (60 minutes vs Lambda's 15), native Pub/Sub fan-out, and Terraform-idiomatic deployment make it a natural fit for screenshot workflows in GCP environments. The key patterns: use Secret Manager for the API key, Cloud Storage for screenshot output, Pub/Sub for batch fan-out with dead letter handling, Cloud Scheduler for periodic captures, and module-level caching for secrets to avoid per-invocation latency on warm instances.

Get a free API key at hermesforge.dev to start building.