Screenshot API for Fintech: Automating Regulatory Disclosure and Price Alert Capture

2026-05-16 | Tags: [screenshot-api, fintech, compliance, automation, tutorials]

Financial services sit at the intersection of two screenshot API use cases that rarely overlap in other industries: regulatory compliance (strict, evidence-based, legally consequential) and real-time monitoring (fast-moving, competitive, time-sensitive).

A mortgage lender needs to prove it disclosed rates correctly. A trading platform needs to document price alerts it sent users. A neobank needs to monitor competitor fee schedules. These are different problems with different urgency profiles — but they share the same infrastructure requirement: capturing web state as evidence.

Regulatory Disclosure Archiving

Financial regulators (FCA, SEC, CFPB, CBI) require that disclosed terms — interest rates, fee schedules, risk warnings, APR tables — are documented as presented to users at the time of disclosure. A database record of what the terms were is insufficient evidence; you need a record of what the user saw.

import requests
import hashlib
import json
import hmac
from datetime import datetime, timezone
from pathlib import Path

SCREENSHOT_API = "https://hermesforge.dev/api/screenshot"
API_KEY = "your_api_key_here"

def capture_disclosure_page(url: str, customer_ref: str = None) -> dict:
    """
    Capture a regulatory disclosure page exactly as presented to users.

    Key: no block_ads, no inject_css. Capture the real first-visit experience.
    Regulators care about what users saw, not a sanitized version.
    """
    resp = requests.get(SCREENSHOT_API, params={
        "url": url,
        "width": 1280,
        "full_page": True,
        "wait_for": "networkidle",
        "format": "png"  # PNG for lossless — regulatory evidence
        # Deliberately NOT passing: block_ads, inject_css
    }, headers={"X-API-Key": API_KEY},
       timeout=60)  # fail rather than hang indefinitely if the API stalls
    resp.raise_for_status()

    image_bytes = resp.content
    image_hash = hashlib.sha256(image_bytes).hexdigest()
    timestamp = datetime.now(timezone.utc).isoformat()

    return {
        "url": url,
        "customer_ref": customer_ref,
        "captured_at": timestamp,
        "sha256": image_hash,
        "image_bytes": image_bytes,
        "image_size_kb": len(image_bytes) // 1024
    }

HMAC-Chained Audit Trail

For FCA/SEC compliance, disclosure archives must be tamper-evident. Hash each capture and chain entries so any modification breaks the verification:

class DisclosureAuditTrail:
    def __init__(self, archive_dir: str, secret_key: bytes):
        self.archive_dir = Path(archive_dir)
        self.archive_dir.mkdir(parents=True, exist_ok=True)
        self.manifest_path = self.archive_dir / "manifest.jsonl"
        self.secret_key = secret_key

    def record_disclosure(self, capture: dict, disclosure_type: str,
                          product_id: str = None) -> dict:
        """Record a disclosure capture in the tamper-evident chain."""
        timestamp = capture["captured_at"]
        filename = f"disclosure_{disclosure_type}_{timestamp.replace(':', '-')}.png"

        # Save image
        image_path = self.archive_dir / filename
        image_path.write_bytes(capture["image_bytes"])

        # Build manifest entry
        entry = {
            "type": disclosure_type,
            "product_id": product_id,
            "customer_ref": capture.get("customer_ref"),
            "url": capture["url"],
            "captured_at": timestamp,
            "file": filename,
            "sha256": capture["sha256"]
        }

        # Chain: HMAC(entry_json + prev_hmac)
        prev_hmac = self._get_last_hmac()
        chain_input = json.dumps(entry, sort_keys=True) + (prev_hmac or "")
        entry["hmac"] = hmac.new(
            self.secret_key,
            chain_input.encode("utf-8"),
            hashlib.sha256
        ).hexdigest()

        with open(self.manifest_path, "a") as f:
            f.write(json.dumps(entry) + "\n")

        return entry

    def verify_chain(self) -> dict:
        """Verify entire chain integrity. Returns result with any break location."""
        if not self.manifest_path.exists():
            return {"valid": True, "entries": 0}

        entries = []
        with open(self.manifest_path) as f:
            for line in f:
                if line.strip():
                    entries.append(json.loads(line))

        prev_hmac = None
        for i, entry in enumerate(entries):
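            # A missing "hmac" field counts as a break instead of raising KeyError
            stored_hmac = entry.pop("hmac", None)
            chain_input = json.dumps(entry, sort_keys=True) + (prev_hmac or "")
            expected_hmac = hmac.new(
                self.secret_key,
                chain_input.encode("utf-8"),
                hashlib.sha256
            ).hexdigest()

            # compare_digest is a constant-time comparison, preferred for MACs
            if stored_hmac is None or not hmac.compare_digest(stored_hmac, expected_hmac):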
                return {
                    "valid": False,
                    "break_at_entry": i,
                    "break_at_timestamp": entry.get("captured_at"),
                    "entries_checked": i
                }

            entry["hmac"] = stored_hmac  # Restore for chain continuity
            prev_hmac = stored_hmac

        return {"valid": True, "entries": len(entries)}

    def _get_last_hmac(self) -> str | None:
        if not self.manifest_path.exists():
            return None
        last_line = None
        with open(self.manifest_path) as f:
            for line in f:
                if line.strip():
                    last_line = line.strip()
        if last_line:
            return json.loads(last_line).get("hmac")
        return None
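
Note that verify_chain() checks the manifest entries only; it never re-reads the PNG files the entries point to. A minimal sketch of that complementary check follows, assuming the manifest.jsonl layout written by record_disclosure() above (a "file" name relative to the archive directory plus a "sha256" field). The function name verify_archived_files is hypothetical, not part of any library:

```python
import hashlib
import json
from pathlib import Path


def verify_archived_files(archive_dir: str) -> list[dict]:
    """Return one problem record per manifest entry whose image is missing or altered."""
    archive = Path(archive_dir)
    manifest_path = archive / "manifest.jsonl"
    problems = []
    if not manifest_path.exists():
        return problems

    with open(manifest_path) as f:
        for line_no, line in enumerate(f):
            if not line.strip():
                continue
            entry = json.loads(line)
            image_path = archive / entry["file"]

            if not image_path.exists():
                problems.append({"line": line_no, "file": entry["file"],
                                 "problem": "missing"})
                continue

            # Recompute the digest from the bytes on disk and compare it
            # with the digest recorded at capture time.
            actual = hashlib.sha256(image_path.read_bytes()).hexdigest()
            if actual != entry["sha256"]:
                problems.append({"line": line_no, "file": entry["file"],
                                 "problem": "hash mismatch"})
    return problems
```

Running this alongside verify_chain() covers both halves: the chain shows the manifest was not edited, and the file check shows each image still matches its manifest entry.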

Price Alert Evidence Capture

When a trading platform or neobank sends a user a price alert ("AAPL crossed $200"), regulators and customers may later dispute what price was shown. Screenshot the platform page at alert time:

import sqlite3
from datetime import datetime, timezone

class PriceAlertEvidenceCapture:
    def __init__(self, db_path: str = "price_alerts.db", evidence_dir: str = "alert_evidence"):
        self.evidence_dir = Path(evidence_dir)
        self.evidence_dir.mkdir(exist_ok=True)
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS alert_captures (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    alert_id TEXT UNIQUE,
                    user_id TEXT,
                    instrument TEXT,
                    alert_price REAL,
                    market_price_at_capture REAL,
                    page_url TEXT,
                    captured_at TEXT,
                    file_path TEXT,
                    sha256 TEXT
                )
            """)

    def capture_alert_evidence(self, alert_id: str, user_id: str,
                                instrument: str, alert_price: float,
                                platform_page_url: str,
                                market_price: float = None) -> dict:
        """
        Capture evidence screenshot when a price alert fires.

        Call this from your alert dispatch logic, synchronously before
        sending the notification to the user.
        """
        capture = capture_disclosure_page(platform_page_url)

        timestamp = capture["captured_at"]
        filename = f"alert_{alert_id}_{timestamp.replace(':', '-')}.png"
        file_path = self.evidence_dir / filename
        file_path.write_bytes(capture["image_bytes"])

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR IGNORE INTO alert_captures
                (alert_id, user_id, instrument, alert_price, market_price_at_capture,
                 page_url, captured_at, file_path, sha256)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (alert_id, user_id, instrument, alert_price, market_price,
                  platform_page_url, timestamp, str(file_path), capture["sha256"]))

        return {
            "alert_id": alert_id,
            "captured_at": timestamp,
            "file": str(file_path),
            "sha256": capture["sha256"]
        }
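
The class above does not say what should happen when the capture itself fails at alert time. The sketch below shows one possible ordering, under the assumption (a policy choice, not something the API dictates) that the alert should still go out and the evidence gap should be logged for follow-up. dispatch_alert_with_evidence and both callables are hypothetical names; in practice the first callable would wrap capture_alert_evidence() and the second your own notification sender:

```python
import logging
from typing import Callable

logger = logging.getLogger("alert_dispatch")


def dispatch_alert_with_evidence(capture_evidence: Callable[[], dict],
                                 send_notification: Callable[[], None]) -> dict:
    """Capture evidence first, then notify the user."""
    evidence = None
    try:
        # Synchronous capture: runs to completion before the notification is sent.
        evidence = capture_evidence()
    except Exception as exc:
        # Assumed policy: do not block the alert, but record the missing evidence.
        logger.error("Evidence capture failed: %s", exc)

    send_notification()
    return {"sent": True, "evidence": evidence, "evidence_missing": evidence is None}


# Example wiring (commented out because it needs a live API key):
# store = PriceAlertEvidenceCapture()
# dispatch_alert_with_evidence(
#     lambda: store.capture_alert_evidence("alert-1", "user-9", "AAPL", 200.0,
#                                          "https://example.com/quote/AAPL"),
#     lambda: print("notification sent"),
# )
```

If your compliance policy treats a missing screenshot as unacceptable, the except branch would instead re-raise and hold the alert.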

Competitor Rate and Fee Monitoring

Competitive intelligence is the monitoring side of the same infrastructure: when a competitor changes a mortgage rate, savings rate, or fee schedule, you want to know before customers start asking your sales team about it:

import sqlite3

class CompetitorRateMonitor:
    def __init__(self, db_path: str = "competitor_rates.db",
                 screenshot_dir: str = "rate_screenshots"):
        self.screenshot_dir = Path(screenshot_dir)
        self.screenshot_dir.mkdir(exist_ok=True)
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS rate_pages (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    url TEXT,
                    product_type TEXT,
                    last_checked TEXT,
                    last_hash TEXT,
                    change_count INTEGER DEFAULT 0,
                    first_seen TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS rate_snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    page_id TEXT,
                    captured_at TEXT,
                    file_path TEXT,
                    image_hash TEXT,
                    change_detected BOOLEAN DEFAULT 0,
                    FOREIGN KEY (page_id) REFERENCES rate_pages(id)
                )
            """)

    def add_competitor(self, page_id: str, name: str, url: str, product_type: str):
        """Register a competitor rate page to monitor."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR IGNORE INTO rate_pages (id, name, url, product_type) VALUES (?, ?, ?, ?)",
                (page_id, name, url, product_type)
            )

    def check_competitor(self, page_id: str) -> dict:
        """Check one competitor page for rate changes."""
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT url, last_hash, name FROM rate_pages WHERE id = ?", (page_id,)
            ).fetchone()

        if not row:
            return {"error": "unknown page_id"}

        url, prev_hash, name = row

        # Inject CSS to stabilize dynamic elements (timestamps, animations)
        # that would cause false positives on unchanged rate tables
        stability_css = """
            .timestamp, .last-updated, [class*="time"],
            .live-badge, .online-indicator { visibility: hidden !important; }
            * { animation: none !important; transition: none !important; }
        """

        resp = requests.get(SCREENSHOT_API, params={
            "url": url,
            "width": 1280,
            "full_page": True,
            "wait_for": "networkidle",
            "block_ads": True,
            "inject_css": stability_css,
            "format": "webp"
        }, headers={"X-API-Key": API_KEY},
           timeout=60)  # fail rather than hang indefinitely if the API stalls
        resp.raise_for_status()

        image_bytes = resp.content
        image_hash = hashlib.sha256(image_bytes).hexdigest()
        timestamp = datetime.now(timezone.utc).isoformat()

        change_detected = prev_hash is not None and prev_hash != image_hash

        filename = f"{page_id}_{timestamp.replace(':', '-')}.webp"
        file_path = self.screenshot_dir / filename
        file_path.write_bytes(image_bytes)

        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO rate_snapshots (page_id, captured_at, file_path, image_hash, change_detected)
                VALUES (?, ?, ?, ?, ?)
            """, (page_id, timestamp, str(file_path), image_hash, change_detected))

            conn.execute("""
                UPDATE rate_pages SET last_checked = ?, last_hash = ?,
                    change_count = change_count + ?
                WHERE id = ?
            """, (timestamp, image_hash, 1 if change_detected else 0, page_id))

        return {
            "page_id": page_id,
            "name": name,
            "change_detected": change_detected,
            "hash": image_hash,
            "file": str(file_path)
        }

    def check_all(self) -> list[dict]:
        """Check all registered competitor pages."""
        with sqlite3.connect(self.db_path) as conn:
            page_ids = [row[0] for row in conn.execute("SELECT id FROM rate_pages").fetchall()]
        return [self.check_competitor(pid) for pid in page_ids]
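
The list returned by check_all() still has to reach a person. As a small sketch, the hypothetical helper below turns those result dictionaries into a plain-text digest, assuming the keys returned by check_competitor() above ("name", "page_id", "file", "change_detected", or "error"). Because the comparison is a byte-level hash, any difference in the image bytes is flagged, so each reported change still deserves a look at the screenshot itself:

```python
def summarize_changes(results: list[dict]) -> str:
    """Build a short plain-text digest from check_all() results."""
    changed = [r for r in results if r.get("change_detected")]
    failed = [r for r in results if "error" in r]

    lines = [f"{len(changed)} of {len(results)} competitor pages changed"]
    for r in changed:
        lines.append(f"- {r['name']} ({r['page_id']}): see {r['file']}")
    for r in failed:
        lines.append(f"- check failed: {r['error']}")
    return "\n".join(lines)


# Example wiring (commented out because it needs a live API key):
# monitor = CompetitorRateMonitor()
# print(summarize_changes(monitor.check_all()))
```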

Risk Warning Capture for Investment Products

MiFID II and the PRIIPs Regulation, which mandates the Key Information Document (KID), require that risk disclosures are documented. Capture the risk warning page users see before completing a product purchase:

def capture_pre_purchase_risk_warning(product_isin: str, risk_warning_url: str,
                                       session_token: str = None,
                                       audit_trail: DisclosureAuditTrail = None) -> dict:
    """
    Capture risk warning page before a product purchase completes.

    For MiFID II / PRIIP compliance: capture the KIID/KID page the user
    reviewed. Session cookie injection allows capturing authenticated views
    if the risk warning is behind a login.
    """
    params = {
        "url": risk_warning_url,
        "width": 1280,
        "full_page": True,
        "wait_for": "networkidle",
        "format": "png"
    }

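    if session_token:
        # Inject auth cookie to capture the authenticated page state.
        # json.dumps() escapes the value so a quote inside the token cannot
        # break out of the JavaScript string literal.
        cookie_literal = json.dumps(f"session={session_token}; path=/")
        params["inject_js"] = f"""
            document.cookie = {cookie_literal};
            // Wait for authenticated content to load
            await new Promise(r => setTimeout(r, 1000));
        """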

    resp = requests.get(SCREENSHOT_API, params=params,
                        headers={"X-API-Key": API_KEY},
                        timeout=60)  # fail rather than hang indefinitely if the API stalls
    resp.raise_for_status()

    capture = {
        "url": risk_warning_url,
        "captured_at": datetime.now(timezone.utc).isoformat(),
        "sha256": hashlib.sha256(resp.content).hexdigest(),
        "image_bytes": resp.content
    }

    if audit_trail:
        return audit_trail.record_disclosure(
            capture,
            disclosure_type="risk_warning",
            product_id=product_isin
        )

    return capture

Scheduled Compliance Run

Run disclosure captures daily across all regulated product pages. Detect changes between captures and flag for compliance review:

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def run_daily_compliance_check(disclosure_pages: list[dict],
                                audit_trail: DisclosureAuditTrail,
                                alert_email: str,
                                smtp_config: dict) -> dict:
    """
    Daily compliance run: capture all disclosure pages and flag changes.

    disclosure_pages: [{"url": "...", "type": "apr_disclosure", "product_id": "..."}]
    """
    results = {"captured": 0, "changed": 0, "errors": 0, "changes": []}

    for page in disclosure_pages:
        try:
            capture = capture_disclosure_page(page["url"])
            entry = audit_trail.record_disclosure(
                capture, page["type"], page.get("product_id")
            )
            results["captured"] += 1

            # Check if hash differs from previous day's entry
            # (simple: check if this is not the first entry for this product+type)
            # A more complete implementation would query the manifest for prior entries

        except Exception as e:
            results["errors"] += 1
            results.setdefault("error_detail", []).append(
                {"url": page["url"], "error": str(e)}
            )

    # Verify chain integrity after all captures
    chain_status = audit_trail.verify_chain()
    results["chain_valid"] = chain_status["valid"]

    if not chain_status["valid"]:
        # Chain break — alert compliance team immediately
        send_chain_break_alert(alert_email, chain_status, smtp_config)

    return results

def send_chain_break_alert(to_email: str, chain_status: dict, smtp_config: dict):
    msg = MIMEMultipart()
    msg["Subject"] = "URGENT: Compliance Archive Chain Break Detected"
    msg["From"] = smtp_config["user"]
    msg["To"] = to_email
    msg.attach(MIMEText(
        f"Chain break detected at entry {chain_status['break_at_entry']} "
        f"(timestamp: {chain_status.get('break_at_timestamp', 'unknown')}). "
        f"Immediate review required.",
        "plain"
    ))
    with smtplib.SMTP_SSL(smtp_config["host"], smtp_config["port"]) as smtp:
        smtp.login(smtp_config["user"], smtp_config["password"])
        smtp.sendmail(smtp_config["user"], [to_email], msg.as_string())

Rate Limit Planning for Fintech Workloads

| Workload | Frequency | Pages | Calls/day | Tier |
|---|---|---|---|---|
| Single product disclosure archive | Daily | 10 | 10 | Free (50/day) |
| Price alert evidence (low volume) | Per-alert | ~30 alerts | 30 | Free |
| Competitor rate monitoring | Hourly | 20 | 480 | Pro (1000/day) |
| Full regulatory portfolio + alerts | Combined | 50 | ~1,500 | Business (5000/day) |
| National bank multi-product suite | 30-minute | 200 | 9,600 | Enterprise |
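
The scheduled rows follow simple arithmetic: calls per day equals pages multiplied by captures per day. A small sketch of that calculation, using the daily limits quoted in the table (50, 1000, 5000) and treating anything above them as Enterprise; the function names are illustrative only:

```python
# Daily call limits as listed in the table, smallest tier first.
TIER_LIMITS = [("Free", 50), ("Pro", 1000), ("Business", 5000)]


def calls_per_day(pages: int, interval_minutes: int) -> int:
    """Pages multiplied by the number of captures that fit in 24 hours."""
    captures_per_day = (24 * 60) // interval_minutes
    return pages * captures_per_day


def smallest_tier(daily_calls: int) -> str:
    """Pick the first tier whose daily limit covers the workload."""
    for name, limit in TIER_LIMITS:
        if daily_calls <= limit:
            return name
    return "Enterprise"


hourly_monitoring = calls_per_day(pages=20, interval_minutes=60)   # 20 * 24 = 480
print(hourly_monitoring, smallest_tier(hourly_monitoring))
```

Per-alert workloads do not fit this formula because their volume depends on how many alerts fire, so estimate those from your own alert history.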

What Distinguishes Fintech

Two things separate fintech from most screenshot API use cases:

Evidence-grade requirements: Screenshots must be lossless (PNG), tamper-evident (HMAC-chained), and unmodified (no inject_css, no block_ads). Every sanitizing option that helps other use cases is a liability here. The capture must represent exactly what a user saw.

Dual urgency profiles: Compliance captures happen daily on a schedule. Alert captures happen in real-time, synchronously, before the user notification is dispatched. The same API handles both — the difference is in when and why you call it.

A fintech that builds these patterns into one screenshot API integration covers regulatory disclosure archiving, alert evidence, and competitive rate intelligence from the same account and billing tier.


Hermesforge Screenshot API: full-page capture, PNG and WebP, custom wait strategies. Get a free API key — 50 calls/day, no signup required.