Functional Retries¶
Concept Position¶
flowchart TD
family["Python Programming"] --> program["Python Functional Programming"]
program --> module["Module 04: Streaming Resilience and Failure Handling"]
module --> concept["Functional Retries"]
concept --> capstone["Capstone pressure point"]
flowchart TD
problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
example --> boundary["Name the boundary this page is trying to protect"]
boundary --> proof["Carry that question into code review or the capstone"]
Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.
Progression Note¶
By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.
Here's a snippet from the progression map:
| Module | Focus | Key Outcomes |
|---|---|---|
| 3 | Lazy Iteration & Generators | Memory-efficient streaming, itertools mastery, short-circuiting, observability |
| 4 | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports |
| 5 | Advanced Type-Driven Design | ADTs, exhaustive pattern matching, total functions, refined types |
Core question:
How do you implement pure, bounded, fair retries over a Result stream using policies as ordinary data — guaranteeing termination, no side effects, and perfect composability with breakers and resource managers?
We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C08 and face the final reliability question:
“My embedding calls are flaky — network timeouts, rate limits, transient GPU OOM. I want to retry each failing chunk a few times with exponential backoff, but I refuse to block the whole pipeline or leak resources on abort.”
The naïve solution is a manual retry loop:
for chunk in chunks:
    r = None
    for attempt in range(5):
        r = safe_embed(chunk)        # returns Result
        if isinstance(r, Ok):
            embedded.append(r.value)
            break
        time.sleep(2 ** attempt)     # blocks everything
    if isinstance(r, Err):
        embedded.append(fallback_chunk(chunk))
This blocks the entire stream on one slow chunk, leaks resources on early breaker termination, and is duplicated everywhere.
The production solution uses a pure, lazy retry combinator that treats retry policy as ordinary data and executes as a fair, bounded loop over the Result stream.
Audience: Engineers who call flaky external services (embedding APIs, vector DBs, OCR) inside RAG pipelines and need per-chunk resilience without sacrificing throughput or resource safety.
Outcome:
1. You will define retry policies as pure data and apply them with a single combinator that works on any Result stream.
2. You will get bounded, fair retries with full provenance on final errors.
3. You will ship a RAG pipeline that automatically retries transient failures while respecting breakers and resource cleanup.
We formalise exactly what we want from correct, production-ready retries: bounded execution, fairness, purity, well-defined completion semantics, and seamless composition.
Concrete Motivating Example¶
Same 100 000-chunk tree from previous cores:
- 95 000 embed successfully on first try.
- 4 800 hit transient network timeout → succeed on retry 2–3.
- 200 are genuine failures (invalid content).
Desired behaviour:
embedded = retry_map_iter(
    safe_embed,        # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_transient_err,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=5000),
    stage="embed",
    inflight_cap=128,
)
# → Iterator[Result[Chunk, ErrInfo]]
# Retries happen fairly; total work ≈ 100k + ~10k retries
# Final Errs annotated with attempt count, next_delay_ms, etc.
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Bounded Execution | At most max_attempts calls per input item (engine cap overrides policy). | test_bounded_attempts, test_engine_cap_overrides_policy |
| Purity | Deterministic on (inputs, policy); no side effects, no sleeps, no mutation. | Reproducibility + no global state |
| Fairness | No item is starved; progress guaranteed within inflight_cap window (round-robin priming). | test_fairness_interleaving |
| Completion | All items eventually complete under bounded retries; completion order is implementation-defined (fair within window). | test_retry_completion |
| Provenance | Final Err annotated with attempt, max_attempts, policy, next_delay_ms when E supports it. | test_final_err_annotation |
These laws guarantee retries are safe, observable, and composable.
2. Decision Table – Which Policy Do You Actually Use?¶
| Failure Pattern | Need Backoff? | Need Jitter? | Recommended Policy |
|---|---|---|---|
| Simple transient (network blip) | No | No | fixed_policy(3) |
| Rate-limited API | Yes | Optional | exp_policy(7, base_ms=200, cap_ms=30000) |
| Very flaky service | Yes | Yes | Custom policy with jitter |
| Custom logic (e.g. retry only 5xx) | – | – | User-defined Policy |
Always combine with engine max_attempts cap and Core 7 breakers for global safety.
3. Public API Surface (end-of-Module-04 refactor note)¶
Refactor note: retries live in funcpipe_rag.policies.retries (capstone/src/funcpipe_rag/policies/retries.py) and are re-exported from funcpipe_rag.api.core.
from funcpipe_rag.api.core import (
    RetryCtx,
    RetryDecision,
    exp_policy,
    fixed_policy,
    is_retriable_errinfo,
    restore_input_order,
    retry_map_iter,
)
4. Reference Implementations¶
4.1 Core Retry Engine (fair, bounded, pure)¶
from collections import deque
from types import MappingProxyType
from typing import Callable, Iterable, Iterator

# Ok, Err, Result, RetryCtx, RetryDecision, the Classifier/Policy aliases and
# the type variables X, Y, E come from earlier cores (funcpipe_rag.api.core).


def _annotate_err(
    e: E,
    *,
    attempt: int,
    max_attempts: int,
    policy: str,
    next_delay_ms: int | None = None,
) -> E:
    """Annotate error if it supports _replace and ctx (ErrInfo does)."""
    if hasattr(e, "_replace") and hasattr(e, "ctx"):
        ctx = dict(e.ctx) if e.ctx else {}
        ctx.update({
            "attempt": attempt,
            "max_attempts": max_attempts,
            "policy": policy,
        })
        if next_delay_ms is not None:
            ctx["next_delay_ms"] = next_delay_ms
        e = e._replace(ctx=MappingProxyType(ctx))  # type: ignore
    return e


def retry_map_iter(
    fn: Callable[[X], Result[Y, E]],
    xs: Iterable[X],
    *,
    classifier: Classifier,
    policy: Policy,
    stage: str,
    key_path: Callable[[X], tuple[int, ...]] | None = None,
    max_attempts: int = 10,
    policy_name: str | None = None,
    inflight_cap: int = 64,
) -> Iterator[Result[Y, E]]:
    """Pure, fair, bounded retry over a Result-returning fn."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    if inflight_cap < 1:
        raise ValueError("inflight_cap must be >= 1")
    name = policy_name or getattr(policy, "__name__", "anonymous")
    it = iter(xs)
    work: deque[tuple[X, int]] = deque()  # (item, attempt)

    def prime() -> None:
        # Keep the in-flight window full: fresh items queue behind retries,
        # so no single flaky item can monopolise the stream.
        while len(work) < inflight_cap:
            try:
                work.append((next(it), 1))
            except StopIteration:
                break

    prime()
    while work:
        x, attempt = work.popleft()
        r = fn(x)
        if isinstance(r, Ok):
            yield r
            prime()
            continue
        e = r.error
        if not classifier(e):
            # Permanent failure: surface immediately, no retry.
            yield Err(_annotate_err(e, attempt=attempt, max_attempts=max_attempts, policy=name))
            prime()
            continue
        p = key_path(x) if key_path is not None else ()
        ctx = RetryCtx(item=x, attempt=attempt, error=e, stage=stage, path=p, policy_name=name)
        try:
            dec = policy(ctx)
        except Exception:
            # A misbehaving policy must never kill the stream: stop retrying this item.
            dec = RetryDecision(retry=False, next_delay_ms=None)
        if dec.retry and attempt < max_attempts:
            work.append((x, attempt + 1))  # re-enqueue behind the rest of the window
        else:
            yield Err(_annotate_err(
                e,
                attempt=attempt,
                max_attempts=max_attempts,
                policy=name,
                next_delay_ms=dec.next_delay_ms,
            ))
        prime()
4.2 Policies (pure data → decision)¶
def fixed_policy(total_attempts: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=None)

    p.__name__ = f"fixed_policy[{total_attempts}]"
    return p


def exp_policy(total_attempts: int, base_ms: int, cap_ms: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        delay = min(cap_ms, base_ms * (2 ** (ctx.attempt - 1)))
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=delay)

    p.__name__ = f"exp_policy[{total_attempts},{base_ms},{cap_ms}]"
    return p
4.3 Default Classifier¶
def is_retriable_errinfo(e: Any) -> bool:
    code = getattr(e, "code", None)
    return code in {"RATE_LIMIT", "TIMEOUT", "CONN_RESET", "EMBED/UNAVAILABLE", "TRANSIENT"}
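The decision table's "retry only 5xx" row could be expressed as a user-defined classifier along these lines. The HttpErr type and its status field are hypothetical illustrations; the real ErrInfo from earlier cores carries a code instead:

```python
from typing import Any, NamedTuple


class HttpErr(NamedTuple):
    """Hypothetical error type carrying an HTTP status code."""
    status: int
    msg: str


def is_server_error(e: Any) -> bool:
    """Retry only 5xx responses; anything else is treated as permanent."""
    status = getattr(e, "status", None)
    return status is not None and 500 <= status < 600


assert is_server_error(HttpErr(503, "unavailable")) is True
assert is_server_error(HttpErr(404, "not found")) is False
assert is_server_error("boom") is False  # no status attribute → permanent
```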
4.4 Resequencer (restore input order when needed)¶
def restore_input_order(
    tagged: Iterable[tuple[int, Result[Y, E]]],
) -> Iterator[Result[Y, E]]:
    """Restore input order from (idx, result) pairs.

    Assumes indices are 0-based consecutive integers.
    """
    buffer: dict[int, Result[Y, E]] = {}
    expect = 0
    for idx, r in tagged:
        buffer[idx] = r
        while expect in buffer:
            yield buffer.pop(expect)
            expect += 1
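A quick sanity check of the resequencer on hand-made (idx, result) pairs, with plain strings standing in for Result values (re-declared untyped here so the snippet runs standalone):

```python
from typing import Any, Iterable, Iterator


def restore_input_order(tagged: Iterable[tuple[int, Any]]) -> Iterator[Any]:
    """Yield results in index order, buffering out-of-order arrivals."""
    buffer: dict[int, Any] = {}
    expect = 0
    for idx, r in tagged:
        buffer[idx] = r
        while expect in buffer:
            yield buffer.pop(expect)
            expect += 1


# Results completing in order 2, 0, 1 come back out in input order:
out = list(restore_input_order([(2, "c"), (0, "a"), (1, "b")]))
# → ["a", "b", "c"]
```

The buffer holds only items that arrived ahead of their turn, so its size is bounded by how far completion order can run ahead of input order (at most inflight_cap × max_attempts in the retry engine).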
4.5 Idiomatic RAG Usage¶
embedded = retry_map_iter(
    safe_embed,        # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_retriable_errinfo,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=10000),
    stage="embed",
    key_path=lambda cp: cp[1],
    inflight_cap=128,
    max_attempts=10,   # hard engine cap
)

# Optional: restore input order if downstream requires it. The index must be
# attached on the *input* side (e.g. carry (idx, chunk) through safe_embed so
# each Result arrives as an (idx, result) pair for restore_input_order) —
# retry_map_iter yields in completion order, so enumerating its output is a no-op.

for r in circuit_breaker_rate_emit(embedded, max_rate=0.2):
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    process(r)
5. Property-Based Proofs (capstone/tests/test_retries.py)¶
from collections import defaultdict

from hypothesis import given, strategies as st

# Ok, Err, Result, retry_map_iter, fixed_policy, RetryDecision are imported
# from funcpipe_rag as in section 3.


# unique=True: attempts is keyed by item value, so duplicates would share counters.
@given(items=st.lists(st.integers(), unique=True))
def test_bounded_attempts(items):
    attempts = defaultdict(int)

    def fn(x: int) -> Result[int, str]:
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 3 else Err("TRANSIENT")

    out = list(retry_map_iter(
        fn, items,
        classifier=lambda e: e == "TRANSIENT",
        policy=fixed_policy(5),
        stage="test",
        max_attempts=10,
    ))
    assert all(a <= 5 for a in attempts.values())


def test_engine_cap_overrides_policy():
    attempts = [0]

    def fn(_):
        attempts[0] += 1
        return Err("TRANSIENT")

    def always_retry(_):
        return RetryDecision(True, None)

    out = list(retry_map_iter(
        fn, [0],
        classifier=lambda _: True,
        policy=always_retry,
        stage="test",
        max_attempts=4,
    ))
    assert attempts[0] == 4


@given(items=st.lists(st.integers(), min_size=10, unique=True))
def test_fairness_interleaving(items):
    attempts = defaultdict(int)

    def fn(x: int):
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 2 else Err("TRANSIENT")

    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        inflight_cap=4,
    ))
    # Every item gets at least one chance before any gets a third
    assert max(attempts.values()) <= min(attempts.values()) + 1


@given(items=st.lists(st.integers()))
def test_retry_completion(items):
    tagged = list(enumerate(items))
    attempts = defaultdict(int)

    def fn(iv: tuple[int, int]):
        i, v = iv
        attempts[i] += 1
        needed = (v % 5) + 1
        return Ok(iv) if attempts[i] >= needed else Err("TRANSIENT")

    results = list(retry_map_iter(
        fn, tagged,
        classifier=lambda _: True,
        policy=fixed_policy(10),
        stage="test",
        inflight_cap=32,
    ))
    # All items eventually complete
    assert len(results) == len(items)


@given(items=st.lists(st.integers()))
def test_final_err_annotation(items):
    def fn(x: int) -> Result[int, str]:
        return Err("TRANSIENT")

    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        max_attempts=5,
    ))
    for r in out:
        assert isinstance(r, Err)
        e = r.error
        assert e == "TRANSIENT"  # annotation skipped for str errors
6. Big-O & Allocation Guarantees¶
| Variant | Time | Heap | Laziness |
|---|---|---|---|
| retry_map_iter | O(N × max_attempts) worst-case | O(inflight_cap) | Yes |
Bounded by max_attempts and inflight_cap; pure and lazy.
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Infinite retries | Non-termination | Always bound via policy + engine cap |
| Blocking sleep in retry | Pipeline stalls | Policy returns delay only; schedule async |
| Head-of-line blocking | One slow item delays all | Bounded inflight_cap + fair priming |
| Mutable retry state | Nondeterminism | Policy as pure data |
8. Pre-Core Quiz¶
- retry_map_iter for…? → Fair, bounded retries on a Result stream
- fixed_policy for…? → Fixed attempt count
- inflight_cap for…? → Fairness / prevent starvation
- Final Err contains…? → Retry metadata when the error type supports it (e.g. ErrInfo.ctx)
- Order semantics…? → Implementation-defined completion order; resequence if input order is required
9. Post-Core Exercise¶
- Apply retry_map_iter to a flaky embedder → verify bounded attempts + fairness.
- Write a jitter policy → test next_delay_ms variance.
- Refactor an imperative retry loop → retry_map_iter.
- Combine with breaker → confirm rate reflects final outcomes after retries.
Continue with: Structured Error Reports
You now have the complete toolkit to make any flaky operation resilient — pure, bounded, fair, and composable with every previous core. The final core is about turning all those errors into beautiful reports.