
Circuit Breakers

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 04: Streaming Resilience and Failure Handling"]
  module --> concept["Circuit Breakers"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |

Core question:
How do you implement short-circuiting and circuit-breaker patterns in streaming pipelines using pure Result types, ensuring early termination on thresholds or failures while maintaining purity, composability, and resource safety?

We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C06 and face a practical question:

“Processing 100 000 chunks is expensive. If the error rate climbs above 20 % after the first 500 chunks, the whole run is doomed — why waste hours finishing it?”

The naïve solution is a manual flag inside a loop:

error_rate = 0.0
seen = 0
n_err = 0  # error counter; without this line the loop raises NameError
for r in embedded:
    seen += 1
    if isinstance(r, Err):
        n_err += 1
    if seen >= 500:
        error_rate = n_err / seen
        if error_rate > 0.2:
            logger.critical("Aborting run – error rate too high")
            break
    process(r)

This works once, but the same bookkeeping gets copied into every pipeline, is easy to get wrong, and breaks when you later add parallelism or recovery.

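The production solution uses pure, composable circuit breakers: lazy iterator transducers over Result streams that stop pulling from upstream as soon as a trigger fires. Their guarantees are stated as laws and checked by property tests, and each breaker closes its upstream automatically when it stops early.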
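The following sketch makes "lazy iterator transducer" concrete. Every stage, breaker included, is just a function from an iterable to an iterator, so a breaker slots into a pipeline like any other stage. The Ok/Err classes are local stand-ins for the module's ADT. The helpers pipe, map_result and stop_on_first_err are hypothetical, written only for this illustration, and are not funcpipe_rag APIs.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from functools import reduce


# Minimal stand-ins for the Ok/Err ADT from earlier Module 4 cores (assumption).
@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def pipe(xs: Iterable, *stages: Callable[[Iterable], Iterator]) -> Iterator:
    """Thread a stream through stages left to right; nothing runs until pulled."""
    return reduce(lambda acc, stage: stage(acc), stages, iter(xs))


def map_result(f: Callable) -> Callable[[Iterable], Iterator]:
    """Hypothetical stage: apply f lazily, turning exceptions into Err values."""
    def stage(xs: Iterable) -> Iterator:
        for x in xs:
            try:
                yield Ok(f(x))
            except Exception as exc:
                yield Err(type(exc).__name__)
    return stage


def stop_on_first_err(xs: Iterable) -> Iterator:
    """Truncate-style breaker stage (upstream cleanup omitted for brevity)."""
    for r in xs:
        yield r
        if isinstance(r, Err):
            return


calls: list[int] = []


def fragile(x: int) -> int:
    calls.append(x)        # record every upstream invocation
    return 10 // (x - 3)   # ZeroDivisionError when x == 3


out = list(pipe(range(1_000), map_result(fragile), stop_on_first_err))
```

Only four of the 1 000 inputs are ever mapped. Once the breaker stops pulling, the upstream map never sees the rest, and neither stage needs to know the other exists.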

Audience: Engineers who run long-running batch pipelines and cannot afford to process doomed data for hours.

Outcome:
1. You will short-circuit on the first error, an error count, an error rate, or an arbitrary predicate, doing only O(k) work, where k is the position of the trigger.
2. You will choose between observable breakers (emit BreakInfo) and silent truncate breakers.
3. You will ship a RAG pipeline that aborts gracefully as soon as a failure threshold is crossed, with full provenance on why.

We formalise exactly what we want from correct, production-ready breakers: short-circuiting, ordering, bounded work, resource cleanup, and equivalence to reference implementations.


Concrete Motivating Example

The same 100 000-chunk tree from previous cores:

  • Normal success rate ≈ 99 %.
  • One malformed section causes 5 000 consecutive failures, pushing the cumulative error rate to 30 % by the time 10 000 chunks have been processed.

Desired behaviour:

embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)

for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)

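Total work: only the chunk attempts needed for the cumulative error rate to first exceed 20 % (after at least 500 samples). That is on the order of 10 000 rather than 100 000, because the breaker stops pulling from upstream the moment the threshold is crossed.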
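Here is a scaled-down simulation for sanity-checking the total-work claim. It applies the same trip rule as circuit_breaker_rate_emit (total >= min_samples and n_err / total > max_rate, checked after every item) to a synthetic failure pattern. The failure shape is an assumption chosen to match the scenario: one failure every 100 chunks, plus a 5 000-chunk bad section starting at chunk 7 000, which puts the cumulative rate at about 30 % by chunk 10 000.

```python
from __future__ import annotations

from collections.abc import Iterable


def synthetic_flags(n: int, run_start: int, run_len: int) -> list[bool]:
    """True marks a failed chunk. Assumed shape: every 100th chunk fails
    (about 99 % success) plus one run of consecutive failures."""
    return [
        (run_start <= i < run_start + run_len) or (i % 100 == 99)
        for i in range(n)
    ]


def rate_trip_point(flags: Iterable[bool], max_rate: float, min_samples: int) -> int | None:
    """Chunks pulled when a cumulative-rate breaker trips, or None if it never does."""
    n_err = 0
    for total, failed in enumerate(flags, start=1):
        n_err += failed  # bool counts as 0 or 1
        if total >= min_samples and n_err / total > max_rate:
            return total
    return None


flags = synthetic_flags(100_000, run_start=7_000, run_len=5_000)
rate_at_10k = sum(flags[:10_000]) / 10_000
trip = rate_trip_point(flags, max_rate=0.2, min_samples=500)
```

Under these assumptions the breaker trips after 8 663 chunks, well before the 30 % mark. The remaining 91 000 or so chunks are never attempted.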


1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
|---|---|---|
| Short-Circuit | Emit/truncate breakers stop right after the trigger, performing O(k) work, where k is the position of the triggering item. | test_emit_breakers_short_circuit, test_truncate_breakers_stop_silently |
| Ordering | Items (including the terminal BreakInfo) appear in original stream order. | test_breaker_ordering |
| Resource Cleanup | On early termination, upstream generators are closed via their close() method. | test_upstream_closed_on_break |
| Purity | Breakers are deterministic and have no side effects beyond consuming their upstream (and closing it on early exit). | Reproducibility tests |
| Equivalence | For finite streams, breaker output equals the full stream truncated at the first trigger position, optionally followed by a terminal BreakInfo. | test_breaker_equivalence_to_full_scan |

These laws guarantee breakers are safe, predictable, and resource-correct.
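The Equivalence and Ordering laws can also be checked without Hypothesis, by brute force over small inputs. This sketch compares a lazy truncate breaker against an eager reference that materialises the whole stream first. The breaker has the same shape as short_circuit_on_err_truncate in section 4.2. The Ok/Err stand-ins and helper names are assumptions made for the example.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def truncate_on_err(xs: Iterable) -> Iterator:
    """Lazy breaker under test (same shape as short_circuit_on_err_truncate)."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def reference_truncate(xs: Iterable) -> list:
    """Eager reference: materialise everything, then cut just after the first Err."""
    full = list(xs)
    for i, r in enumerate(full):
        if isinstance(r, Err):
            return full[: i + 1]
    return full


def check_equivalence(max_len: int = 6) -> int:
    """Compare both on every Ok/Err pattern up to max_len; return cases checked."""
    checked = 0
    for n in range(max_len + 1):
        for pattern in product((True, False), repeat=n):
            xs = [Ok(i) if ok else Err(i) for i, ok in enumerate(pattern)]
            assert list(truncate_on_err(xs)) == reference_truncate(xs)
            checked += 1
    return checked


n_cases = check_equivalence()
```

Exhaustive small-scope checks like this complement the property-based tests in section 5. They cover every Ok/Err pattern up to a fixed length rather than a random sample.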


2. Decision Table – Which Breaker Do You Actually Use?

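| Trigger | Need Observable Break? | Need Terminal Value? | Recommended Variant |
|---|---|---|---|
| First error | Yes | Yes | short_circuit_on_err_emit |
| First error (silent) | No | No | short_circuit_on_err_truncate |
| Error rate threshold | Yes | Yes | circuit_breaker_rate_emit |
| Error rate threshold (silent) | No | No | circuit_breaker_rate_truncate |
| Error count threshold | Yes | Yes | circuit_breaker_count_emit |
| Error count threshold (silent) | No | No | circuit_breaker_count_truncate |
| Custom predicate | Yes | Yes | circuit_breaker_pred_emit |
| Custom predicate (silent) | No | No | circuit_breaker_pred_truncate |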

Prefer emit variants — they give you a terminal BreakInfo for reporting without breaking composability.
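Emit variants compose better for a simple reason. A truncate stream can always be recovered from an emit stream by dropping the terminal BreakInfo, but not the other way round. The sketch below assumes a cut-down BreakInfo with three fields and a hypothetical strip_break adapter. Neither is the funcpipe_rag definition.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


@dataclass(frozen=True)
class BreakInfo:
    # Assumed subset of the BreakInfo fields used in section 4.1.
    code: str
    n_ok: int
    n_err: int


def emit_on_first_err(xs: Iterable) -> Iterator:
    """Simplified emit breaker: yield the first Err, then a terminal Err(BreakInfo)."""
    n_ok = 0
    for r in xs:
        yield r
        if isinstance(r, Ok):
            n_ok += 1
        else:
            yield Err(BreakInfo("BREAK/FIRST_ERR", n_ok, 1))
            return


def strip_break(xs: Iterable) -> Iterator:
    """Hypothetical adapter: drop the terminal BreakInfo, turning emit into truncate."""
    for r in xs:
        if isinstance(r, Err) and isinstance(r.error, BreakInfo):
            return
        yield r


stream = [Ok(1), Ok(2), Err("bad"), Ok(3)]
emitted = list(emit_on_first_err(stream))
truncated = list(strip_break(emit_on_first_err(stream)))
```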


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: breakers live in funcpipe_rag.policies.breakers (capstone/src/funcpipe_rag/policies/breakers.py) and are re-exported from funcpipe_rag.api.core.

All snippets assume the Result / Ok / Err ADT from earlier Module 4 cores is already imported.

from funcpipe_rag.api.core import (
    BreakInfo,
    circuit_breaker_count_emit,
    circuit_breaker_count_truncate,
    circuit_breaker_pred_emit,
    circuit_breaker_pred_truncate,
    circuit_breaker_rate_emit,
    circuit_breaker_rate_truncate,
    short_circuit_on_err_emit,
    short_circuit_on_err_truncate,
)

4. Reference Implementations

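All breakers are pure generators that wrap their loop in try/finally. As a result, the upstream iterator is closed whenever a breaker trips, or whenever the breaker itself is closed before upstream is exhausted.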
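The try/finally in each breaker is what makes cleanup propagate. Closing the outermost generator raises GeneratorExit at its paused yield, its finally block runs, and that block closes the next generator upstream. The sketch below shows the order of events, using a hypothetical passthrough stage with the same cleanup shape as the breakers.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import islice

events: list[str] = []


def source() -> Iterator[int]:
    """Upstream that records when its finally block runs."""
    try:
        yield from range(1_000)
    finally:
        events.append("source closed")


def passthrough(xs: Iterable[int]) -> Iterator[int]:
    """Same cleanup shape as the breakers: close upstream unless it was exhausted."""
    it = iter(xs)
    exhausted = False
    try:
        for x in it:
            yield x
        exhausted = True
    finally:
        events.append("wrapper finally")
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


upstream = source()              # keep a reference so only close() can finalise it
wrapper = passthrough(upstream)
first_three = list(islice(wrapper, 3))
before_close = list(events)      # nothing has been cleaned up yet
wrapper.close()                  # consumer abandons the stream early
```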

4.1 Emit Breakers (Observable Termination)

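from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from types import MappingProxyType
from typing import TypeVar

# Result, Ok and Err come from earlier Module 4 cores; BreakInfo from funcpipe_rag.api.core.
T = TypeVar("T")
E = TypeVar("E")


def short_circuit_on_err_emit(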
    xs: Iterable[Result[T, E]],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield items until first Err (which is yielded), then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
                bi = BreakInfo(
                    code="BREAK/FIRST_ERR",
                    reason="first error encountered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_rate_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error rate > max_rate after min_samples, then emit terminal BreakInfo."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    if min_samples < 1:
        raise ValueError("min_samples must be >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                bi = BreakInfo(
                    code="BREAK/ERR_RATE",
                    reason=f"error rate {n_err/total:.3f} > {max_rate}",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=total,
                    threshold=MappingProxyType({"max_rate": max_rate, "min_samples": min_samples}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_count_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_errs: int,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error count > max_errs, then emit terminal BreakInfo."""
    if max_errs < 0:
        raise ValueError("max_errs must be >= 0")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                n_err += 1
                last_err = r.error
                if n_err > max_errs:
                    bi = BreakInfo(
                        code="BREAK/ERR_COUNT",
                        reason=f"errors {n_err} > {max_errs}",
                        last_error=last_err,
                        n_ok=n_ok,
                        n_err=n_err,
                        total=n_ok + n_err,
                        threshold=MappingProxyType({"max_errs": max_errs}),
                    )
                    yield Err(bi)
                    return
            else:
                n_ok += 1
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


def circuit_breaker_pred_emit(
    xs: Iterable[Result[T, E]],
    pred: Callable[[Result[T, E]], bool],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until pred(r) is True, then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            if pred(r):
                bi = BreakInfo(
                    code="BREAK/PRED",
                    reason="predicate triggered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
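Writing breakers as generator functions has one consequence worth knowing. The argument checks in circuit_breaker_rate_emit and circuit_breaker_count_emit do not run when the breaker is created. They run only on the first next() call, so a misconfigured max_rate surfaces mid-pipeline rather than at construction. If fail-fast configuration matters, one common Python pattern is to validate in a plain function and return an inner generator. The sketch below contrasts the two approaches. The function names are hypothetical, and it is an assumption, not something this page confirms, whether funcpipe_rag already validates eagerly.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator


def lazy_checked(xs: Iterable, *, max_rate: float) -> Iterator:
    """Generator function: the check runs on the first next(), not at call time."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0, 1)")
    yield from xs


def _passthrough(xs: Iterable) -> Iterator:
    yield from xs


def eager_checked(xs: Iterable, *, max_rate: float) -> Iterator:
    """Plain function: validate immediately, then hand back the real generator."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0, 1)")
    return _passthrough(xs)


gen = lazy_checked([1, 2], max_rate=5.0)  # no error yet: the body has not started
try:
    next(gen)
    lazy_raised_on_next = False
except ValueError:
    lazy_raised_on_next = True

try:
    eager_checked([1, 2], max_rate=5.0)
    eager_raised_on_call = False
except ValueError:
    eager_raised_on_call = True
```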

4.2 Truncate Breakers (Silent Termination)

def short_circuit_on_err_truncate(xs: Iterable[Result[T, E]]) -> Iterator[Result[T, E]]:
    """Yield until first Err, then stop silently. No terminal value."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

(The other truncate variants follow the same pattern — return instead of yielding BreakInfo.)
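As an illustration of that pattern, here is a sketch of a count-based truncate breaker. It assumes that circuit_breaker_count_truncate mirrors circuit_breaker_count_emit. The real implementation in funcpipe_rag.policies.breakers is not reproduced on this page, and the Ok/Err classes are stand-ins.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


def count_truncate_sketch(xs: Iterable, *, max_errs: int) -> Iterator:
    """Like circuit_breaker_count_emit, but returns instead of yielding BreakInfo."""
    if max_errs < 0:
        raise ValueError("max_errs must be >= 0")
    it = iter(xs)
    n_err = 0
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                n_err += 1
                if n_err > max_errs:
                    return  # silent stop: no terminal value
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
```

With max_errs=1 the first Err is tolerated and the second one trips the breaker. Both Errs are therefore yielded before the stream stops, the same convention as the emit variant.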

4.3 Idiomatic RAG Usage

embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)

for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)

5. Property-Based Proofs (capstone/tests/test_breakers.py)

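from hypothesis import given, strategies as st

# Assumed imported: Result, Ok, Err, map_result_iter (earlier cores) and the breakers plus BreakInfo (funcpipe_rag.api.core).

@given(items=st.lists(st.integers()).filter(lambda v: 0 in v))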
def test_emit_breakers_short_circuit(items):
    first_err_pos = items.index(0)
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    results = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    assert len(results) == first_err_pos + 2  # items + Err + BreakInfo
    assert isinstance(results[-1], Err) and isinstance(results[-1].error, BreakInfo)
    bi = results[-1].error
    assert bi.n_ok == first_err_pos
    assert bi.n_err == 1
    assert bi.total == bi.n_ok + bi.n_err
    assert bi.last_error == "ZERO"

@given(items=st.lists(st.integers()))
def test_truncate_breakers_stop_silently(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    results = list(short_circuit_on_err_truncate(map_result_iter(f, items)))
    if 0 in items:
        assert len(results) == items.index(0) + 1
    else:
        assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_upstream_closed_on_break(items):
    closed = False
    sentinel_seen = False
    def src():
        nonlocal closed, sentinel_seen
        try:
            for x in items:
                yield Ok(x) if x != 0 else Err("ZERO")
                if x == 0:
                    yield Ok("should not be reached")
                    sentinel_seen = True
        finally:
            closed = True
    upstream = src()  # hold a reference so only the breaker's close() can run src's finally early
    results = list(short_circuit_on_err_truncate(upstream))
    assert not sentinel_seen
    assert closed

@given(items=st.lists(st.integers()))
def test_breaker_ordering(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    # Strip terminal BreakInfo if present
    if broken and isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo):
        prefix = broken[:-1]
    else:
        prefix = broken
    assert prefix == full[:len(prefix)]

@given(items=st.lists(st.integers()))
def test_breaker_equivalence_to_full_scan(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")
    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    if 0 in items:
        cut = items.index(0) + 1  # everything up to and including the first Err
        assert broken[:-1] == full[:cut]
        assert isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo)
    else:
        # No trigger: the breaker passes the full stream through and emits no BreakInfo
        assert broken == full

def test_count_breaker_off_by_one():
    xs: list[Result[int, str]] = [Err("E"), Err("E"), Err("E")]
    results = list(circuit_breaker_count_emit(xs, max_errs=1))
    # max_errs=1 tolerates one Err; the second Err trips: both Errs + terminal BreakInfo
    assert len(results) == 3
    assert isinstance(results[0], Err) and results[0].error == "E"
    assert isinstance(results[1], Err) and results[1].error == "E"
    assert isinstance(results[2], Err) and isinstance(results[2].error, BreakInfo)
    bi = results[2].error
    assert bi.n_err == 2
    assert bi.threshold["max_errs"] == 1
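Section 5 has no test for the rate breaker. Here is a plain-assert sketch that could fill the gap. It enumerates every Ok/Err pattern up to length 8 and checks that a rate breaker trips at exactly the first prefix satisfying total >= min_samples and n_err / total > max_rate. The rate_emit function copies only the counting logic of circuit_breaker_rate_emit, without cleanup and with a trimmed BreakInfo. Treat it as a stand-in rather than the capstone code.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


@dataclass(frozen=True)
class BreakInfo:
    n_ok: int
    n_err: int
    total: int


def rate_emit(xs: Iterable, *, max_rate: float, min_samples: int) -> Iterator:
    """Counting logic of circuit_breaker_rate_emit (stand-in, cleanup omitted)."""
    n_ok = n_err = 0
    for r in xs:
        yield r
        if isinstance(r, Ok):
            n_ok += 1
        else:
            n_err += 1
        total = n_ok + n_err
        if total >= min_samples and n_err / total > max_rate:
            yield Err(BreakInfo(n_ok, n_err, total))
            return


def expected_trip(pattern: tuple[bool, ...], max_rate: float, min_samples: int) -> int | None:
    """Reference by brute force: length of the first prefix meeting the rule."""
    for k in range(1, len(pattern) + 1):
        errs = sum(1 for ok in pattern[:k] if not ok)
        if k >= min_samples and errs / k > max_rate:
            return k
    return None


def test_rate_breaker_trips_at_first_qualifying_prefix() -> None:
    for n in range(9):
        for pattern in product((True, False), repeat=n):
            xs = [Ok(i) if ok else Err(i) for i, ok in enumerate(pattern)]
            out = list(rate_emit(xs, max_rate=0.4, min_samples=3))
            k = expected_trip(pattern, 0.4, 3)
            if k is None:
                assert out == xs                 # never tripped: full stream
            else:
                assert out[:-1] == xs[:k]        # items up to the trigger, in order
                bi = out[-1].error
                assert isinstance(bi, BreakInfo)
                assert bi.total == k
                assert k >= 3                    # min_samples respected
                assert bi.n_err / bi.total > 0.4
```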

6. Big-O & Allocation Guarantees

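| Variant | Time | Heap | Laziness |
|---|---|---|---|
| *_emit breakers | O(k) if the trigger fires at position k, else O(N) | O(1) | Yes |
| *_truncate breakers | O(k) if the trigger fires at position k, else O(N) | O(1) | Yes |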

All breakers are truly lazy generators with O(1) auxiliary memory.
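Laziness plus O(k) work means a breaker can even sit on an infinite stream. This sketch counts how many items a never-ending source produces before a minimal truncate breaker stops the pipeline. The breaker omits cleanup, and Ok/Err are stand-ins.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class Ok:
    value: object


@dataclass(frozen=True)
class Err:
    error: object


pulled: list[int] = []


def infinite_source() -> Iterator:
    """Never-ending upstream; items at index 6, 13, 20, ... are failures."""
    for i in count():
        pulled.append(i)  # record every item actually produced
        yield Err(i) if i % 7 == 6 else Ok(i)


def truncate_on_err(xs: Iterable) -> Iterator:
    """Minimal truncate breaker (upstream cleanup omitted for brevity)."""
    for r in xs:
        yield r
        if isinstance(r, Err):
            return


out = list(truncate_on_err(infinite_source()))
```

Without the breaker, list() on this source would never return. With it, exactly seven items are pulled.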


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Manual flag-based early exit | Duplicated, bug-prone bookkeeping | Use *_emit or *_truncate breakers |
| Continuing after a fatal error rate | Hours of wasted compute | Use circuit_breaker_rate_* |
| Resource leaks on early break | Files or connections left open | Use a breaker: each one closes its upstream when it trips or is itself closed early |
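Breakers close their upstream, but the consumer is still responsible for closing the breaker when it leaves the loop early. CPython usually does this promptly through reference counting once the generator becomes unreachable. That behaviour is an implementation detail, and contextlib.closing makes the cleanup explicit. The breaker_like stage below is a hypothetical stand-in with the same cleanup shape as the reference breakers.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from contextlib import closing

state = {"upstream_closed": False}


def upstream() -> Iterator[int]:
    """Source whose finally block stands in for releasing a file or connection."""
    try:
        yield from range(100)
    finally:
        state["upstream_closed"] = True


def breaker_like(xs: Iterable[int]) -> Iterator[int]:
    """Hypothetical stage with the breakers' cleanup shape."""
    it = iter(xs)
    exhausted = False
    try:
        for x in it:
            yield x
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


seen: list[int] = []
stream = breaker_like(upstream())  # `stream` keeps the breaker alive after the loop
with closing(stream) as s:         # closing() calls s.close() when the block exits
    for x in s:
        seen.append(x)
        if x == 2:
            break                  # consumer-side early exit
upstream_closed_after_with = state["upstream_closed"]
```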

8. Pre-Core Quiz

  1. Emit breaker for…? → Observable termination with BreakInfo
  2. Truncate breaker for…? → Silent early stop
  3. Rate breaker for…? → Abort on error rate threshold
  4. Resource cleanup on break…? → Automatic via try/finally
  5. Never do manually what…? → A breaker can do safely

9. Post-Core Exercise

  1. Add circuit_breaker_rate_emit to embedding stream → test early abort on injected failures.
  2. Use short_circuit_on_err_truncate for quick debug runs → verify no resource leaks.
  3. Implement custom predicate breaker for "stop on first OOM" → test.
  4. Combine with par_try_map_iter → verify parallel work stops on trigger.

Continue with: Resource-Aware Streams

You now have the tools to abort doomed runs as soon as a threshold trips, while keeping the pipeline pure, composable, and resource-safe. The rest of Module 4 covers retries and final reporting.