Structured Error Reports¶
Page Maps¶
graph LR
family["Python Programming"]
program["Python Functional Programming"]
section["Streaming Resilience Failure Handling"]
page["Structured Error Reports"]
capstone["Capstone evidence"]
family --> program --> section --> page
page -.applies in.-> capstone
flowchart LR
orient["Orient on the page map"] --> read["Read the main claim and examples"]
read --> inspect["Inspect the related code, proof, or capstone surface"]
inspect --> verify["Run or review the verification path"]
verify --> apply["Apply the idea back to the module and capstone"]
This page is the payoff for the whole module. After making failures explicit, bounded, retryable, and resource-safe, the next move is turning that work into diagnostics that a team can actually read and act on.
Start With the Reporting Gap¶
Without a structured report, the earlier resilience work still tends to collapse back into noisy logs and scattered anecdotes. Reporting is the final functional reduction over the whole failure story.
- If the only output is raw log lines, counts and patterns are hard to trust.
- If sample errors are unbounded, the report becomes another memory risk.
- If grouping rules are implicit, you cannot explain why the report is actionable rather than just verbose.
Core question:
How do you turn every error in a streaming pipeline into structured, serialisable, grouped reports — complete with counts, ordered samples, and retry metadata — while keeping the pipeline pure and memory-bounded?
This lesson introduces error reports as a bounded summary fold over the failure stream:
- group failures by the dimensions the team actually uses to debug runs
- keep bounded ordered samples so reports stay useful without exploding in size
- produce serializable data structures that can be stored, diffed, and surfaced outside the running process
The motivating question is straightforward and important: once failures are captured well, how do we turn them into something actionable instead of noisy?
The naïve solution is scattered logging:
This loses the structure the whole module has been building.
The production solution folds the Result stream into an immutable report that preserves the counts, samples, and provenance worth carrying forward.
Use this when you need complete, structured failure diagnostics from batch RAG runs.
Outcome:
1. You will aggregate every error into a rich, bounded, serialisable report with one fold.
2. You will group by code/stage/path_prefix and extract retry metadata automatically.
3. You will ship perfect JSON error reports that survive 1 % or 50 % failure rates.
This section formalises exactly what you should review in reporting code: completeness, bounded memory, ordered samples, and purity of the resulting report value.
Concrete Motivating Example¶
Same 100 000 chunk tree from previous cores:
- 95 000 embed successfully.
- 4 800 transient failures → retried and succeeded.
- 200 genuine failures (invalid content).
Desired final report (JSON):
{
"total_errs": 200,
"total_items": 100000,
"error_rate": 0.002,
"avg_attempts": 2.41,
"by_code": {
"INVALID_CONTENT": {"count": 200, "samples": [...]}
},
"by_stage": {
"embed": {"count": 200, "samples": [...]}
},
"by_path_prefix": {
"2.4": {"count": 180, "samples": [...]}
}
}
One fold, bounded memory, perfect provenance.
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Completeness | Every Err in a finite stream contributes exactly once to each relevant group’s count; samples are a prefix of encounter order up to max_samples per group. |
test_report_completeness, test_report_sample_ordering. |
| Purity | Same input stream → identical report (deterministic, no side effects). | Reproducibility test. |
| Bounded Memory | Memory ≤ O(#groups × max_samples). | test_report_bounded_memory. |
| Ordering | Samples within each group appear in original encounter order. | test_report_sample_ordering. |
These laws guarantee your reports are complete, reproducible, and memory-safe.
2. Decision Table – Which Report Do You Actually Need?¶
| Need | Bounded Memory? | Need Samples? | Recommended Fold |
|---|---|---|---|
| Just counts by code | Yes | No | fold_error_counts |
| Full grouped report with samples | Yes | Yes | fold_error_report(max_samples=20) |
| Everything (including successes) | No | Yes | collect_both (Core 6) |
Cap samples aggressively in production — 10–50 per group is plenty for debugging.
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: error reports live in funcpipe_rag.policies.reports (capstone/src/funcpipe_rag/policies/reports.py) and are re-exported from funcpipe_rag.api.core.
from funcpipe_rag.api.core import ErrGroup, ErrReport, fold_error_counts, fold_error_report, report_to_jsonable
4. Reference Implementations¶
4.1 Error Normalisation (for grouping)¶
def _normalize_err(e: Any) -> tuple[str, str, tuple[int, ...]]:
code = getattr(e, "code", "UNKNOWN")
stage = getattr(e, "stage", "UNKNOWN")
path = getattr(e, "path", ())
# Treat breaker errors as a distinct code/stage for reporting
if isinstance(e, BreakInfo):
code = e.code if e.code.startswith("BREAK/") else f"BREAK/{code}"
stage = "BREAK"
return code, stage, path
4.2 Group Builder (internal)¶
@dataclass(slots=True)
class _GroupBuilder(Generic[E]):
count: int = 0
samples: list[E] = field(default_factory=list)
cap: int = 10
def add(self, e: E) -> None:
self.count += 1
if len(self.samples) < self.cap:
self.samples.append(e)
def freeze(self) -> ErrGroup[E]:
return ErrGroup(self.count, tuple(self.samples))
4.3 Full Structured Report¶
def fold_error_report(
stream: Iterable[Result[Any, E]],
*,
max_samples: int = 10,
path_depth: int = 3,
) -> ErrReport[E]:
total_errs = total_items = 0
by_code: dict[str, _GroupBuilder[E]] = {}
by_stage: dict[str, _GroupBuilder[E]] = {}
by_path: dict[tuple[int, ...], _GroupBuilder[E]] = {}
sum_attempts = sum_delay = 0.0
cnt_attempts = cnt_delay = 0
for r in stream:
total_items += 1
if isinstance(r, Err):
total_errs += 1
e = r.error
code, stage, path = _normalize_err(e)
prefix = path[:path_depth]
by_code.setdefault(code, _GroupBuilder(max_samples)).add(e)
by_stage.setdefault(stage, _GroupBuilder(max_samples)).add(e)
by_path.setdefault(prefix, _GroupBuilder(max_samples)).add(e)
ctx = getattr(e, "ctx", None)
if isinstance(ctx, Mapping):
a = ctx.get("attempt")
d = ctx.get("next_delay_ms")
if isinstance(a, (int, float)):
sum_attempts += a
cnt_attempts += 1
if isinstance(d, (int, float)):
sum_delay += d
cnt_delay += 1
return ErrReport(
total_errs=total_errs,
total_items=total_items,
by_code=MappingProxyType({k: v.freeze() for k, v in by_code.items()}),
by_stage=MappingProxyType({k: v.freeze() for k, v in by_stage.items()}),
by_path_prefix=MappingProxyType({k: v.freeze() for k, v in by_path.items()}),
ctx_summary=MappingProxyType({
"avg_attempts": sum_attempts / cnt_attempts if cnt_attempts else 0.0,
"avg_next_delay_ms": sum_delay / cnt_delay if cnt_delay else 0.0,
"error_rate": total_errs / total_items if total_items else 0.0,
}),
)
4.4 JSON Serialisation¶
def report_to_jsonable(report: ErrReport[E]) -> dict[str, Any]:
def group_to_dict(g: ErrGroup[E]) -> dict[str, Any]:
return {
"count": g.count,
"samples": [asdict(s) if is_dataclass(s) else {"value": s} for s in g.samples],
}
return {
"total_errs": report.total_errs,
"total_items": report.total_items,
"error_rate": report.ctx_summary["error_rate"],
"avg_attempts": report.ctx_summary["avg_attempts"],
"avg_next_delay_ms": report.ctx_summary["avg_next_delay_ms"],
"by_code": {k: group_to_dict(v) for k, v in report.by_code.items()},
"by_stage": {k: group_to_dict(v) for k, v in report.by_stage.items()},
"by_path_prefix": { ".".join(map(str, k)): group_to_dict(v) for k, v in report.by_path_prefix.items()},
}
4.5 Idiomatic RAG Usage¶
# The stream is single-pass; split with tee for reporting + indexing
from itertools import tee
report_stream, index_stream = tee(embedded)
report = fold_error_report(report_stream, max_samples=20)
if report.total_errs > 0:
logger.error("RAG embedding failures:\n%s",
json.dumps(report_to_jsonable(report), indent=2))
send_to_monitoring(report_to_jsonable(report))
else:
logger.info("Embedding succeeded for all %d chunks", report.total_items)
index_chunks(filter_ok(index_stream))
5. Property-Based Proofs (capstone/tests/test_reports.py)¶
@given(items=st.lists(st.integers()))
def test_report_completeness(items):
def f(x: int) -> Result[int, ErrInfo]:
return Err(make_errinfo(f"C{x}", f"msg{x}", "stage", (x,))) if x % 2 else Ok(x)
report = fold_error_report(map_result_iter(f, items))
assert report.total_errs == sum(1 for x in items if x % 2)
assert report.total_items == len(items)
@given(items=st.lists(st.integers(), unique=True))
def test_report_sample_ordering(items):
def f(x: int) -> Result[int, ErrInfo]:
# mark odd values as errors
return Err(make_errinfo("ERR", f"msg{x}", "s", (x,))) if x % 2 != 0 else Ok(x)
report = fold_error_report(map_result_iter(f, items))
group = report.by_code.get("ERR")
# Vacuous truth: if there are no ERR samples, ordering is trivially satisfied
if group is None or not group.samples:
return
samples = group.samples
sample_xs = [int(s.msg[3:]) for s in samples] # "msg{X}" -> X
positions = [
next(i for i, x in enumerate(items) if x == sx and x % 2 != 0)
for sx in sample_xs
]
# Encounter order: positions must be strictly increasing
assert positions == sorted(positions)
@given(items=st.lists(st.integers()))
def test_report_bounded_memory(items):
report = fold_error_report(map_result_iter(lambda x: Err("E"), items), max_samples=10)
assert all(len(g.samples) <= 10 for g in report.by_code.values())
6. Big-O & Allocation Guarantees¶
| Variant | Time | Space | Notes |
|---|---|---|---|
| fold_error_report | O(N) | O(#groups × max_samples) | Bounded by max_samples |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Ad-hoc logging | Scattered diagnostics | Use fold_error_report |
| Unbounded sample collection | Memory blowup | Cap with max_samples |
| No grouping | Hard analysis | Group by code/stage/path_prefix |
| Ignoring retry metadata | Lost insights | Aggregate from ErrInfo.ctx |
8. Pre-Core Quiz¶
fold_error_reportfor…? → Rich grouped diagnosticsmax_samplesfor…? → Bound memory per group- Report includes…? → Counts, samples, ctx_summary
- Law for reports? → Completeness (every Err counted)
- Use after…? → Breakers/retries for final outcomes
9. Post-Core Exercise¶
- Apply
fold_error_reportto a real embedding run → inspect JSON. - Add custom grouping (e.g. by
ctx["attempt"]) → test. - Replace all ad-hoc error logging with structured reports.
- Send report to monitoring on non-zero errors.
You have completed Module 04.
You can now process real-world data at scale with:
- Zero recursion blowups
- Zero cache misses on duplicates
- Zero lost records on failures
- Zero resource leaks
- Zero wasted work on doomed runs
- Clear, structured error reports
This is production-grade functional data processing in Python — the kind that ships to millions of documents and survives anything the real world throws at it.
On to Module 05: Advanced Type-Driven Design.