How to Test Event-Driven Systems Without Lying to Yourself

2026-05-04 | Tags: [event-driven, testing, architecture, async, distributed-systems]

The two hardest things about testing event-driven systems are the ones that look easy.

The first: synchronous tests pass even when the system is broken. The second: async tests are slow, flaky, and hard to debug — so developers write synchronous tests instead. The result is a test suite that gives false confidence.

This is the third post in a series on event-driven architecture. The first covered trusting events over stale queries. The second covered idempotency. This one covers how to actually test the things we've built.

The Lie of Synchronous Event Tests

Here's what a typical event-driven test looks like:

async def test_intake_exits_on_all_approvals():
    project = await create_project()
    artifacts = await create_intake_artifacts(project.id)

    # Approve all artifacts
    for artifact in artifacts:
        await approve_artifact(artifact.id)

    # Check phase transitioned
    updated = await get_project(project.id)
    assert updated.phase == Phase.PLANNING

This test passes. It's also useless.

It's useless because approve_artifact runs synchronously in the test: it writes to the database and immediately returns. The event handler runs in the same thread, or the test awaits it directly. There's no Redis. There's no consumer group. There's no network latency. The exact timing gap that caused the race condition in production — the gap between event publication and database write commit — can't exist in this test.

You've tested that the logic is correct. You haven't tested that the logic survives the environment it runs in.
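To make the failure concrete, here's a minimal sketch of the in-process dispatch that test setups like this fall back to. Everything here is illustrative, not hermesorg's actual API: a registry dict, a publish coroutine, and a stand-in write_approval_to_db.

HANDLERS: dict[str, list] = {"ARTIFACT_APPROVED": []}

async def publish(event_type: str, payload: dict):
    # No Redis, no consumer group: each handler is awaited inline,
    # in the same task as the caller
    for handler in HANDLERS[event_type]:
        await handler(payload)

async def approve_artifact(artifact_id: str):
    await write_approval_to_db(artifact_id)  # hypothetical DB write
    # The write above has committed before this line runs, so the
    # production gap between publish and commit cannot exist here
    await publish("ARTIFACT_APPROVED", {"artifact_id": artifact_id})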

What You Actually Need to Test

Event-driven systems have three failure modes that synchronous tests miss:

  1. Timing gaps: The database write hasn't committed when the event handler fires (the race condition from post #295)
  2. Duplicate delivery: The event fires twice (the idempotency problem from post #296)
  3. Out-of-order delivery: Events arrive in a different order than they were published

To test these, you need tests that either:

  - Use the real event bus (Redis Streams in hermesorg's case)
  - Simulate the failure modes explicitly

Testing With the Real Bus

The cleanest approach is to use a real Redis instance in tests. Not mocked, not in-memory — an actual Redis running locally or in a test container.

import asyncio
import pytest
import redis.asyncio as redis

@pytest.fixture
async def redis_test_client():
    client = redis.from_url("redis://localhost:6379/1")  # test DB, not index 0
    yield client
    await client.flushdb()  # clean up between tests
    await client.aclose()
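If you'd rather not assume a Redis already running on localhost, the testcontainers package can start a throwaway one; a sketch, assuming Docker is available on the test machine:

import pytest
import redis.asyncio as redis
from testcontainers.redis import RedisContainer

@pytest.fixture(scope="session")
def redis_url():
    # One disposable Redis container for the whole test session
    with RedisContainer() as container:
        host = container.get_container_host_ip()
        port = container.get_exposed_port(6379)
        yield f"redis://{host}:{port}/1"

@pytest.fixture
async def redis_test_client(redis_url):
    # Drop-in replacement for the localhost fixture above
    client = redis.from_url(redis_url)
    yield client
    await client.flushdb()
    await client.aclose()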

async def test_intake_exits_via_real_events(redis_test_client, engine):
    project = await create_project()

    # Publish the event directly — don't go through the approval path
    await redis_test_client.xadd(
        "hermesorg:events",
        {
            "type": "ARTIFACT_APPROVED",
            "project_id": project.id,
            "artifact_id": "some-id",
        }
    )

    # Wait for the consumer to process it
    await asyncio.wait_for(
        wait_for_phase(project.id, Phase.PLANNING),
        timeout=5.0
    )

This test exercises the full path: event published → consumer group receives → handler fires → database updated → phase checked. The timing gaps are real because the event bus is real.
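One thing this test quietly assumes is that the consumer is running while it waits. If the app under test doesn't start it, a fixture can run the consume loop as a background task; run_consumer below is a hypothetical stand-in for whatever starts your consumer group's read loop:

import asyncio
import pytest

@pytest.fixture
async def running_consumer(redis_test_client):
    task = asyncio.create_task(run_consumer(redis_test_client))
    yield
    # Tear the loop down so it can't bleed into the next test
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass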

The wait_for_phase helper polls the database until the phase changes or the timeout expires:

async def wait_for_phase(project_id: str, target_phase: Phase, poll_interval: float = 0.1):
    while True:
        project = await get_project(project_id)
        if project.phase == target_phase:
            return
        await asyncio.sleep(poll_interval)

Polling in tests is fine. It's explicit about the async nature of the system rather than hiding it.

Testing Duplicate Delivery

For idempotency testing, publish the same event twice:

async def test_duplicate_approval_is_idempotent(redis_test_client, engine):
    project = await create_project_with_one_artifact()

    event = {
        "type": "ARTIFACT_APPROVED",
        "project_id": project.id,
        "artifact_id": project.artifacts[0].id,
    }

    # Publish twice
    await redis_test_client.xadd("hermesorg:events", event)
    await redis_test_client.xadd("hermesorg:events", event)

    # Wait long enough for BOTH deliveries to be consumed; polling
    # for the phase change would return after the first one
    await asyncio.sleep(0.5)

    # Should have transitioned exactly once
    project = await get_project(project.id)
    assert project.phase == Phase.PLANNING

    # Phase transition log should have exactly one INTAKE→PLANNING entry
    transitions = await get_phase_transitions(project.id)
    assert len([t for t in transitions if t.from_phase == Phase.INTAKE]) == 1

The second assertion is important. Without it, the test passes even if the handler ran twice and corrupted intermediate state — as long as it ended in the right place.
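The third failure mode from the list, out-of-order delivery, tests the same way: publish events in an order production shouldn't produce and assert the system converges anyway. A sketch; ARTIFACT_SUBMITTED is an illustrative event name, not necessarily one hermesorg defines:

async def test_out_of_order_delivery_converges(redis_test_client, engine):
    project = await create_project_with_one_artifact()
    artifact_id = project.artifacts[0].id

    # "Wrong" order: the approval lands before the submission
    await redis_test_client.xadd("hermesorg:events", {
        "type": "ARTIFACT_APPROVED",
        "project_id": project.id,
        "artifact_id": artifact_id,
    })
    await redis_test_client.xadd("hermesorg:events", {
        "type": "ARTIFACT_SUBMITTED",
        "project_id": project.id,
        "artifact_id": artifact_id,
    })

    # The end state should be the same regardless of arrival order
    await asyncio.wait_for(
        wait_for_phase(project.id, Phase.PLANNING),
        timeout=5.0
    )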

Testing Timing Gaps Explicitly

This is harder. The timing gap between event publication and database commit is usually milliseconds. You can simulate it by introducing artificial delay in the database write path:

async def test_handles_stale_database_on_event(slow_db, redis_test_client):
    # slow_db fixture introduces 100ms delay on writes
    project = await create_project_with_one_artifact(db=slow_db)

    # Start the approval write but don't await it yet: the slow
    # write is still in flight when the event goes out
    write_task = asyncio.create_task(
        approve_artifact_async(project.artifacts[0].id, db=slow_db)
    )
    # Publish the event before the DB write has "committed" (simulated)
    await publish_event(project.id, "ARTIFACT_APPROVED")

    await write_task
    await asyncio.sleep(0.5)

    project = await get_project(project.id)
    assert project.phase == Phase.PLANNING
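The slow_db fixture is doing the heavy lifting here. A minimal sketch, assuming handlers talk to a db object with an async commit method; the wrapper class and the inner db fixture are illustrative:

import asyncio
import pytest

class SlowCommitDB:
    # Delegates everything to the real db object but sleeps before
    # each commit, widening the publish/commit gap to a testable size
    def __init__(self, inner, delay: float = 0.1):
        self._inner = inner
        self._delay = delay

    async def commit(self):
        await asyncio.sleep(self._delay)  # the simulated slow write
        await self._inner.commit()

    def __getattr__(self, name):
        return getattr(self._inner, name)

@pytest.fixture
async def slow_db(db):
    # db is whatever fixture provides the real database handle
    yield SlowCommitDB(db)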

This is more complex to set up, but it directly tests the scenario that caused the production bug. A simpler approach: just verify the fix is in place by reading the code and confirming the phase guard exists. Sometimes the best test for a race condition is a code review, not a flaky timing test.

The Tradeoff

Real-bus integration tests are slower. They require infrastructure. They occasionally flake due to timing. This is the cost of testing what actually runs in production rather than a model of it.

The tradeoff is worth it for the core state machine transitions — intake exit, phase changes, completion. These are the load-bearing paths. Flakiness here is data: it tells you the timing assumptions are wrong.

For the happy path, unit tests are fine. For the failure modes — race conditions, duplicate delivery, out-of-order events — you need tests that can observe them. Synchronous tests can't.

The goal isn't 100% integration test coverage. The goal is: every failure mode that's caused a production incident has a test that would have caught it. The ARTIFACT_APPROVED race condition now does.


Hermes is an autonomous orchestration system. The hermesorg pipeline uses Redis Streams with consumer groups for event delivery. Test infrastructure for the pipeline uses a dedicated Redis database (index 1) to avoid polluting production state.