Structured Error Reports

This page is the payoff for the whole module. After making failures explicit, bounded, retryable, and resource-safe, the next move is turning that work into diagnostics that a team can actually read and act on.

Start With the Reporting Gap

Without a structured report, the earlier resilience work still tends to collapse back into noisy logs and scattered anecdotes. Reporting is the final functional reduction over the whole failure story.

  • If the only output is raw log lines, counts and patterns are hard to trust.
  • If sample errors are unbounded, the report becomes another memory risk.
  • If grouping rules are implicit, you cannot explain why the report is actionable rather than just verbose.

Core question:
How do you turn every error in a streaming pipeline into structured, serialisable, grouped reports — complete with counts, ordered samples, and retry metadata — while keeping the pipeline pure and memory-bounded?

This lesson introduces error reports as a bounded summary fold over the failure stream:

  • group failures by the dimensions the team actually uses to debug runs
  • keep bounded ordered samples so reports stay useful without exploding in size
  • produce serializable data structures that can be stored, diffed, and surfaced outside the running process

The motivating question is straightforward and important: once failures are captured well, how do we turn them into something actionable instead of noisy?

The naïve solution is scattered logging:

for r in embedded:
    if isinstance(r, Err):
        logger.error("Embedding failed: %s", r.error)

This loses the structure the whole module has been building.

The production solution folds the Result stream into an immutable report that preserves the counts, samples, and provenance worth carrying forward.
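To make the contrast with scattered logging concrete, here is a minimal sketch of such a fold. It is not the module's implementation: `Ok`, `Err`, and `summarise_by_stage` are stand-ins invented for this illustration, and errors are simplified to `(stage, message)` pairs.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable


# Stand-ins for the module's Result types, defined here only so the sketch runs on its own.
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def summarise_by_stage(results: Iterable[Any], max_samples: int = 2) -> dict[str, Any]:
    """Hypothetical mini-fold: per-stage counts plus the first few messages."""
    groups: dict[str, dict[str, Any]] = {}
    total = 0
    for r in results:
        total += 1
        if isinstance(r, Err):
            stage, message = r.error  # errors are (stage, message) pairs in this sketch
            g = groups.setdefault(stage, {"count": 0, "samples": []})
            g["count"] += 1
            if len(g["samples"]) < max_samples:  # the cap keeps the summary bounded
                g["samples"].append(message)
    return {"total_items": total, "by_stage": groups}


results = [
    Ok(1),
    Err(("embed", "bad utf-8")),
    Err(("embed", "empty chunk")),
    Err(("embed", "too long")),
    Ok(2),
]
summary = summarise_by_stage(results)
print(json.dumps(summary, indent=2))
```

All three errors are counted, but only the first two messages are kept as samples, and the result is plain data that `json.dumps` can serialise directly.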

Use this when you need complete, structured failure diagnostics from batch RAG runs.

Outcome:
1. You will aggregate every error into a rich, bounded, serialisable report with one fold.
2. You will group by code/stage/path_prefix and extract retry metadata automatically.
3. You will ship JSON error reports that stay bounded and readable whether 1% or 50% of items fail.

This section formalises exactly what you should review in reporting code: completeness, bounded memory, ordered samples, and purity of the resulting report value.


Concrete Motivating Example

Take the same 100 000-chunk tree from the previous cores:

  • 95 000 embed successfully.
  • 4 800 transient failures → retried and succeeded.
  • 200 genuine failures (invalid content).

Desired final report (JSON):

{
  "total_errs": 200,
  "total_items": 100000,
  "error_rate": 0.002,
  "avg_attempts": 2.41,
  "by_code": {
    "INVALID_CONTENT": {"count": 200, "samples": [...]}
  },
  "by_stage": {
    "embed": {"count": 200, "samples": [...]}
  },
  "by_path_prefix": {
    "2.4": {"count": 180, "samples": [...]}
  }
}

One fold, bounded memory, perfect provenance.


1. Laws & Invariants (machine-checked)

| Law | Formal Statement | Enforcement |
| --- | --- | --- |
| Completeness | Every Err in a finite stream contributes exactly once to each relevant group's count; samples are a prefix of encounter order up to max_samples per group. | test_report_completeness, test_report_sample_ordering |
| Purity | Same input stream → identical report (deterministic, no side effects). | Reproducibility test |
| Bounded Memory | Sample storage is O(#groups × max_samples); the number of groups is set by the distinct codes, stages, and path prefixes seen. | test_report_bounded_memory |
| Ordering | Samples within each group appear in original encounter order. | test_report_sample_ordering |

These laws guarantee your reports are complete, reproducible, and memory-safe.
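The four laws can be checked on a toy fold without any test framework. The sketch below is an assumption-laden stand-in: `fold_groups` is a hypothetical simplification that groups `(code, payload)` pairs, not the real `fold_error_report`, and a seeded `random.Random` replaces the property-based generator.

```python
import random
from collections import defaultdict


def fold_groups(errors, max_samples):
    """Stand-in fold: per-code count plus the first max_samples payloads seen."""
    groups = defaultdict(lambda: {"count": 0, "samples": []})
    for code, payload in errors:
        g = groups[code]
        g["count"] += 1
        if len(g["samples"]) < max_samples:
            g["samples"].append(payload)
    # Freeze into plain tuples so two reports can be compared with ==
    return {k: (v["count"], tuple(v["samples"])) for k, v in groups.items()}


rng = random.Random(0)  # fixed seed keeps the demo reproducible
errors = [(rng.choice("ABC"), i) for i in range(200)]
report = fold_groups(errors, max_samples=5)

# Completeness: group counts add up to the number of errors.
assert sum(count for count, _ in report.values()) == len(errors)

# Ordering: each group's samples are the first payloads seen for that code.
for code, (_, samples) in report.items():
    assert samples == tuple(p for c, p in errors if c == code)[:5]

# Purity: folding the same input again gives an identical report.
assert fold_groups(errors, max_samples=5) == report

# Bounded memory: no group keeps more than max_samples samples.
assert all(len(samples) <= 5 for _, samples in report.values())
```

The ordering check is deliberately stronger than "sorted positions": it compares the samples against the exact prefix of matching errors, which is what the Completeness law states.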


2. Decision Table – Which Report Do You Actually Need?

| Need | Bounded Memory? | Need Samples? | Recommended Fold |
| --- | --- | --- | --- |
| Just counts by code | Yes | No | fold_error_counts |
| Full grouped report with samples | Yes | Yes | fold_error_report(max_samples=20) |
| Everything (including successes) | No | Yes | collect_both (Core 6) |

Cap samples aggressively in production — 10–50 per group is plenty for debugging.
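The body of `fold_error_counts` is not shown on this page, so the following is only a guess at the shape of a counts-only fold. `count_errors_by_code`, `DemoError`, and the `Ok`/`Err` classes are hypothetical names used for illustration; the real function's signature and return type may differ.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Iterable


# Stand-ins for the module's Result types (assumed shape: Err carries .error).
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


@dataclass(frozen=True)
class DemoError:
    code: str


def count_errors_by_code(stream: Iterable[Any]) -> dict[str, int]:
    """Hypothetical counts-only fold: one integer per distinct error code."""
    counts: Counter[str] = Counter()
    for r in stream:
        if isinstance(r, Err):
            # Errors without a .code attribute fall into an UNKNOWN bucket
            counts[getattr(r.error, "code", "UNKNOWN")] += 1
    return dict(counts)


stream = [
    Ok(1),
    Err(DemoError("INVALID_CONTENT")),
    Err("bare string"),
    Err(DemoError("INVALID_CONTENT")),
]
counts = count_errors_by_code(stream)
```

Because no samples are kept, memory here depends only on the number of distinct codes, which is why the table lists this variant when counts are all you need.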


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: error reports live in funcpipe_rag.policies.reports (capstone/src/funcpipe_rag/policies/reports.py) and are re-exported from funcpipe_rag.api.core.

from funcpipe_rag.api.core import ErrGroup, ErrReport, fold_error_counts, fold_error_report, report_to_jsonable

4. Reference Implementations

4.1 Error Normalisation (for grouping)

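# Stdlib imports used by the snippets in this section. Err, Result, BreakInfo,
# ErrGroup, ErrReport, and the type variable E are defined elsewhere in funcpipe_rag.
from collections.abc import Iterable, Mapping
from dataclasses import asdict, dataclass, field, is_dataclass
from types import MappingProxyType
from typing import Any, Generic

def _normalize_err(e: Any) -> tuple[str, str, tuple[int, ...]]:
    # Fall back to neutral defaults so arbitrary error payloads can still be grouped
    code = getattr(e, "code", "UNKNOWN")
    stage = getattr(e, "stage", "UNKNOWN")
    path = getattr(e, "path", ())
    # Treat breaker errors as a distinct code/stage for reporting
    if isinstance(e, BreakInfo):
        code = e.code if e.code.startswith("BREAK/") else f"BREAK/{code}"
        stage = "BREAK"
    return code, stage, path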

4.2 Group Builder (internal)

@dataclass(slots=True)
class _GroupBuilder(Generic[E]):
    count: int = 0
    samples: list[E] = field(default_factory=list)
    cap: int = 10

    def add(self, e: E) -> None:
        self.count += 1
        if len(self.samples) < self.cap:
            self.samples.append(e)

    def freeze(self) -> ErrGroup[E]:
        return ErrGroup(self.count, tuple(self.samples))
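
A short demonstration of how the cap behaves may help. The class below is a simplified, non-generic copy of the builder (no `slots`, no `freeze`), written only for this illustration. It also shows a detail of the field order: `cap` is the third field, so it has to be passed by keyword.

```python
from dataclasses import dataclass, field


@dataclass
class GroupBuilder:
    """Simplified copy of the builder above, for illustration only."""
    count: int = 0
    samples: list = field(default_factory=list)
    cap: int = 10

    def add(self, e) -> None:
        self.count += 1  # every error is counted...
        if len(self.samples) < self.cap:
            self.samples.append(e)  # ...but only the first `cap` are kept


builder = GroupBuilder(cap=3)
for i in range(7):
    builder.add(f"err-{i}")

# A positional argument fills the first field (count), not cap.
positional = GroupBuilder(3)
```

After seven calls the builder reports a count of 7 but holds only the first three samples. The positional construction, by contrast, starts with `count == 3` and leaves `cap` at its default of 10.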

4.3 Full Structured Report

def fold_error_report(
    stream: Iterable[Result[Any, E]],
    *,
    max_samples: int = 10,
    path_depth: int = 3,
) -> ErrReport[E]:
    total_errs = total_items = 0
    by_code: dict[str, _GroupBuilder[E]] = {}
    by_stage: dict[str, _GroupBuilder[E]] = {}
    by_path: dict[tuple[int, ...], _GroupBuilder[E]] = {}
    sum_attempts = sum_delay = 0.0
    cnt_attempts = cnt_delay = 0

    for r in stream:
        total_items += 1
        if isinstance(r, Err):
            total_errs += 1
            e = r.error
            code, stage, path = _normalize_err(e)
            prefix = path[:path_depth]

            # cap must be passed by keyword: the first positional field is count
            by_code.setdefault(code, _GroupBuilder(cap=max_samples)).add(e)
            by_stage.setdefault(stage, _GroupBuilder(cap=max_samples)).add(e)
            by_path.setdefault(prefix, _GroupBuilder(cap=max_samples)).add(e)

            ctx = getattr(e, "ctx", None)
            if isinstance(ctx, Mapping):
                a = ctx.get("attempt")
                d = ctx.get("next_delay_ms")
                if isinstance(a, (int, float)):
                    sum_attempts += a
                    cnt_attempts += 1
                if isinstance(d, (int, float)):
                    sum_delay += d
                    cnt_delay += 1

    return ErrReport(
        total_errs=total_errs,
        total_items=total_items,
        by_code=MappingProxyType({k: v.freeze() for k, v in by_code.items()}),
        by_stage=MappingProxyType({k: v.freeze() for k, v in by_stage.items()}),
        by_path_prefix=MappingProxyType({k: v.freeze() for k, v in by_path.items()}),
        ctx_summary=MappingProxyType({
            "avg_attempts": sum_attempts / cnt_attempts if cnt_attempts else 0.0,
            "avg_next_delay_ms": sum_delay / cnt_delay if cnt_delay else 0.0,
            "error_rate": total_errs / total_items if total_items else 0.0,
        }),
    )

4.4 JSON Serialisation

def report_to_jsonable(report: ErrReport[E]) -> dict[str, Any]:
    def group_to_dict(g: ErrGroup[E]) -> dict[str, Any]:
        return {
            "count": g.count,
            "samples": [asdict(s) if is_dataclass(s) else {"value": s} for s in g.samples],
        }

    return {
        "total_errs": report.total_errs,
        "total_items": report.total_items,
        "error_rate": report.ctx_summary["error_rate"],
        "avg_attempts": report.ctx_summary["avg_attempts"],
        "avg_next_delay_ms": report.ctx_summary["avg_next_delay_ms"],
        "by_code": {k: group_to_dict(v) for k, v in report.by_code.items()},
        "by_stage": {k: group_to_dict(v) for k, v in report.by_stage.items()},
        "by_path_prefix": { ".".join(map(str, k)): group_to_dict(v) for k, v in report.by_path_prefix.items()},
    }
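
The path-prefix keys deserve a closer look, since they are the only keys that change shape on the way to JSON. The helper below is a hypothetical extraction of the two expressions involved (`path[:path_depth]` from the fold and the `".".join(...)` from the serialiser); `prefix_key` is not part of the module's API.

```python
import json


def prefix_key(path: tuple[int, ...], path_depth: int = 3) -> str:
    """Truncate a path to path_depth components and render it as a dotted key."""
    return ".".join(map(str, path[:path_depth]))


deep = prefix_key((2, 4, 1, 7))  # truncated to three components
short = prefix_key((2, 4))       # shorter paths are kept whole
no_path = prefix_key(())         # errors without a path land under ""

# JSON object keys cannot be tuples, which is why the join is needed at all.
try:
    json.dumps({(2, 4): 1})
    tuple_keys_ok = True
except TypeError:
    tuple_keys_ok = False
```

One consequence worth knowing when reading a report: any error that has no `path` attribute is normalised to the empty tuple, so it appears in `by_path_prefix` under the empty-string key.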

4.5 Idiomatic RAG Usage

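# The stream is single-pass; tee splits it for reporting + indexing.
# Caveat: the fold drains report_stream first, so tee buffers every item
# until index_stream consumes it. Memory is O(N) here, not bounded.
import json
from itertools import tee

report_stream, index_stream = tee(embedded)

report = fold_error_report(report_stream, max_samples=20)

if report.total_errs > 0:
    payload = report_to_jsonable(report)  # build the JSON-ready dict once
    logger.error("RAG embedding failures:\n%s", json.dumps(payload, indent=2))
    send_to_monitoring(payload)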
else:
    logger.info("Embedding succeeded for all %d chunks", report.total_items)

index_chunks(filter_ok(index_stream))
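
`itertools.tee` buffers every item that one branch has consumed and the other has not, so draining the report branch before indexing starts keeps the whole stream in memory. When that matters, one alternative is a single pass that forwards successes downstream and hands errors to an accumulator as they go by. The sketch below assumes this trade-off is acceptable; `tap_errors` and the `Ok`/`Err` stand-ins are hypothetical names, and in practice `on_err` would feed a bounded group builder rather than a plain list.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator


# Stand-ins for the module's Result types, defined here only so the sketch runs.
@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Any


def tap_errors(stream: Iterable[Any], on_err: Callable[[Any], None]) -> Iterator[Any]:
    """Yield Ok values downstream; hand each error to on_err as it passes."""
    for r in stream:
        if isinstance(r, Err):
            on_err(r.error)
        else:
            yield r.value


errors_seen: list[Any] = []
results = iter([Ok("a"), Err("bad-1"), Ok("b"), Err("bad-2")])

# Nothing is buffered: each item is either indexed or recorded, then dropped.
indexed = list(tap_errors(results, errors_seen.append))
```

The cost is that the accumulator is updated as a side effect of consuming the stream, and the report is only complete once the downstream consumer has finished. The `tee` version keeps the fold pure at the price of O(N) buffering.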

5. Property-Based Proofs (capstone/tests/test_reports.py)

@given(items=st.lists(st.integers()))
def test_report_completeness(items):
    def f(x: int) -> Result[int, ErrInfo]:
        return Err(make_errinfo(f"C{x}", f"msg{x}", "stage", (x,))) if x % 2 else Ok(x)
    report = fold_error_report(map_result_iter(f, items))
    assert report.total_errs == sum(1 for x in items if x % 2)
    assert report.total_items == len(items)

@given(items=st.lists(st.integers(), unique=True))
def test_report_sample_ordering(items):
    def f(x: int) -> Result[int, ErrInfo]:
        # mark odd values as errors
        return Err(make_errinfo("ERR", f"msg{x}", "s", (x,))) if x % 2 != 0 else Ok(x)

    report = fold_error_report(map_result_iter(f, items))
    group = report.by_code.get("ERR")
    # Vacuous truth: if there are no ERR samples, ordering is trivially satisfied
    if group is None or not group.samples:
        return

    samples = group.samples
    sample_xs = [int(s.msg[3:]) for s in samples]  # "msg{X}" -> X
    positions = [
        next(i for i, x in enumerate(items) if x == sx and x % 2 != 0)
        for sx in sample_xs
    ]
    # Encounter order: positions must be increasing (strictly so, because items are unique)
    assert positions == sorted(positions)

@given(items=st.lists(st.integers()))
def test_report_bounded_memory(items):
    report = fold_error_report(map_result_iter(lambda x: Err("E"), items), max_samples=10)
    assert all(len(g.samples) <= 10 for g in report.by_code.values())

6. Big-O & Allocation Guarantees

| Variant | Time | Space | Notes |
| --- | --- | --- | --- |
| fold_error_report | O(N) | O(#groups × max_samples) | Bounded by max_samples |

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Ad-hoc logging | Scattered diagnostics | Use fold_error_report |
| Unbounded sample collection | Memory blowup | Cap with max_samples |
| No grouping | Hard analysis | Group by code/stage/path_prefix |
| Ignoring retry metadata | Lost insights | Aggregate from ErrInfo.ctx |

8. Pre-Core Quiz

  1. fold_error_report for…? → Rich grouped diagnostics
  2. max_samples for…? → Bound memory per group
  3. Report includes…? → Counts, samples, ctx_summary
  4. Law for reports? → Completeness (every Err counted)
  5. Use after…? → Breakers/retries for final outcomes

9. Post-Core Exercise

  1. Apply fold_error_report to a real embedding run → inspect JSON.
  2. Add custom grouping (e.g. by ctx["attempt"]) → test.
  3. Replace all ad-hoc error logging with structured reports.
  4. Send report to monitoring on non-zero errors.

You have completed Module 04.

You can now process real-world data at scale with:

  • Zero recursion blowups
  • Zero cache misses on duplicates
  • Zero lost records on failures
  • Zero resource leaks
  • Zero wasted work on doomed runs
  • Clear, structured error reports

This is production-grade functional data processing in Python: pipelines built this way can scale to millions of documents while keeping failures explicit, bounded, and diagnosable.

On to Module 05: Advanced Type-Driven Design.