
Functional Retries

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 04: Streaming Resilience and Failure Handling"]
  module --> concept["Functional Retries"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Progression Note

By the end of Module 4, you will master safe recursion over unpredictable tree-shaped data, monoidal folds as the universal recursion pattern, Result/Option for streaming error handling, validation aggregators, retries, and structured error reporting — all while preserving laziness, equational reasoning, and constant call-stack usage.

Here's a snippet from the progression map:

Module | Focus                                      | Key Outcomes
3      | Lazy Iteration & Generators                | Memory-efficient streaming, itertools mastery, short-circuiting, observability
4      | Safe Recursion & Error Handling in Streams | Stack-safe tree recursion, folds, Result/Option, streaming validation/retries/reports
5      | Advanced Type-Driven Design                | ADTs, exhaustive pattern matching, total functions, refined types

Core question:
How do you implement pure, bounded, fair retries over a Result stream using policies as ordinary data — guaranteeing termination, no side effects, and perfect composability with breakers and resource managers?

We now take the Iterator[Result[Chunk, ErrInfo]] stream from M04C05–C08 and face the final reliability question:

“My embedding calls are flaky — network timeouts, rate limits, transient GPU OOM. I want to retry each failing chunk a few times with exponential backoff, but I refuse to block the whole pipeline or leak resources on abort.”

The naïve solution is a manual retry loop:

for chunk in chunks:
    r = None
    for attempt in range(5):
        r = safe_embed(chunk)   # returns Result
        if isinstance(r, Ok):
            embedded.append(r.value)
            break
        time.sleep(2 ** attempt)   # blocks everything
    if isinstance(r, Err):
        embedded.append(fallback_chunk(chunk))

This blocks the entire stream on one slow chunk, leaks resources on early breaker termination, and is duplicated everywhere.

The production solution uses a pure, lazy retry combinator that treats retry policy as ordinary data and executes as a fair, bounded loop over the Result stream.

Audience: Engineers who call flaky external services (embedding APIs, vector DBs, OCR) inside RAG pipelines and need per-chunk resilience without sacrificing throughput or resource safety.

Outcome:
1. You will define retry policies as pure data and apply them with a single combinator that works on any Result stream.
2. You will get bounded, fair retries with full provenance on final errors.
3. You will ship a RAG pipeline that automatically retries transient failures while respecting breakers and resource cleanup.

We formalise exactly what we want from correct, production-ready retries: bounded execution, purity, fairness, completion under bounded retries, provenance on final errors, and seamless composition with breakers and resource managers.


Concrete Motivating Example

Same 100 000 chunk tree from previous cores:

  • 95 000 embed successfully on first try.
  • 4 800 hit transient network timeout → succeed on retry 2–3.
  • 200 are genuine failures (invalid content).

Desired behaviour:

embedded = retry_map_iter(
    safe_embed,                  # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_retriable_errinfo,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=5000),
    stage="embed",
    inflight_cap=128,
)

# → Iterator[Result[Chunk, ErrInfo]]
# Retries happen fairly; total work ≈ 100k + ~10k retries
# Final Errs annotated with attempt count, next_delay_ms, etc.

1. Laws & Invariants (machine-checked)

Law               | Formal Statement                                                                                                        | Enforcement
Bounded Execution | At most max_attempts calls per input item (engine cap overrides policy).                                               | test_bounded_attempts, test_engine_cap_overrides_policy
Purity            | Deterministic on (inputs, policy); no side effects, no sleeps, no mutation.                                            | Reproducibility + no global state
Fairness          | No item is starved; progress guaranteed within inflight_cap window (round-robin priming).                              | test_fairness_interleaving
Completion        | All items eventually complete under bounded retries; completion order is implementation-defined (fair within window).  | test_retry_completion
Provenance        | Final Err annotated with attempt, max_attempts, policy, next_delay_ms when E supports it.                              | test_final_err_annotation

These laws guarantee retries are safe, observable, and composable.
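The Purity row has no dedicated test named above. A minimal reproducibility sketch (hypothetical, not part of the capstone suite, and assuming Ok/Err compare structurally as the Result types from earlier cores do) would look like this:

from hypothesis import given, strategies as st

@given(items=st.lists(st.integers()))
def test_purity_reproducible(items):
    def fn(x: int) -> Result[int, str]:
        # Deterministic stub: even inputs succeed, odd inputs always fail.
        return Ok(x) if x % 2 == 0 else Err("TRANSIENT")

    def run() -> list:
        return list(retry_map_iter(
            fn, items,
            classifier=lambda e: e == "TRANSIENT",
            policy=fixed_policy(3),
            stage="test",
        ))

    # Same inputs, same policy, no hidden state: two runs must agree exactly.
    assert run() == run()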


2. Decision Table – Which Policy Do You Actually Use?

Failure Pattern                    | Need Backoff? | Need Jitter? | Recommended Policy
Simple transient (network blip)    | No            | No           | fixed_policy(3)
Rate-limited API                   | Yes           | Optional     | exp_policy(7, base_ms=200, cap_ms=30000)
Very flaky service                 | Yes           | Yes          | Custom policy with jitter
Custom logic (e.g. retry only 5xx) | —             | —            | User-defined Policy

Always combine with engine max_attempts cap and Core 7 breakers for global safety.
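For the last row, the policy itself can inspect the error carried in the RetryCtx (introduced in the API section below). A hypothetical sketch, assuming the error type exposes a numeric status attribute (that attribute is an assumption, not part of ErrInfo):

def server_error_policy(total_attempts: int = 4) -> Policy:
    """Hypothetical user-defined policy: retry only 5xx-style failures."""
    def p(ctx: RetryCtx) -> RetryDecision:
        status = getattr(ctx.error, "status", None)  # assumed attribute
        is_server_error = isinstance(status, int) and 500 <= status < 600
        return RetryDecision(
            retry=is_server_error and ctx.attempt < total_attempts,
            next_delay_ms=None,
        )
    p.__name__ = f"server_error_policy[{total_attempts}]"
    return p

A classifier could make the same 5xx check; putting it in the policy keeps the retry decision and the attempt budget in one place.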


3. Public API Surface (end-of-Module-04 refactor note)

Refactor note: retries live in funcpipe_rag.policies.retries (capstone/src/funcpipe_rag/policies/retries.py) and are re-exported from funcpipe_rag.api.core.

from funcpipe_rag.api.core import (
    RetryCtx,
    RetryDecision,
    exp_policy,
    fixed_policy,
    is_retriable_errinfo,
    restore_input_order,
    retry_map_iter,
)
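
The reference code in section 4 also leans on names that are not defined on this page: the X/Y/E type variables and the RetryCtx, RetryDecision, Policy and Classifier types. Their real definitions live in funcpipe_rag.policies.retries; here is a minimal sketch consistent with how they are used below:

from dataclasses import dataclass
from typing import Any, Callable, Generic, TypeVar

X = TypeVar("X")   # input item
Y = TypeVar("Y")   # success value
E = TypeVar("E")   # error value

@dataclass(frozen=True)
class RetryCtx(Generic[X, E]):
    """Everything a policy may inspect when deciding whether to retry."""
    item: X
    attempt: int
    error: E
    stage: str
    path: tuple[int, ...]
    policy_name: str

@dataclass(frozen=True)
class RetryDecision:
    """Pure data: retry or not, plus an advisory delay (never slept on here)."""
    retry: bool
    next_delay_ms: int | None = None

Policy = Callable[[RetryCtx[Any, Any]], RetryDecision]
Classifier = Callable[[Any], bool]

# Result, Ok and Err are the Result types from earlier Module 04 cores.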

4. Reference Implementations

4.1 Core Retry Engine (fair, bounded, pure)

from collections import deque
from types import MappingProxyType
from typing import Callable, Iterable, Iterator

# Ok, Err and Result come from earlier Module 04 cores; RetryCtx, RetryDecision,
# Policy and Classifier are the supporting types shown above.

def _annotate_err(
    e: E,
    *,
    attempt: int,
    max_attempts: int,
    policy: str,
    next_delay_ms: int | None = None,
) -> E:
    """Annotate error if it supports _replace and ctx (ErrInfo does)."""
    if hasattr(e, "_replace") and hasattr(e, "ctx"):
        ctx = dict(e.ctx) if e.ctx else {}
        ctx.update({
            "attempt": attempt,
            "max_attempts": max_attempts,
            "policy": policy,
        })
        if next_delay_ms is not None:
            ctx["next_delay_ms"] = next_delay_ms
        e = e._replace(ctx=MappingProxyType(ctx))  # type: ignore
    return e

def retry_map_iter(
    fn: Callable[[X], Result[Y, E]],
    xs: Iterable[X],
    *,
    classifier: Classifier,
    policy: Policy,
    stage: str,
    key_path: Callable[[X], tuple[int, ...]] | None = None,
    max_attempts: int = 10,
    policy_name: str | None = None,
    inflight_cap: int = 64,
) -> Iterator[Result[Y, E]]:
    """Pure, fair, bounded retry over a Result-returning fn."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    if inflight_cap < 1:
        raise ValueError("inflight_cap must be >= 1")

    name = policy_name or getattr(policy, "__name__", "anonymous")
    it = iter(xs)
    work: deque[tuple[X, int]] = deque()  # (item, attempt)

    def prime() -> None:
        while len(work) < inflight_cap:
            try:
                work.append((next(it), 1))
            except StopIteration:
                break

    prime()

    while work:
        x, attempt = work.popleft()
        r = fn(x)

        if isinstance(r, Ok):
            yield r
            prime()
            continue

        e = r.error
        if not classifier(e):
            yield Err(_annotate_err(e, attempt=attempt, max_attempts=max_attempts, policy=name))
            prime()
            continue

        p = key_path(x) if key_path is not None else ()
        ctx = RetryCtx(item=x, attempt=attempt, error=e, stage=stage, path=p, policy_name=name)
        try:
            dec = policy(ctx)
        except Exception:
            # A crashing policy counts as "do not retry" rather than letting
            # the exception kill the whole stream.
            dec = RetryDecision(retry=False, next_delay_ms=None)

        if dec.retry and attempt < max_attempts:
            work.append((x, attempt + 1))
        else:
            yield Err(_annotate_err(
                e,
                attempt=attempt,
                max_attempts=max_attempts,
                policy=name,
                next_delay_ms=dec.next_delay_ms,
            ))
        prime()

4.2 Policies (pure data → decision)

def fixed_policy(total_attempts: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=None)
    p.__name__ = f"fixed_policy[{total_attempts}]"
    return p

def exp_policy(total_attempts: int, base_ms: int, cap_ms: int) -> Policy:
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        delay = min(cap_ms, base_ms * (2 ** (ctx.attempt - 1)))
        return RetryDecision(retry=ctx.attempt < total_attempts, next_delay_ms=delay)
    p.__name__ = f"exp_policy[{total_attempts},{base_ms},{cap_ms}]"
    return p
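
The "custom policy with jitter" row of the decision table (and Exercise 2 below) can stay pure by deriving the jitter deterministically from the retry context instead of calling a random generator. A hypothetical sketch — exp_jitter_policy is not part of the exported API:

import hashlib

def exp_jitter_policy(total_attempts: int, base_ms: int, cap_ms: int) -> Policy:
    """Exponential backoff with full jitter, derived purely from (path, attempt)."""
    def p(ctx: RetryCtx[Any, Any]) -> RetryDecision:
        ceiling = min(cap_ms, base_ms * (2 ** (ctx.attempt - 1)))
        seed = f"{ctx.path}:{ctx.attempt}".encode()
        # Hash -> fraction in [0, 1); the same context always yields the same delay.
        fraction = int.from_bytes(hashlib.blake2b(seed, digest_size=4).digest(), "big") / 2**32
        return RetryDecision(
            retry=ctx.attempt < total_attempts,
            next_delay_ms=int(ceiling * fraction),
        )
    p.__name__ = f"exp_jitter_policy[{total_attempts},{base_ms},{cap_ms}]"
    return p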

4.3 Default Classifier

def is_retriable_errinfo(e: Any) -> bool:
    code = getattr(e, "code", None)
    return code in {"RATE_LIMIT", "TIMEOUT", "CONN_RESET", "EMBED/UNAVAILABLE", "TRANSIENT"}

4.4 Resequencer (restore input order when needed)

def restore_input_order(
    tagged: Iterable[tuple[int, Result[Y, E]]],
) -> Iterator[Result[Y, E]]:
    """Restore input order from (idx, result) pairs. Assumes indices are 0-based consecutive integers."""
    buffer: dict[int, Result[Y, E]] = {}
    expect = 0
    for idx, r in tagged:
        buffer[idx] = r
        while expect in buffer:
            yield buffer.pop(expect)
            expect += 1
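
restore_input_order only re-sequences; producing the (idx, result) pairs is the caller's job. A tiny demonstration, assuming Ok values compare structurally as in earlier cores:

pairs = [(1, Ok("b")), (0, Ok("a")), (2, Ok("c"))]
assert list(restore_input_order(pairs)) == [Ok("a"), Ok("b"), Ok("c")]

To use it behind retry_map_iter, tag each input with its index and carry that tag through both the Ok and the Err side of the mapped function, because completion order drifts from input order as soon as anything retries.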

4.5 Idiomatic RAG Usage

embedded = retry_map_iter(
    safe_embed,                  # returns Result[Chunk, ErrInfo]
    chunks_with_path,
    classifier=is_retriable_errinfo,
    policy=exp_policy(total_attempts=5, base_ms=100, cap_ms=10000),
    stage="embed",
    key_path=lambda cp: cp[1],
    inflight_cap=128,
    max_attempts=10,   # hard engine cap
)

# Optional: restore input order if downstream requires it. Completion order
# drifts from input order as soon as anything retries, so tag each input with
# its index, carry the tag through both Ok and Err results, and feed the
# (idx, result) pairs to restore_input_order (see the note after 4.4).

for r in circuit_breaker_rate_emit(embedded, max_rate=0.2):
    if isinstance(r, Err) and isinstance(r.error, BreakInfo):
        report_circuit_break(r.error)
        break
    process(r)

5. Property-Based Proofs (capstone/tests/test_retries.py)

from collections import defaultdict

from hypothesis import given, strategies as st

from funcpipe_rag.api.core import RetryDecision, fixed_policy, retry_map_iter
# Ok, Err and Result come from the Module 04 Result core.

@given(items=st.lists(st.integers(), unique=True))  # unique: attempts is keyed by value
def test_bounded_attempts(items):
    attempts = defaultdict(int)
    def fn(x: int) -> Result[int, str]:
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 3 else Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda e: e == "TRANSIENT",
        policy=fixed_policy(5),
        stage="test",
        max_attempts=10,
    ))
    assert all(a <= 5 for a in attempts.values())

def test_engine_cap_overrides_policy():
    attempts = [0]
    def fn(_):
        attempts[0] += 1
        return Err("TRANSIENT")
    def always_retry(_): return RetryDecision(True, None)
    out = list(retry_map_iter(
        fn, [0],
        classifier=lambda _: True,
        policy=always_retry,
        stage="test",
        max_attempts=4,
    ))
    assert attempts[0] == 4

@given(items=st.lists(st.integers(), min_size=10, unique=True))  # unique: attempts keyed by value
def test_fairness_interleaving(items):
    attempts = defaultdict(int)
    def fn(x: int):
        attempts[x] += 1
        return Ok(x) if attempts[x] >= 2 else Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        inflight_cap=4,
    ))
    # Final attempt counts stay within one of each other: no item races ahead
    assert max(attempts.values()) <= min(attempts.values()) + 1

@given(items=st.lists(st.integers()))
def test_retry_completion(items):
    tagged = list(enumerate(items))
    attempts = defaultdict(int)
    def fn(iv: tuple[int, int]):
        i, v = iv
        attempts[i] += 1
        needed = (v % 5) + 1
        return Ok(iv) if attempts[i] >= needed else Err("TRANSIENT")
    results = list(retry_map_iter(
        fn, tagged,
        classifier=lambda _: True,
        policy=fixed_policy(10),
        stage="test",
        inflight_cap=32,
    ))
    # All items eventually complete
    assert len(results) == len(items)

@given(items=st.lists(st.integers()))
def test_final_err_annotation(items):
    def fn(x: int) -> Result[int, str]:
        return Err("TRANSIENT")
    out = list(retry_map_iter(
        fn, items,
        classifier=lambda _: True,
        policy=fixed_policy(3),
        stage="test",
        max_attempts=5,
    ))
    for r in out:
        assert isinstance(r, Err)
        e = r.error
        assert e == "TRANSIENT"  # annotation skipped for str errors
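
For str errors the annotation path is skipped, so the test above only checks that nothing breaks. A hypothetical sharper variant with a _replace/ctx-capable error type (FakeErr is made up for this test) exercises the Provenance law directly:

from typing import Any, NamedTuple

class FakeErr(NamedTuple):
    code: str
    ctx: Any = None

def test_final_err_annotation_with_ctx():
    out = list(retry_map_iter(
        lambda _x: Err(FakeErr("TRANSIENT")),
        [1, 2, 3],
        classifier=lambda _e: True,
        policy=fixed_policy(3),
        stage="test",
        max_attempts=5,
    ))
    for r in out:
        assert isinstance(r, Err)
        assert r.error.ctx["attempt"] == 3
        assert r.error.ctx["max_attempts"] == 5
        assert r.error.ctx["policy"] == "fixed_policy[3]"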

6. Big-O & Allocation Guarantees

Variant        | Time                           | Heap            | Laziness
retry_map_iter | O(N × max_attempts) worst-case | O(inflight_cap) | Yes

Bounded by max_attempts and inflight_cap; pure and lazy.


7. Anti-Patterns & Immediate Fixes

Anti-Pattern            | Symptom                  | Fix
Infinite retries        | Non-termination          | Always bound via policy + engine cap
Blocking sleep in retry | Pipeline stalls          | Policy returns delay only; schedule async
Head-of-line blocking   | One slow item delays all | Bounded inflight_cap + fair priming
Mutable retry state     | Nondeterminism           | Policy as pure data

8. Pre-Core Quiz

  1. retry_map_iter for…? → Fair bounded retries on Result stream
  2. fixed_policy for…? → Fixed attempt count
  3. inflight_cap for…? → Fairness / prevent starvation
  4. Final Err contains…? → retry metadata when the error type supports it (e.g. ErrInfo.ctx)
  5. Order semantics…? → Implementation-defined completion order; resequence if input order is required

9. Post-Core Exercise

  1. Apply retry_map_iter to a flaky embedder → verify bounded attempts + fairness.
  2. Write a jitter policy → test next_delay_ms variance.
  3. Refactor an imperative retry loop → retry_map_iter.
  4. Combine with breaker → confirm rate reflects final outcomes after retries.

Continue with: Structured Error Reports

You now have the complete toolkit to make any flaky operation resilient — pure, bounded, fair, and composable with every previous core. The final core is about turning all those errors into beautiful reports.