Circuit Breakers¶
Concept Position¶
flowchart TD
family["Python Programming"] --> program["Python Functional Programming"]
program --> module["Module 04: Streaming Resilience and Failure Handling"]
module --> concept["Circuit Breakers"]
concept --> capstone["Capstone pressure point"]
flowchart TD
problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
example --> boundary["Name the boundary this page is trying to protect"]
boundary --> proof["Carry that question into code review or the capstone"]
Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.
Progression Note¶
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you implement short-circuiting and circuit-breaker patterns in streaming pipelines using pure Result types, ensuring early termination on thresholds or failures while maintaining purity, composability, and resource safety?
We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C06 and face the final real-world question:
“Processing 100 000 chunks is expensive. If the error rate climbs above 20 % after the first 500 chunks, the whole run is doomed — why waste hours finishing it?”
The naïve solution is a manual flag inside a loop:
error_rate = 0.0
n_err = 0  # must be initialised before the loop
seen = 0
for r in embedded:
    seen += 1
    if isinstance(r, Err):
        n_err += 1
    if seen >= 500:
        error_rate = n_err / seen
        if error_rate > 0.2:
            logger.critical("Aborting run – error rate too high")
            break
    process(r)
This works once — but it’s duplicated everywhere, easy to get wrong, and breaks when you later add parallelism or recovery.
The production solution uses pure, composable circuit-breakers — lazy iterator transducers over Result streams that give you early termination with mathematical guarantees and automatic resource cleanup.
Audience: Engineers who run long-running batch pipelines and cannot afford to process doomed data for hours.
Outcome:
1. You will short-circuit on first error, error count, error rate, or arbitrary predicate, all with O(k) work, where k is the position of the item that triggers the break.
2. You will choose between observable breakers (emit BreakInfo) and silent truncate breakers.
3. You will ship a RAG pipeline that aborts gracefully the moment it becomes hopeless, with full provenance on why.
We formalise exactly what we want from correct, production-ready breakers: short-circuiting, ordering, bounded work, resource cleanup, and equivalence to reference implementations.
Concrete Motivating Example¶
Same 100 000 chunk tree from previous cores:
- Normal success rate ≈ 99 %.
- One malformed section causes 5 000 consecutive failures → error rate spikes to 30 % after 10 000 chunks processed.
Desired behaviour:
embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)
for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)
Total work: ~10 500 chunk attempts (stops instantly when threshold hit).
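To make the trigger arithmetic concrete, here is a minimal sketch that computes where a rate breaker would trip. It assumes the same rule as the desired behaviour above: at least `min_samples` items seen and a cumulative error rate strictly greater than `max_rate`. The helper name `trip_position` and the boolean-flag encoding of the stream are illustrative assumptions, not part of the capstone API.

```python
from typing import Iterable, Optional


def trip_position(
    is_err_flags: Iterable[bool],
    *,
    max_rate: float,
    min_samples: int,
) -> Optional[int]:
    """Return the 1-based position at which a rate breaker would trip, or None.

    Hypothetical helper: each flag is True for a failed chunk, False for a success.
    """
    n_err = 0
    for total, is_err in enumerate(is_err_flags, start=1):
        n_err += is_err
        # Strict inequality: a rate exactly equal to max_rate does not trip.
        if total >= min_samples and n_err / total > max_rate:
            return total
    return None


# 8 successes followed by a run of failures, with a 20 % threshold.
flags = [False] * 8 + [True] * 10
position = trip_position(flags, max_rate=0.2, min_samples=5)
print(position)  # 11
```

Two failures in ten items give exactly 20 %, which does not trip the breaker; the third failure does, at position 11. The remaining seven failures in `flags` are never examined.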
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Short-Circuit | Emit/truncate breakers stop after trigger, performing O(k) work where k ≤ position of trigger. | test_emit_breakers_short_circuit, test_truncate_breakers_stop_silently. |
| Ordering | Items (including terminal BreakInfo) appear in original stream order. | test_breaker_ordering. |
| Resource Cleanup | Upstream generators are closed via their close() method on early termination. | test_upstream_closed_on_break. |
| Purity | Breakers are pure functions: deterministic, no side effects beyond iteration. | Reproducibility tests. |
| Equivalence | For finite streams, breaker output equals the full stream truncated at the first trigger position, optionally followed by a terminal BreakInfo. | test_breaker_equivalence_to_full_scan. |
These laws guarantee breakers are safe, predictable, and resource-correct.
2. Decision Table – Which Breaker Do You Actually Use?¶
| Trigger | Need Observable Break? | Need Terminal Value? | Recommended Variant |
|---|---|---|---|
| First error | Yes | Yes | short_circuit_on_err_emit |
| First error (silent) | No | No | short_circuit_on_err_truncate |
| Error rate threshold | Yes | Yes | circuit_breaker_rate_emit |
| Error rate threshold (silent) | No | No | circuit_breaker_rate_truncate |
| Error count threshold | Yes | Yes | circuit_breaker_count_emit |
| Custom predicate | Yes | Yes | circuit_breaker_pred_emit |
Prefer emit variants — they give you a terminal BreakInfo for reporting without breaking composability.
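The difference between the two families is easiest to see side by side. The sketch below uses simplified stand-ins (`Ok`, `Err`, a one-field `Break` marker, and the hypothetical helpers `first_err_emit` / `first_err_truncate`) rather than the capstone types, and it deliberately omits the upstream cleanup logic to keep the contrast visible.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


@dataclass(frozen=True)
class Break:
    """Simplified stand-in for BreakInfo: only records why the stream stopped."""
    reason: str


def first_err_emit(xs: Iterable[Any]) -> Iterator[Any]:
    # Emit variant: pass the failing item through, then add a terminal marker.
    for r in xs:
        yield r
        if isinstance(r, Err):
            yield Err(Break("first error encountered"))
            return


def first_err_truncate(xs: Iterable[Any]) -> Iterator[Any]:
    # Truncate variant: pass the failing item through, then just stop.
    for r in xs:
        yield r
        if isinstance(r, Err):
            return


stream = [Ok(1), Ok(2), Err("bad chunk"), Ok(3)]
emitted = list(first_err_emit(stream))
truncated = list(first_err_truncate(stream))

# The emit output is the truncate output plus one terminal marker.
assert emitted[:-1] == truncated
assert isinstance(emitted[-1].error, Break)
```

With the truncate output alone, a consumer cannot tell an early stop from a stream that simply ended after three items; the terminal marker is what makes the break observable.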
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: breakers live in funcpipe_rag.policies.breakers (capstone/src/funcpipe_rag/policies/breakers.py) and are re-exported from funcpipe_rag.api.core.
All snippets assume the Result / Ok / Err ADT from earlier Module 4 cores is already imported.
from funcpipe_rag.api.core import (
    BreakInfo,
    circuit_breaker_count_emit,
    circuit_breaker_count_truncate,
    circuit_breaker_pred_emit,
    circuit_breaker_pred_truncate,
    circuit_breaker_rate_emit,
    circuit_breaker_rate_truncate,
    short_circuit_on_err_emit,
    short_circuit_on_err_truncate,
)
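This page never shows the definition of `BreakInfo`, only how the breakers construct it. As a reading aid, here is one plausible shape, inferred from the keyword arguments used in the reference implementations (`code`, `reason`, `last_error`, `n_ok`, `n_err`, `total`, `threshold`). The frozen dataclass, the field types, and the default for `threshold` are assumptions; the real class in `funcpipe_rag` may differ.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Generic, Mapping, Optional, TypeVar

E = TypeVar("E")


@dataclass(frozen=True)
class BreakInfo(Generic[E]):
    """Hypothetical shape, inferred from how the breakers construct it."""

    code: str                # machine-readable tag, e.g. "BREAK/ERR_RATE"
    reason: str              # human-readable explanation
    last_error: Optional[E]  # most recent upstream error, if any
    n_ok: int
    n_err: int
    total: int
    # Read-only view of the thresholds that were in force when the breaker tripped.
    threshold: Mapping[str, Any] = field(
        default_factory=lambda: MappingProxyType({})
    )


info = BreakInfo(
    code="BREAK/ERR_RATE",
    reason="error rate 0.250 > 0.2",
    last_error="timeout",
    n_ok=3,
    n_err=1,
    total=4,
    threshold=MappingProxyType({"max_rate": 0.2, "min_samples": 4}),
)
```

Freezing the dataclass and wrapping the thresholds in `MappingProxyType` keeps the terminal value read-only, so a report built from it cannot be altered by a downstream stage.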
4. Reference Implementations¶
All breakers are pure generators and guarantee upstream cleanup via try/finally.
4.1 Emit Breakers (Observable Termination)¶
# Imports and type variables assumed by all reference implementations below.
from collections.abc import Callable, Iterable, Iterator
from types import MappingProxyType
from typing import TypeVar

T = TypeVar("T")
E = TypeVar("E")


def short_circuit_on_err_emit(
    xs: Iterable[Result[T, E]],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield items until first Err (which is yielded), then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
                bi = BreakInfo(
                    code="BREAK/FIRST_ERR",
                    reason="first error encountered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        # Close the upstream only when we stopped before it ran dry.
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_rate_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error rate > max_rate after min_samples, then emit terminal BreakInfo."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    if min_samples < 1:
        raise ValueError("min_samples >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                bi = BreakInfo(
                    code="BREAK/ERR_RATE",
                    reason=f"error rate {n_err/total:.3f} > {max_rate}",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=total,
                    threshold=MappingProxyType({"max_rate": max_rate, "min_samples": min_samples}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_count_emit(
    xs: Iterable[Result[T, E]],
    *,
    max_errs: int,
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until error count > max_errs, then emit terminal BreakInfo."""
    if max_errs < 0:
        raise ValueError("max_errs >= 0")
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                n_err += 1
                last_err = r.error
                if n_err > max_errs:
                    bi = BreakInfo(
                        code="BREAK/ERR_COUNT",
                        reason=f"errors {n_err} > {max_errs}",
                        last_error=last_err,
                        n_ok=n_ok,
                        n_err=n_err,
                        total=n_ok + n_err,
                        threshold=MappingProxyType({"max_errs": max_errs}),
                    )
                    yield Err(bi)
                    return
            else:
                n_ok += 1
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()

def circuit_breaker_pred_emit(
    xs: Iterable[Result[T, E]],
    pred: Callable[[Result[T, E]], bool],
) -> Iterator[Result[T, E | BreakInfo[E]]]:
    """Yield until pred(r) is True, then emit terminal BreakInfo."""
    it = iter(xs)
    n_ok = n_err = 0
    last_err: E | None = None
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
                last_err = r.error
            if pred(r):
                bi = BreakInfo(
                    code="BREAK/PRED",
                    reason="predicate triggered",
                    last_error=last_err,
                    n_ok=n_ok,
                    n_err=n_err,
                    total=n_ok + n_err,
                    threshold=MappingProxyType({}),
                )
                yield Err(bi)
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
4.2 Truncate Breakers (Silent Termination)¶
def short_circuit_on_err_truncate(xs: Iterable[Result[T, E]]) -> Iterator[Result[T, E]]:
    """Yield until first Err, then stop silently. No terminal value."""
    it = iter(xs)
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Err):
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()
(The other truncate variants follow the same pattern — return instead of yielding BreakInfo.)
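As an illustration of that pattern, here is a sketch of what a rate-based truncate breaker could look like: the body mirrors `circuit_breaker_rate_emit`, with the `BreakInfo` construction replaced by a bare `return`. The `Ok` / `Err` classes are minimal stand-ins so the block runs on its own; this is not the capstone source.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


Result = Union[Ok, Err]  # minimal stand-in for the Result ADT


def circuit_breaker_rate_truncate(
    xs: Iterable[Result],
    *,
    max_rate: float,
    min_samples: int = 100,
) -> Iterator[Result]:
    """Yield until error rate > max_rate after min_samples, then stop silently."""
    if not 0.0 < max_rate < 1.0:
        raise ValueError("max_rate must be in (0,1)")
    if min_samples < 1:
        raise ValueError("min_samples >= 1")
    it = iter(xs)
    n_ok = n_err = 0
    exhausted = False
    try:
        for r in it:
            yield r
            if isinstance(r, Ok):
                n_ok += 1
            else:
                n_err += 1
            total = n_ok + n_err
            if total >= min_samples and n_err / total > max_rate:
                return  # same trigger as the emit variant, but no BreakInfo
        exhausted = True
    finally:
        # Close the upstream only when we stopped before it ran dry.
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


stream = [Ok(1), Err("a"), Err("b"), Ok(2)]
out = list(circuit_breaker_rate_truncate(stream, max_rate=0.5, min_samples=2))
print(out)  # [Ok(value=1), Err(error='a'), Err(error='b')]
```

Because this is a generator function, the argument checks run on the first `next()` call rather than at call time, so an invalid `max_rate` surfaces only once the stream is consumed.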
4.3 Idiomatic RAG Usage¶
embedded = circuit_breaker_rate_emit(
    embedded,
    max_rate=0.2,
    min_samples=500,
)
for r in embedded:
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    if isinstance(r, Ok):
        index_chunk(r.value)
    else:
        log_err_info(r.error)
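One caveat with a consumer loop that exits via `break`: the breaker generator is left suspended at its last `yield`, so its `finally` block (and therefore the upstream `close()`) runs only when the breaker itself is closed or garbage-collected. A minimal sketch of making that deterministic with `contextlib.closing` from the standard library follows. The stand-in breaker `stop_on_err` and the use of plain strings in place of `Result` values are illustrative assumptions.

```python
from contextlib import closing
from typing import Iterable, Iterator, List

events: List[str] = []


def source() -> Iterator[str]:
    try:
        yield "ok"
        yield "err"
        yield "ok"
    finally:
        events.append("upstream closed")


def stop_on_err(xs: Iterable[str]) -> Iterator[str]:
    # Simplified emit-style breaker: pass "err" through, then emit "break".
    it = iter(xs)
    exhausted = False
    try:
        for x in it:
            yield x
            if x == "err":
                yield "break"
                return
        exhausted = True
    finally:
        if not exhausted:
            close = getattr(it, "close", None)
            if callable(close):
                close()


with closing(stop_on_err(source())) as stream:
    for item in stream:
        if item == "break":
            events.append("consumer saw break")
            break

# Leaving the with-block closes the breaker, which in turn closes the source.
events.append("after with-block")
print(events)
# ['consumer saw break', 'upstream closed', 'after with-block']
```

Calling the breaker's own `close()` method explicitly after the loop has the same effect; `closing` simply guarantees it also happens when the loop body raises.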
5. Property-Based Proofs (capstone/tests/test_breakers.py)¶
from hypothesis import given, strategies as st

# map_result_iter is assumed to be imported alongside Result / Ok / Err.


@given(items=st.lists(st.integers()).filter(lambda v: 0 in v))
def test_emit_breakers_short_circuit(items):
    first_err_pos = items.index(0)

    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    results = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    assert len(results) == first_err_pos + 2  # items + Err + BreakInfo
    assert isinstance(results[-1], Err) and isinstance(results[-1].error, BreakInfo)
    bi = results[-1].error
    assert bi.n_ok == first_err_pos
    assert bi.n_err == 1
    assert bi.total == bi.n_ok + bi.n_err
    assert bi.last_error == "ZERO"

@given(items=st.lists(st.integers()))
def test_truncate_breakers_stop_silently(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    results = list(short_circuit_on_err_truncate(map_result_iter(f, items)))
    if 0 in items:
        assert len(results) == items.index(0) + 1
    else:
        assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_upstream_closed_on_break(items):
    closed = False
    sentinel_seen = False

    def src():
        nonlocal closed, sentinel_seen
        try:
            for x in items:
                yield Ok(x) if x != 0 else Err("ZERO")
                if x == 0:
                    yield Ok("should not be reached")
                    sentinel_seen = True
        finally:
            closed = True

    results = list(short_circuit_on_err_truncate(src()))
    assert not sentinel_seen
    assert closed

@given(items=st.lists(st.integers()))
def test_breaker_ordering(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    # Strip terminal BreakInfo if present
    if broken and isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo):
        prefix = broken[:-1]
    else:
        prefix = broken
    assert prefix == full[:len(prefix)]

@given(items=st.lists(st.integers()))
def test_breaker_equivalence_to_full_scan(items):
    def f(x: int) -> Result[int, str]:
        return Ok(x) if x != 0 else Err("ZERO")

    full = list(map_result_iter(f, items))
    broken = list(short_circuit_on_err_emit(map_result_iter(f, items)))
    if 0 in items:
        # Triggered: full stream cut just after the first Err, plus terminal BreakInfo.
        cut = items.index(0) + 1
        assert broken[:-1] == full[:cut]
        assert isinstance(broken[-1], Err) and isinstance(broken[-1].error, BreakInfo)
    else:
        # Never triggered: output is the full stream, with no terminal value.
        assert broken == full

def test_count_breaker_off_by_one():
    xs: list[Result[int, str]] = [Err("E"), Err("E"), Err("E")]
    results = list(circuit_breaker_count_emit(xs, max_errs=1))
    # Two Errs pass through (the second one exceeds max_errs), then the terminal
    # BreakInfo follows; the third Err is never pulled.
    assert len(results) == 3
    assert isinstance(results[0], Err)
    assert isinstance(results[1], Err)
    assert isinstance(results[2], Err) and isinstance(results[2].error, BreakInfo)
    bi = results[2].error
    assert bi.n_err == 2
    assert bi.threshold["max_errs"] == 1
6. Big-O & Allocation Guarantees¶
| Variant | Time | Heap | Laziness |
|---|---|---|---|
| *_emit breakers | O(k) if triggered at position k, otherwise O(N) | O(1) | Yes |
| *_truncate breakers | O(k) if triggered at position k, otherwise O(N) | O(1) | Yes |
All breakers are truly lazy generators with O(1) auxiliary memory.
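The bounded-work claim can be checked directly by counting how many items the upstream actually produces. The sketch below uses a hypothetical `counting_source` and a simplified stand-in breaker `stop_at` over plain integers; neither is part of the capstone code.

```python
from typing import Dict, Iterable, Iterator

stats: Dict[str, int] = {"pulled": 0}


def counting_source(n: int) -> Iterator[int]:
    """Yield 1..n, recording how many items were actually produced."""
    for i in range(1, n + 1):
        stats["pulled"] += 1
        yield i


def stop_at(xs: Iterable[int], trigger: int) -> Iterator[int]:
    # Simplified stand-in breaker: stop right after the trigger value.
    for x in xs:
        yield x
        if x == trigger:
            return


out = list(stop_at(counting_source(1_000_000), trigger=5))
print(len(out), stats["pulled"])  # 5 5
```

Only five of the one million potential items are ever produced: the breaker never asks the source for more once the trigger has been seen, and it holds no buffer of earlier items.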
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Manual flag-based early exit | Duplicated buggy code | Use *_emit or *_truncate breakers |
| Continuing after fatal error rate | Wasted hours of compute | Use circuit_breaker_rate_* |
| Resource leaks on early break | Open files/connections | All breakers close upstream on termination |
8. Pre-Core Quiz¶
- Emit breaker for…? → Observable termination with BreakInfo
- Truncate breaker for…? → Silent early stop
- Rate breaker for…? → Abort on error rate threshold
- Resource cleanup on break…? → Automatic via try/finally
- Never do manually what…? → A breaker can do safely
9. Post-Core Exercise¶
- Add circuit_breaker_rate_emit to the embedding stream → test early abort on injected failures.
- Use short_circuit_on_err_truncate for quick debug runs → verify no resource leaks.
- Implement a custom predicate breaker for "stop on first OOM" → test.
- Combine with par_try_map_iter → verify parallel work stops on trigger.
Continue with: Resource-Aware Streams
You now have the complete toolkit to abort doomed runs instantly while keeping every line of code pure, composable, and resource-safe. The rest of Module 4 is about retries and final reporting.