Streaming Error Handling

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 04: Streaming Resilience and Failure Handling"]
  module --> concept["Streaming Error Handling"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |

Core question:
How do you keep a lazy streaming pipeline flowing when individual records fail, while faithfully collecting every error with full provenance and enabling one-pass routing, recovery, or parallel processing — all without materialising the stream?

We now take the TreeDoc → Chunk → Result[Chunk, ErrInfo] stream from M04C04 and face a production reality:

99% of chunks embed successfully, but 1% fail for different reasons (Unicode, OOM, model rejection, network timeout).

A naïve pipeline would:

for chunk in chunks:
    embedded.append(embed_chunk(chunk))   # raises → entire pipeline dies

You lose everything after the first bad chunk.

Even a careful try/except loop either:

  • halts on the first error,
  • silently drops bad chunks,
  • or materialises everything just to separate good from bad.

The production solution uses tiny, composable streaming combinators that treat Result as a normal value:

embedded = par_try_map_iter(embed_chunk, chunks_with_path, stage="embed")
# → Iterator[Result[Chunk, ErrInfo]] that never raises and preserves order

The stream never halts: good chunks flow through immediately, and every failure is captured with full provenance so it can be logged, retried, or routed without stopping anything.

Audience: Engineers who run RAG (or any data-processing) pipelines over real-world messy data and refuse to lose 99% of their work because of 1% bad records.

Outcome:
1. You will process mixed good/bad streams with zero halting and O(1) memory per item.
2. You will route, log, recover, or aggregate errors in one pass using lazy combinators.
3. You will ship a RAG pipeline that survives any per-chunk catastrophe and delivers rich, structured error reports.

We formalise exactly what we want from correct, production-ready streaming error handling: continuation, ordering, separation, bounded work, and perfect containment.


Concrete Motivating Example

Same deep TreeDoc from previous cores:

  • 100 000 chunks total.
  • 99 000 embed successfully.
  • 800 contain truncated UTF-8 → UnicodeDecodeError.
  • 200 are >10 MB → MemoryError in embedder.

Desired behaviour:

embedded: Iterator[Result[Chunk, ErrInfo]] = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)

# Never raises, processes all 100k items, yields 99k Ok + 1k Err instantly
oks, errs = partition_results(embedded)   # materialise only at the very end

  • Total time ≈ time for 99k successful embeddings (the 1k failures are near-instant).
  • Full provenance on every error (tree path, stage, cause).

1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
| --- | --- | --- |
| Continuation | Pipeline never halts on Err; every input item produces exactly one output item. | test_continuation_full_output |
| Ordering | All combinators preserve input order in their outputs (successes and failures appear in original sequence). | test_ordering_preservation |
| Separation (lazy) | filter_ok / filter_err partition the stream into good/bad subsequences, each preserving relative order, without materialisation. | test_lazy_separation_equivalence |
| Bounded work | Processing the first k items (good or bad) performs exactly k applications of the wrapped function. | test_bounded_work |
| Containment | No unhandled exception escapes any combinator; every failure becomes an Err. | test_containment_no_leak |
| One-pass routing | split_results_to_sinks visits each item exactly once. | test_single_pass_split |

These laws guarantee the stream is robust, predictable, and truly lazy.
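
Two of these laws in miniature, using the try_map_iter shown in section 4 (illustrative only):

items = [1, 0, 2]
out = list(try_map_iter(lambda x: 1 // x, items, stage="demo"))
assert len(out) == len(items)                                    # continuation: one output per input
assert [isinstance(r, Ok) for r in out] == [True, False, True]   # 1 // 0 is contained as an Err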


2. Decision Table – Which Combinator Do You Actually Use?

| Goal | Need Parallelism? | Need Recovery? | Need One-Pass Routing? | Recommended Combinator |
| --- | --- | --- | --- | --- |
| Simple exception containment | No | No | No | try_map_iter |
| Order-preserving parallelism | Yes | No | No | par_try_map_iter |
| Stream only successes | No | No | No | filter_ok |
| Stream/log only failures | No | No | No | filter_err / tap_err |
| Recover failures to values | No | Yes | No | recover_iter |
| Recover failures to Result | No | Yes | No | recover_result_iter |
| Route to two sinks in one pass | No | No | Yes | split_results_to_sinks |
| Route + contain sink exceptions | No | No | Yes | split_results_to_sinks_guarded |
| Materialise good/bad at end | No | No | No | partition_results (endpoint only) |

Never materialise early for separation — use one-pass routing instead.


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: streaming Result combinators live in funcpipe_rag.result.stream (capstone/src/funcpipe_rag/result/stream.py) and are re-exported from funcpipe_rag.result and funcpipe_rag.api.core.

from funcpipe_rag.api.core import (
    filter_err,
    filter_ok,
    par_try_map_iter,
    partition_results,
    recover_iter,
    recover_result_iter,
    split_results_to_sinks,
    split_results_to_sinks_guarded,
    tap_err,
    tap_ok,
    try_map_iter,
)

4. Reference Implementations
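
The snippets below share one small header; a minimal sketch, assuming Ok, Err, Result, ErrInfo, and make_errinfo come from the result types built in earlier cores:

from __future__ import annotations

from typing import Callable, Iterable, Iterator, TypeVar

# Ok, Err, Result, ErrInfo, make_errinfo are assumed importable from
# funcpipe_rag.result (built in earlier cores of this module).

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")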

4.1 try_map_iter – Exception-Safe Lazy Wrapping

def try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
) -> Iterator[Result[U, ErrInfo]]:
    for x in xs:
        try:
            yield Ok(fn(x))
        except Exception as exc:
            p = key_path(x) if key_path is not None else ()
            yield Err(make_errinfo(code, str(exc), stage, p, exc))
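
A quick smoke test of containment (illustrative; int stands in for a flaky stage):

results = list(try_map_iter(int, ["1", "2", "oops", "4"], stage="parse"))
# No exception escapes: int("oops") becomes an Err carrying stage/path provenance.
assert [isinstance(r, Ok) for r in results] == [True, True, False, True]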

4.2 par_try_map_iter – Order-Preserving Parallel Mapping

from collections import deque
from concurrent.futures import ThreadPoolExecutor, Future

def par_try_map_iter(
    fn: Callable[[T], U],
    xs: Iterable[T],
    *,
    stage: str,
    key_path: Callable[[T], tuple[int, ...]] | None = None,
    code: str = "PIPE/EXC",
    max_workers: int = 8,
    max_in_flight: int = 32,
) -> Iterator[Result[U, ErrInfo]]:
    it = iter(xs)
    inflight: deque[tuple[int, T, Future[U]]] = deque()
    idx = 0

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Prime the pipeline
        while len(inflight) < max_in_flight:
            try:
                x = next(it)
            except StopIteration:
                break
            inflight.append((idx, x, ex.submit(fn, x)))
            idx += 1

        while inflight:
            # Drain the head: submission order equals output order, so
            # blocking on the oldest future preserves input ordering.
            _idx, x, fut = inflight.popleft()
            try:
                yield Ok(fut.result())
            except Exception as exc:
                p = key_path(x) if key_path is not None else ()
                yield Err(make_errinfo(code, str(exc), stage, p, exc))

            # Refill the window so up to max_in_flight futures stay live.
            while len(inflight) < max_in_flight:
                try:
                    x = next(it)
                except StopIteration:
                    break
                inflight.append((idx, x, ex.submit(fn, x)))
                idx += 1
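
A small order-preservation sketch (slow_id is a made-up stand-in for a jittery I/O-bound stage). Threads fit I/O-bound work; for a CPU-bound fn you would reach for a process pool instead, at the cost of requiring picklable functions.

import random
import time

def slow_id(x: int) -> int:
    time.sleep(random.random() * 0.01)   # simulate jittery I/O latency
    if x < 0:
        raise ValueError("negative")
    return x

out = list(par_try_map_iter(slow_id, [3, -1, 2, 0], stage="demo", max_workers=4))
# Output order matches input order despite the jitter.
assert [isinstance(r, Ok) for r in out] == [True, False, True, True]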

4.3 The Rest (concise, correct, lazy)

def filter_ok(xs: Iterable[Result[T, E]]) -> Iterator[T]:
    for r in xs:
        if isinstance(r, Ok):
            yield r.value

def filter_err(xs: Iterable[Result[T, E]]) -> Iterator[E]:
    for r in xs:
        if isinstance(r, Err):
            yield r.error

def tap_ok(xs: Iterable[Result[T, E]], fn: Callable[[T], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Ok):
            fn(r.value)
        yield r

def tap_err(xs: Iterable[Result[T, E]], fn: Callable[[E], None]) -> Iterator[Result[T, E]]:
    """Observational tap only – fn may log or increment metrics but must not mutate values."""
    for r in xs:
        if isinstance(r, Err):
            fn(r.error)
        yield r
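
Taps stay lazy; a sketch wiring tap_err to the standard logging module (log_err is hypothetical; the ErrInfo fields used are the ones the tests in section 5 rely on):

import logging

def log_err(e) -> None:
    # Hypothetical structured logger over ErrInfo fields (code, msg, path).
    logging.warning("code=%s msg=%s path=%s", e.code, e.msg, e.path)

peeked = tap_err(try_map_iter(int, ["1", "x", "3"], stage="parse"), log_err)
# Nothing has run yet; consuming `peeked` drives mapping and logging together.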

def recover_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], T]) -> Iterator[T]:
    for r in xs:
        yield r.value if isinstance(r, Ok) else fn(r.error)

def recover_result_iter(xs: Iterable[Result[T, E]], fn: Callable[[E], Result[T, E]]) -> Iterator[Result[T, E]]:
    for r in xs:
        yield r if isinstance(r, Ok) else fn(r.error)
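
recover_iter makes the stream total again by substituting one value per failure (sketch; -1 is an arbitrary demo sentinel):

cleaned = recover_iter(
    try_map_iter(int, ["1", "x", "3"], stage="parse"),
    lambda err: -1,   # arbitrary sentinel for the demo, not a recommended default
)
assert list(cleaned) == [1, -1, 3]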

def split_results_to_sinks(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
) -> None:
    for r in xs:
        if isinstance(r, Ok):
            on_ok(r.value)
        else:
            on_err(r.error)

def split_results_to_sinks_guarded(
    xs: Iterable[Result[T, E]],
    on_ok: Callable[[T], None],
    on_err: Callable[[E], None],
    *,
    stage: str = "sink",
) -> Iterator[Result[None, ErrInfo]]:
    """Contain sink exceptions; original Err is processed before sink may fail."""
    for r in xs:
        try:
            if isinstance(r, Ok):
                on_ok(r.value)
            else:
                on_err(r.error)
            yield Ok(None)
        except Exception as exc:
            yield Err(make_errinfo("SINK/EXC", str(exc), stage, (), exc))

def partition_results(xs: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
    oks: list[T] = []
    errs: list[E] = []
    for r in xs:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return oks, errs

4.4 Idiomatic RAG Usage

embedded = par_try_map_iter(
    embed_chunk,
    chunks_with_path,
    stage="embed",
    key_path=lambda cp: cp[1],
)

split_results_to_sinks(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
)
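
As an alternative when the sinks themselves can fail, the guarded variant yields a lazy stream of sink outcomes that must itself be driven (same names as above):

for outcome in split_results_to_sinks_guarded(
    tap_err(embedded, log_err_info),
    on_ok=index_chunk,
    on_err=error_warehouse,
    stage="index",
):
    if isinstance(outcome, Err):
        log_err_info(outcome.error)   # a sink failed; the pipeline keeps flowing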

5. Property-Based Proofs (capstone/tests/test_result_stream.py)
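
The tests assume a conventional header (a sketch; Hypothesis and islice are the only external pieces):

from itertools import islice

from hypothesis import given, strategies as st

# Combinators under test, re-exported per the refactor note in section 3.
from funcpipe_rag.api.core import (
    filter_err, filter_ok, par_try_map_iter, split_results_to_sinks, try_map_iter,
)
# Ok, Err, ErrInfo, make_errinfo are assumed importable from funcpipe_rag.result.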

@given(items=st.lists(st.integers()))
def test_continuation_full_output(items):
    def f(x: int) -> int:
        if x == -1:
            raise ValueError("boom")
        return x
    results = list(try_map_iter(f, items, stage="test"))
    assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_ordering_preservation(items):
    tagged = list(enumerate(items))
    def f(iv):
        i, v = iv
        if v % 2 == 0:
            raise ValueError("even")
        return iv
    results = list(try_map_iter(f, tagged, stage="test", key_path=lambda iv: (iv[0],)))
    # Every item must appear at its original position, Ok or Err alike.
    out_indices = [r.value[0] if isinstance(r, Ok) else r.error.path[0] for r in results]
    assert out_indices == list(range(len(items)))

@given(items=st.lists(st.integers()))
def test_lazy_separation_equivalence(items):
    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x
    stream = try_map_iter(f, items, stage="test")
    oks = list(filter_ok(stream))
    stream2 = try_map_iter(f, items, stage="test")
    errs = list(filter_err(stream2))
    assert len(oks) + len(errs) == len(items)

@given(items=st.lists(st.integers()))
def test_single_pass_split(items):
    seen = 0
    def on_ok(_):
        nonlocal seen
        seen += 1
    def on_err(_):
        nonlocal seen
        seen += 1
    def f(x: int) -> int:
        if x % 2 == 0:
            raise ValueError("even")
        return x
    split_results_to_sinks(try_map_iter(f, items, stage="test"), on_ok, on_err)
    assert seen == len(items)

@given(items=st.lists(st.integers()))
def test_bounded_work(items):
    seen = 0
    def f(x):
        nonlocal seen
        seen += 1
        if x == 0:
            raise ValueError("zero")
        return x
    stream = try_map_iter(f, items, stage="test")
    list(islice(stream, 25))
    assert seen == min(25, len(items))

@given(items=st.lists(st.integers()))
def test_try_map_iter_matches_try_except(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x

    # reference try/except
    ref: list[Result[int, ErrInfo]] = []
    for x in items:
        try:
            ref.append(Ok(100 // x))
        except Exception as exc:
            # match production arity: include `exc` as cause
            ref.append(Err(make_errinfo("PIPE/EXC", str(exc), "test", (), exc)))

    got = list(try_map_iter(f, items, stage="test"))

    # Compare shapes, Ok values, and error codes (messages, causes, and paths may differ)
    assert [isinstance(r, Ok) for r in got] == [isinstance(r, Ok) for r in ref]
    assert [r.value for r in got if isinstance(r, Ok)] == \
           [r.value for r in ref if isinstance(r, Ok)]
    assert [r.error.code for r in got if isinstance(r, Err)] == \
           [r.error.code for r in ref if isinstance(r, Err)]

@given(items=st.lists(st.integers(), min_size=1, max_size=200))
def test_par_try_map_iter_matches_try_map_iter(items):
    def f(x: int) -> int:
        if x == 0:
            raise ValueError("boom")
        return 100 // x

    seq = list(try_map_iter(f, items, stage="test"))
    par = list(par_try_map_iter(f, items, stage="test", max_workers=4, max_in_flight=8))

    assert [isinstance(r, Ok) for r in par] == [isinstance(r, Ok) for r in seq]
    assert [r.value if isinstance(r, Ok) else r.error.msg for r in par] == \
           [r.value if isinstance(r, Ok) else r.error.msg for r in seq]

@given(items=st.lists(st.integers()))
def test_containment_no_leak(items):
    def f(x: int) -> int:
        raise ValueError("always fail")
    # This should never raise
    list(try_map_iter(f, items, stage="test"))
    list(par_try_map_iter(f, items, stage="test"))

6. Big-O & Allocation Guarantees

| Variant | Time per item | Heap per item | Lazy? |
| --- | --- | --- | --- |
| try_map_iter | O(1) | O(1) | Yes |
| par_try_map_iter | O(1) amortised | O(max_in_flight) total | Yes |
| filter_ok / filter_err / tap_* | O(1) | O(1) | Yes |
| recover_iter / recover_result_iter | O(1) | O(1) | Yes |
| split_results_to_sinks | O(1) | O(1) | Yes |
| partition_results | O(1) | O(N) total | No |

All streaming operations are truly lazy.
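
Laziness plus bounded work means even an infinite source is safe; a small sketch:

from itertools import count, islice

# Infinite source: only the five demanded applications ever execute.
stream = try_map_iter(lambda x: 100 // (x - 3), count(), stage="demo")
first = list(islice(stream, 5))   # x == 3 divides by zero and becomes an Err
assert [isinstance(r, Ok) for r in first] == [True, True, True, False, True]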


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Halting on first exception | Lost data after first failure | Use try_map_iter / par_try_map_iter |
| Materialising early for separation | Memory blowup | Use split_results_to_sinks (one-pass) |
| Silent drop of bad records | Incomplete results | Use filter_err or tap_err to capture |
| Sink exceptions crashing pipeline | Partial processing | Use split_results_to_sinks_guarded |
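
The first two rows of the table as before/after code (names from the running example):

# Anti-pattern: halting. One bad chunk loses everything after it.
vectors = [embed_chunk(c) for c in chunks]     # first failure raises; pipeline dies

# Fix: contain per item and keep streaming.
results = try_map_iter(embed_chunk, chunks, stage="embed")

# Anti-pattern: materialising mid-pipeline just to separate good from bad.
# oks, errs = partition_results(results)      # O(N) heap before downstream sees anything

# Fix: route in one pass with O(1) heap.
split_results_to_sinks(results, on_ok=index_chunk, on_err=error_warehouse)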

8. Pre-Core Quiz

  1. try_map_iter for…? → Exception-safe lazy mapping
  2. par_try_map_iter for…? → Order-preserving parallel mapping
  3. filter_ok for…? → Stream only successes
  4. split_results_to_sinks for…? → One-pass routing to two sinks
  5. recover_iter for…? → Recover Err to values lazily

9. Post-Core Exercise

  1. Replace a try/except loop with try_map_iter → verify full output on mixed data.
  2. Add tap_err for logging → test laziness with islice.
  3. Use par_try_map_iter for embedding → measure speedup on real dataset.
  4. Implement split_results_to_sinks_guarded for safe indexing + error warehouse.
  5. Add ordering test to your own pipeline using tagged inputs.

Continue with: Error Aggregation

You now have the complete toolkit to process real-world messy data without ever losing a single record or halting on one bad apple. The rest of Module 4 is about aggregating and reporting those errors beautifully.