Streaming Error Handling¶
Concept Position¶
flowchart TD
family["Python Programming"] --> program["Python Functional Programming"]
program --> module["Module 04: Streaming Resilience and Failure Handling"]
module --> concept["Streaming Error Handling"]
concept --> capstone["Capstone pressure point"]
flowchart TD
problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
example --> boundary["Name the boundary this page is trying to protect"]
boundary --> proof["Carry that question into code review or the capstone"]
Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.
Progression Note¶
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you keep a lazy streaming pipeline flowing when individual records fail, while faithfully collecting every error with full provenance and enabling one-pass routing, recovery, or parallel processing — all without materialising the stream?
We now take the TreeDoc → Chunk → Result[Chunk, ErrInfo] stream from M04C04 and face the real-world production reality:
99 % of chunks embed successfully, but 1 % fail for different reasons (Unicode, OOM, model rejection, network timeout).
A naïve pipeline would raise on the first failing chunk and abort the whole run. You lose everything after the first bad chunk.
Even a careful try/except loop either:
- halts on the first error,
- silently drops bad chunks,
- or materialises everything just to separate good from bad.
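Those three failure modes are easy to reproduce. A sketch with a hypothetical embed_one stand-in (the name and failure condition are invented for illustration):

```python
def embed_one(chunk: str) -> str:
    # Stand-in embedder: rejects chunks containing the marker "BAD".
    if "BAD" in chunk:
        raise ValueError(f"cannot embed {chunk!r}")
    return chunk.upper()

chunks = ["a", "BAD-1", "b", "BAD-2", "c"]

# Variant 1: halts on first error -- everything after "BAD-1" is lost.
def naive_halt(xs):
    return [embed_one(x) for x in xs]  # raises ValueError at "BAD-1"

# Variant 2: silently drops bad chunks -- no record of what failed or why.
def naive_drop(xs):
    out = []
    for x in xs:
        try:
            out.append(embed_one(x))
        except Exception:
            pass
    return out

# Variant 3: materialises everything just to separate good from bad.
def naive_materialise(xs):
    good, bad = [], []
    for x in xs:
        try:
            good.append(embed_one(x))
        except Exception as exc:
            bad.append((x, exc))
    return good, bad  # two full lists in memory -- laziness is gone
```

Each variant violates at least one of the laws formalised below: the first breaks continuation, the second breaks separation, the third breaks laziness.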
The production solution uses tiny, composable streaming combinators that treat Result as a normal value:
embedded = par_try_map_iter(embed_chunk, chunks_with_path, stage="embed")
# → Iterator[Result[Chunk, ErrInfo]] that never raises and preserves order
The stream continues forever; good chunks flow through immediately; every failure is captured with full provenance and can be logged, retried, or routed without stopping anything.
Audience: Engineers who run RAG (or any data-processing) pipelines over real-world messy data and refuse to lose 99 % of their work because of 1 % bad records.
Outcome:
1. You will process mixed good/bad streams with zero halting and O(1) memory per item.
2. You will route, log, recover, or aggregate errors in one pass using lazy combinators.
3. You will ship a RAG pipeline that survives any per-chunk catastrophe and delivers rich, structured error reports.
We formalise exactly what we want from correct, production-ready streaming error handling: continuation, ordering, separation, bounded work, and perfect containment.
Concrete Motivating Example¶
Same deep TreeDoc from previous cores:
- 100 000 chunks total.
- 99 000 embed successfully.
- 800 contain truncated UTF-8 → UnicodeDecodeError.
- 200 are >10 MB → MemoryError in the embedder.
Desired behaviour:
embedded: Iterator[Result[Chunk, ErrInfo]] = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)
# Never raises, processes all 100k items, yields 99k Ok + 1k Err
oks, errs = partition_results(embedded)  # materialise only at the very end
- Total time ≈ time for 99k successful embeddings (the 1k failures are near-instant).
- Full provenance on every error (tree path, stage, cause).
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Continuation | Pipeline never halts on Err; every input item produces exactly one output item. | test_continuation_full_output |
| Ordering | All combinators preserve input order in their outputs (successes and failures appear in original sequence). | test_ordering_preservation |
| Separation (lazy) | filter_ok / filter_err partition the stream into good/bad subsequences, each preserving relative order, without materialisation. | test_lazy_separation_equivalence |
| Bounded-Work | Processing the first k items (good or bad) performs exactly k applications of the wrapped function. | test_bounded_work |
| Containment | No unhandled exception escapes any combinator; every failure becomes an Err. | test_containment_no_leak |
| One-Pass Routing | split_results_to_sinks visits each item exactly once. | test_single_pass_split |
These laws guarantee the stream is robust, predictable, and truly lazy.
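The continuation, ordering, and bounded-work laws can be spot-checked in a few lines with a simplified local stand-in that tags results as plain tuples instead of the real Ok/Err types:

```python
calls = 0

def try_map_iter_sketch(fn, xs):
    # Simplified stand-in: ("ok", value) and ("err", exc) instead of Ok/Err.
    for x in xs:
        try:
            yield ("ok", fn(x))
        except Exception as exc:
            yield ("err", exc)

def f(x: int) -> int:
    global calls
    calls += 1
    if x % 3 == 0:
        raise ValueError("divisible by three")
    return x * 10

items = list(range(10))

# Continuation: exactly one output per input, and iteration never raises.
out = list(try_map_iter_sketch(f, items))
assert len(out) == len(items)

# Ordering: successes appear in input order, failures in between untouched.
assert [v for tag, v in out if tag == "ok"] == [x * 10 for x in items if x % 3]
```

The full machine-checked versions over the real combinators appear in Section 5.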
2. Decision Table – Which Combinator Do You Actually Use?¶
| Goal | Need Parallelism? | Need Recovery? | Need One-Pass Routing? | Recommended Combinator |
|---|---|---|---|---|
| Simple exception containment | No | No | No | try_map_iter |
| Order-preserving parallelism | Yes | No | No | par_try_map_iter |
| Stream only successes | – | No | No | filter_ok |
| Stream/log only failures | – | No | No | filter_err / tap_err |
| Recover failures to values | – | Yes | No | recover_iter |
| Recover failures to Result | – | Yes | No | recover_result_iter |
| Route to two sinks in one pass | – | No | Yes | split_results_to_sinks |
| Route + contain sink exceptions | – | No | Yes | split_results_to_sinks_guarded |
| Materialise good/bad at end | – | No | No | partition_results (endpoint only) |
Never materialise early for separation — use one-pass routing instead.
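As a concrete contrast to early materialisation, here is one-pass routing over a generator with stand-in Ok/Err types — no list of Results ever exists:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

def split_results_to_sinks(xs, on_ok, on_err):
    # Visit each item exactly once, dispatching by variant.
    for r in xs:
        if isinstance(r, Ok):
            on_ok(r.value)
        else:
            on_err(r.error)

# The input is a generator: items are produced, routed, and discarded one at a time.
def results():
    for i in range(10):
        yield Err(f"bad {i}") if i % 4 == 0 else Ok(i)

goods, bads = [], []
split_results_to_sinks(results(), goods.append, bads.append)
assert goods == [1, 2, 3, 5, 6, 7, 9]
assert bads == ["bad 0", "bad 4", "bad 8"]
```

In production the sinks would be an index writer and an error warehouse; list.append here just makes the routing observable.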
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: streaming Result combinators live in funcpipe_rag.result.stream (capstone/src/funcpipe_rag/result/stream.py) and are re-exported from
funcpipe_rag.result and funcpipe_rag.api.core.
from funcpipe_rag.api.core import (
    filter_err,
    filter_ok,
    par_try_map_iter,
    partition_results,
    recover_iter,
    recover_result_iter,
    split_results_to_sinks,
    split_results_to_sinks_guarded,
    tap_err,
    tap_ok,
    try_map_iter,
)
4. Reference Implementations¶
4.1 try_map_iter – Exception-Safe Lazy Wrapping¶
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

# Ok, Err, Result, ErrInfo, make_errinfo come from funcpipe_rag.result
T = TypeVar("T")
U = TypeVar("U")

def try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
) -> Iterator[Result[U, ErrInfo]]:
    """Lazily map fn over xs, converting every exception into an Err."""
    for x in xs:
        try:
            yield Ok(fn(x))
        except Exception as exc:
            p = key_path(x) if key_path is not None else ()
            yield Err(make_errinfo(code, str(exc), stage, p, exc))
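A self-contained usage sketch, bundling stand-in Ok/Err/ErrInfo definitions so it runs on its own (the production types carry more detail, including the original exception as cause):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str
    stage: str
    path: tuple[int, ...]

def make_errinfo(code, msg, stage, path, cause=None) -> ErrInfo:
    # Simplified: the production version also records `cause`.
    return ErrInfo(code, msg, stage, path)

def try_map_iter(fn, xs, *, stage, key_path=None, code="PIPE/EXC"):
    for x in xs:
        try:
            yield Ok(fn(x))
        except Exception as exc:
            p = key_path(x) if key_path is not None else ()
            yield Err(make_errinfo(code, str(exc), stage, p, exc))

# Demo: parse a mixed batch of strings into ints without halting.
rows = ["1", "2", "oops", "4"]
results = list(try_map_iter(int, rows, stage="parse"))
assert [isinstance(r, Ok) for r in results] == [True, True, False, True]
assert results[2].error.stage == "parse"
```

One bad row yields one Err in place; the other three flow through as Ok.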
4.2 par_try_map_iter – Order-Preserving Parallel Mapping¶
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor

def par_try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
    max_workers: int = 8,
    max_in_flight: int = 32,
) -> Iterator[Result[U, ErrInfo]]:
    it = iter(xs)
    inflight: deque[tuple[int, T, Future[U]]] = deque()
    idx = 0
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Prime the window with up to max_in_flight submissions.
        while len(inflight) < max_in_flight:
            try:
                x = next(it)
            except StopIteration:
                break
            inflight.append((idx, x, ex.submit(fn, x)))
            idx += 1
        while inflight:
            # Pop the oldest future first: results leave in input order,
            # while the rest of the window keeps executing in the pool.
            i, x, fut = inflight.popleft()
            try:
                yield Ok(fut.result())
            except Exception as exc:
                p = key_path(x) if key_path is not None else ()
                yield Err(make_errinfo(code, str(exc), stage, p, exc))
            # Refill one submission to keep the window full.
            try:
                x = next(it)
            except StopIteration:
                continue
            inflight.append((idx, x, ex.submit(fn, x)))
            idx += 1
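The order-preservation mechanics are easier to see stripped of the Result wrapping. A minimal sliding-window parallel map (par_map_ordered is an invented helper name) that pops the oldest future and refills one submission per step:

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def par_map_ordered(fn, xs, *, max_workers=4, max_in_flight=8):
    """Order-preserving parallel map: pop the oldest future, then refill one."""
    it = iter(xs)
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        inflight = deque()
        # Prime the window.
        for x in it:
            inflight.append(ex.submit(fn, x))
            if len(inflight) >= max_in_flight:
                break
        while inflight:
            yield inflight.popleft().result()  # oldest first => input order
            try:
                x = next(it)
            except StopIteration:
                continue
            inflight.append(ex.submit(fn, x))  # keep the window full

def slow_square(x: int) -> int:
    time.sleep(0.01 * (x % 3))  # later items may finish before earlier ones
    return x * x

assert list(par_map_ordered(slow_square, range(20))) == [x * x for x in range(20)]
```

Even though items with a shorter sleep finish first inside the pool, the deque is drained strictly from the left, so consumers see results in submission order.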
4.3 The Rest (concise, correct, lazy)¶
E = TypeVar("E")

def filter_ok(xs: Iterable[Result[T, E]]) -> Iterator[T]:
    for r in xs:
        if isinstance(r, Ok):
            yield r.value

def filter_err(xs: Iterable[Result[T, E]]) -> Iterator[E]:
    for r in xs:
        if isinstance(r, Err):
            yield r.error

def tap_ok(xs: Iterable[Result[T, E]], fn: Callable[[T], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Ok):
            fn(r.value)
        yield r

def tap_err(xs: Iterable[Result[T, E]], fn: Callable[[E], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Err):
            fn(r.error)
        yield r

def recover_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], T]) -> Iterator[T]:
    for r in xs:
        yield r.value if isinstance(r, Ok) else fn(r.error)

def recover_result_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], Result[T, E]]) -> Iterator[Result[T, E]]:
    for r in xs:
        yield r if isinstance(r, Ok) else fn(r.error)

def split_results_to_sinks(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
) -> None:
    for r in xs:
        if isinstance(r, Ok):
            on_ok(r.value)
        else:
            on_err(r.error)

def split_results_to_sinks_guarded(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
    *,
    stage: str = "sink",
) -> Iterator[Result[None, ErrInfo]]:
    """Contain sink exceptions; the original Err is processed before the sink may fail."""
    for r in xs:
        try:
            if isinstance(r, Ok):
                on_ok(r.value)
            else:
                on_err(r.error)
            yield Ok(None)
        except Exception as exc:
            yield Err(make_errinfo("SINK/EXC", str(exc), stage, (), exc))

def partition_results(xs: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
    oks: list[T] = []
    errs: list[E] = []
    for r in xs:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return oks, errs
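recover_iter in action, with stand-in Ok/Err types — every failure is lazily replaced by a fallback value, here a hypothetical zero embedding:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

def recover_iter(xs, fn):
    # Yield Ok values unchanged; map each error through the recovery function.
    for r in xs:
        yield r.value if isinstance(r, Ok) else fn(r.error)

# Demo: replace every failed embedding with a sentinel vector of zeros.
stream = [Ok([0.1, 0.2]), Err("timeout"), Ok([0.3, 0.4])]
recovered = list(recover_iter(iter(stream), lambda e: [0.0, 0.0]))
assert recovered == [[0.1, 0.2], [0.0, 0.0], [0.3, 0.4]]
```

The output is a plain value stream: recovery is the point where the Result wrapper is deliberately discharged.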
4.4 Idiomatic RAG Usage¶
embedded = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)
split_results_to_sinks(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
)
5. Property-Based Proofs (capstone/tests/test_result_stream.py)¶
from itertools import islice

from hypothesis import given, strategies as st

@given(items=st.lists(st.integers()))
def test_continuation_full_output(items):
    def f(x: int) -> int:
        if x == -1:
            raise ValueError("boom")
        return x
    results = list(try_map_iter(f, items, stage="test"))
    assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_ordering_preservation(items):
    tagged = list(enumerate(items))
    def f(iv):
        i, v = iv
        if v % 2 == 0:
            raise ValueError("even")
        return iv
    results = list(try_map_iter(f, tagged, stage="test", key_path=lambda iv: (iv[0],)))
    # Read each output's original index back, Ok or Err alike: the
    # interleaved output sequence must match the input order exactly.
    out_indices = [r.value[0] if isinstance(r, Ok) else r.error.path[0] for r in results]
    assert out_indices == list(range(len(items)))

@given(items=st.lists(st.integers()))
def test_lazy_separation_equivalence(items):
    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x
    stream = try_map_iter(f, items, stage="test")
    oks = list(filter_ok(stream))
    stream2 = try_map_iter(f, items, stage="test")
    errs = list(filter_err(stream2))
    assert len(oks) + len(errs) == len(items)

@given(items=st.lists(st.integers()))
def test_single_pass_split(items):
    seen = 0
    def on_ok(_):
        nonlocal seen
        seen += 1
    def on_err(_):
        nonlocal seen
        seen += 1
    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x
    split_results_to_sinks(try_map_iter(f, items, stage="test"), on_ok, on_err)
    assert seen == len(items)

@given(items=st.lists(st.integers()))
def test_bounded_work(items):
    seen = 0
    def f(x):
        nonlocal seen
        seen += 1
        if x == 0:
            raise ValueError("zero")
        return x
    stream = try_map_iter(f, items, stage="test")
    list(islice(stream, 25))
    assert seen == min(25, len(items))

@given(items=st.lists(st.integers()))
def test_try_map_iter_matches_try_except(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x
    # reference try/except
    ref: list[Result[int, ErrInfo]] = []
    for x in items:
        try:
            ref.append(Ok(100 // x))
        except Exception as exc:
            # match production arity: include `exc` as cause
            ref.append(Err(make_errinfo("PIPE/EXC", str(exc), "test", (), exc)))
    got = list(try_map_iter(f, items, stage="test"))
    # Compare shapes, values, and error codes (messages/causes may differ:
    # f raises ValueError where the reference raises ZeroDivisionError)
    assert [isinstance(r, Ok) for r in got] == [isinstance(r, Ok) for r in ref]
    assert [r.value for r in got if isinstance(r, Ok)] == \
        [r.value for r in ref if isinstance(r, Ok)]
    assert [r.error.code for r in got if isinstance(r, Err)] == \
        [r.error.code for r in ref if isinstance(r, Err)]

@given(items=st.lists(st.integers(), min_size=1, max_size=200))
def test_par_try_map_iter_matches_try_map_iter(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x
    seq = list(try_map_iter(f, items, stage="test"))
    par = list(par_try_map_iter(f, items, stage="test", max_workers=4, max_in_flight=8))
    assert [isinstance(r, Ok) for r in par] == [isinstance(r, Ok) for r in seq]
    assert [r.value if isinstance(r, Ok) else r.error.msg for r in par] == \
        [r.value if isinstance(r, Ok) else r.error.msg for r in seq]

@given(items=st.lists(st.integers()))
def test_containment_no_leak(items):
    def f(x: int) -> int:
        raise ValueError("always fail")
    # These must never raise.
    list(try_map_iter(f, items, stage="test"))
    list(par_try_map_iter(f, items, stage="test"))
6. Big-O & Allocation Guarantees¶
| Variant | Time per item | Heap per item | Laziness |
|---|---|---|---|
| try_map_iter | O(1) | O(1) | Yes |
| par_try_map_iter | O(1) amortised | O(max_in_flight) total | Yes |
| filter_ok / filter_err / tap_* | O(1) | O(1) | Yes |
| recover_iter / recover_result_iter | O(1) | O(1) | Yes |
| split_results_to_sinks | O(1) | O(1) | Yes |
| partition_results | O(1) | O(N) total | No |
All streaming operations are truly lazy.
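Laziness is observable directly: wrap an expensive function, take five items with islice, and count applications. A sketch with a simplified tuple-tagged stand-in for the Result types:

```python
from itertools import islice

evaluated = []

def try_map_iter_sketch(fn, xs):
    # Simplified local stand-in: tag results instead of the real Ok/Err types.
    for x in xs:
        try:
            yield ("ok", fn(x))
        except Exception as exc:
            yield ("err", str(exc))

def expensive(x: int) -> int:
    evaluated.append(x)  # record each application
    if x == 2:
        raise ValueError("bad record")
    return x * x

stream = try_map_iter_sketch(expensive, range(1_000_000))
first_five = list(islice(stream, 5))

# Only five applications happened, despite the million-item input,
# and the failure at x == 2 cost nothing extra.
assert evaluated == [0, 1, 2, 3, 4]
assert [tag for tag, _ in first_five] == ["ok", "ok", "err", "ok", "ok"]
```

This is the bounded-work law from Section 1 seen from the consumer's side: demand drives evaluation, failures included.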
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Halting on first exception | Lost data after first failure | Use try_map_iter / par_try_map_iter |
| Materialising early for separation | Memory blowup | Use split_results_to_sinks (one-pass) |
| Silent drop of bad records | Incomplete results | Use filter_err or tap_err to capture |
| Sink exceptions crashing pipeline | Partial processing | Use split_results_to_sinks_guarded |
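The last fix can be sketched with a simplified guarded split (stand-in Ok/Err types, plain strings as error payloads instead of ErrInfo):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class Err:
    error: Any

def split_results_to_sinks_guarded(xs, on_ok, on_err):
    """Route each item once; convert sink exceptions into Err instead of raising."""
    for r in xs:
        try:
            if isinstance(r, Ok):
                on_ok(r.value)
            else:
                on_err(r.error)
            yield Ok(None)
        except Exception as exc:
            yield Err(str(exc))

indexed = []

def flaky_index(v):
    if v == "poison":
        raise RuntimeError("index rejected document")  # sink-side failure
    indexed.append(v)

outcomes = list(split_results_to_sinks_guarded(
    [Ok("a"), Ok("poison"), Ok("b")],
    on_ok=flaky_index,
    on_err=print,
))
# The pipeline survives the sink failure and keeps routing.
assert indexed == ["a", "b"]
assert [isinstance(o, Ok) for o in outcomes] == [True, False, True]
```

Unguarded, the RuntimeError from the sink would have killed the whole run after "a"; guarded, it becomes one more structured failure in the outcome stream.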
8. Pre-Core Quiz¶
- try_map_iter for…? → Exception-safe lazy mapping
- par_try_map_iter for…? → Order-preserving parallel mapping
- filter_ok for…? → Stream only successes
- split_results_to_sinks for…? → One-pass routing to two sinks
- recover_iter for…? → Recover Err to values lazily
9. Post-Core Exercise¶
- Replace a try/except loop with try_map_iter → verify full output on mixed data.
- Add tap_err for logging → test laziness with islice.
- Use par_try_map_iter for embedding → measure speedup on a real dataset.
- Implement split_results_to_sinks_guarded for safe indexing + an error warehouse.
- Add an ordering test to your own pipeline using tagged inputs.
Continue with: Error Aggregation
You now have the complete toolkit to process real-world messy data without ever losing a single record or halting on one bad apple. The rest of Module 4 is about aggregating and reporting those errors beautifully.