Callbacks to Combinators¶
Page Maps¶
graph LR
family["Python Programming"]
program["Python Functional Programming"]
section["Data First Apis Expression Style"]
page["Callbacks to Combinators"]
capstone["Capstone evidence"]
family --> program --> section --> page
page -.applies in.-> capstone
flowchart LR
orient["Orient on the page map"] --> read["Read the main claim and examples"]
read --> inspect["Inspect the related code, proof, or capstone surface"]
inspect --> verify["Run or review the verification path"]
verify --> apply["Apply the idea back to the module and capstone"]
Treat combinators as a readability tool before treating them as an abstraction tool. The payoff is that a pipeline becomes something you can scan stage by stage instead of mentally stepping through nesting and control glue.
Start With the Composition Problem¶
You may know each individual function you want and still wire them together with loops, callbacks, and local bookkeeping. That wiring is the thing this lesson simplifies.
- If the pipeline shape is hidden inside callback nesting, the composition is hard to inspect.
- If each stage is configured but the orchestration is still imperative, the full flow is still hard to see.
- If adding one new transformation means rewriting control glue, combinators are likely missing.
Keep This Question In View¶
Core question:
How do you replace nested callbacks and imperative chains with combinators (flow, fmap, ffilter, flatmap) that compose lazy, configured, boundary-sealed functions—so pipelines from M02C01–M02C06 are efficient, readable, and testable?
This lesson introduces combinators as disciplined orchestration:
- use flow to make the overall pipeline shape visible
- use fmap, ffilter, and flatmap when the stage purpose is transform, keep, or expand
- keep earlier configuration and boundary work intact instead of burying it under control glue
The running project keeps the lesson practical: a combinator chain should make the document pipeline easier to read, not more magical.
Use this when you have config-as-data but still wire stages together with nested callbacks or imperative loops that hide the real pipeline.
Outcome:
1. Identify callback smells (nested functions, imperative chains) and explain their impact on composability.
2. Refactor a callback chain into combinators with bound pure functions.
3. Write Hypothesis properties proving pipeline equivalence, with a shrinking example.
1. Conceptual Foundation¶
1.1 Callback Hell to Combinators in One Precise Sentence¶
Combinators replace callback hell with higher-order functions (flow, fmap, ffilter, flatmap) that compose lazy streams of bound pure functions—ensuring pipelines are readable, efficient, and configurable without nesting or boilerplate.
1.2 The One-Sentence Rule¶
Replace nested callbacks and loops with flow for pipeline orchestration, fmap/ffilter/flatmap for transformations—bind config to pure functions via partial or by passing configuration-bearing callables, keeping effects sealed and streams lazy.
1.3 Why This Matters Now¶
By this point in the module, you can configure behavior, express value-producing logic, and stream data lazily. What still gets in the way is the orchestration layer. When the pipeline itself is hard to read, all the earlier improvements become harder to review. Combinators solve that by making the sequence of stages explicit.
1.4 Combinators as Values in 5 Lines¶
The next example matters because it contrasts a hand-built loop with a stage that can be dropped into a larger pipeline without rewriting the surrounding control flow.
from funcpipe_rag import CleanDoc, ChunkWithoutEmbedding, RagEnv
from funcpipe_rag import gen_chunk_doc
from funcpipe_rag import flatmap
from functools import partial

# Before: Imperative, eager loop
def before_chunk(cd: CleanDoc, env: RagEnv) -> list[ChunkWithoutEmbedding]:
    chunks = []
    text = cd.abstract
    for start in range(0, len(text), env.chunk_size):
        chunk_text = text[start: start + env.chunk_size]
        if chunk_text:
            chunks.append(ChunkWithoutEmbedding(cd.doc_id, chunk_text, start, start + len(chunk_text)))
    return chunks  # Eager list

# After: Lazy combinator with bound config
bound_chunk = partial(gen_chunk_doc, env=RagEnv(512))
lazy_chunk = flatmap(bound_chunk)  # flatmap defined in §4.1
# Usage: lazy_chunk(cleaned_docs_iter) → Iterator[ChunkWithoutEmbedding]
Because a combinator bound to its config via partial is an ordinary value, it can be stored in a dict, composed with the configurators from M02C01, and applied lazily, which keeps the pipeline both readable and efficient.
Note: Raw dicts from env/CLI live only at the boundary; inside, configuration is always represented as frozen dataclasses (possibly stored in dict lookups).
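To make the boundary rule concrete, here is a minimal sketch of parsing a raw dict once and binding the resulting frozen value with partial. ChunkEnv, parse_env, and chunk_text are hypothetical stand-ins invented for illustration; they are not the funcpipe_rag types.

```python
from dataclasses import dataclass
from functools import partial
from typing import Iterator


@dataclass(frozen=True)
class ChunkEnv:  # hypothetical stand-in for a config type such as RagEnv
    chunk_size: int


def parse_env(raw: dict[str, str]) -> ChunkEnv:
    """Boundary step: turn an untyped dict (CLI/env) into a frozen value."""
    return ChunkEnv(chunk_size=int(raw["chunk_size"]))


def chunk_text(text: str, env: ChunkEnv) -> Iterator[str]:
    """Pure, lazy chunker: yields fixed-size slices of text."""
    for start in range(0, len(text), env.chunk_size):
        yield text[start:start + env.chunk_size]


# The raw dict is consumed here and never travels further inward.
env = parse_env({"chunk_size": "4"})

# Bound stages are ordinary values, so they can live in a lookup table.
chunkers = {
    "small": partial(chunk_text, env=env),
    "large": partial(chunk_text, env=ChunkEnv(chunk_size=8)),
}

small_chunks = list(chunkers["small"]("abcdefghij"))
large_chunks = list(chunkers["large"]("abcdefghij"))
```

Because the dataclass is frozen, two configs with the same fields compare and hash equal, which is what makes them safe to use as lookup keys or to share between stages.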
2. Mental Model: Callback Hell vs Combinator Chains¶
2.1 One Picture¶
Callback Hell (Nested)              Combinator Chains (Linear)
+---------------------------+       +------------------------------+
| def rag(docs, on_done):   |       | flow(                        |
|   cleaned = clean(docs,   |       |   ffilter(bound_keep),       |
|     lambda c:             |       |   fmap(bound_clean),         |
|       chunks = chunk(c,   |       |   flatmap(bound_chunk),      |
|         lambda ch:        |       |   fmap(bound_embed)          |
|           on_done(ch)     |       | )()                          |
|   ))                      |       +------------------------------+
+---------------------------+
 ↑ Nested, Eager, Opaque             ↑ Linear, Lazy, Config-Bound
2.2 Contract Table¶
| Aspect | Callback Hell | Combinator Chains |
|---|---|---|
| Readability | Nested indentation | Linear flow |
| Laziness | Often eager | Iterator-based |
| Configurability | Hardcoded | Bound via partial |
| Composability | Manual nesting | Higher-order (flow) |
| Testing | Mock callbacks | Property-based streams |
| Mutable Defaults in Partials | Breaks Determinism | Use frozen dataclasses or immutable types for configs |
Note on Callback Choice: Use callbacks only for legacy APIs; always prefer combinators for pipelines.
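The last table row is easiest to see in a small experiment. The sketch below binds the same predicate once to a mutable dict and once to a frozen dataclass; keep_long, keep_long_frozen, and KeepCfg are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from functools import partial


def keep_long(word: str, cfg: dict) -> bool:
    """Predicate that reads its threshold from a (mutable) dict."""
    return len(word) >= cfg["min_len"]


mutable_cfg = {"min_len": 3}
leaky_keep = partial(keep_long, cfg=mutable_cfg)

before = leaky_keep("abcd")
mutable_cfg["min_len"] = 10   # some other code mutates the shared config
after = leaky_keep("abcd")    # same call, different answer


@dataclass(frozen=True)
class KeepCfg:  # hypothetical frozen config type
    min_len: int


def keep_long_frozen(word: str, cfg: KeepCfg) -> bool:
    """Same predicate, but the bound config cannot change after binding."""
    return len(word) >= cfg.min_len


stable_keep = partial(keep_long_frozen, cfg=KeepCfg(min_len=3))
stable_result = stable_keep("abcd")
```

The partial holds a reference, not a copy, so mutating the dict silently changes the bound stage. With a frozen dataclass the only way to get a different threshold is to build a new value and bind a new stage.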
3. Running Project: FuncPipe RAG Builder¶
We extend the FuncPipe RAG Builder from m02-rag.md:
- Dataset: 10k arXiv CS abstracts (arxiv_cs_abstracts_10k.csv).
- Goal: Replace callback chains with combinator pipelines.
- Start: Callback-heavy version (core7_start.py).
- End: Linear combinator chain, preserving equivalence for chunk sequence.
3.1 Types (Canonical, Used Throughout)¶
From previous cores.
3.2 Callback Hell Start (Anti-Pattern)¶
# core7_start.py (anti-pattern): nested callbacks obscure the pipeline
from collections.abc import Callable
from funcpipe_rag import (
    Chunk,
    Ok,
    Observations,
    RagBoundaryDeps,
    RagConfig,
    eval_pred,
    gen_chunk_doc,
    structural_dedup_chunks,
)

def callback_full_rag_api(
    path: str,
    config: RagConfig,
    deps: RagBoundaryDeps,
    on_done: Callable[[tuple[list[Chunk], Observations]], None]
) -> None:
    def on_docs(docs):
        def on_cleaned(cleaned):
            def on_chunks(chunks):
                obs = Observations(
                    total_docs=len(docs),
                    total_chunks=len(chunks),
                    kept_docs=len(kept),  # count the filtered docs (`kept` is captured from on_docs)
                    cleaned_docs=len(cleaned),
                )
                on_done((chunks, obs))

            # Eager: every chunk is embedded before deduplication starts
            chunks = [deps.core.embedder(c) for cd in cleaned for c in gen_chunk_doc(cd, config.env)]
            chunks = structural_dedup_chunks(chunks)
            on_chunks(chunks)

        kept = [d for d in docs if eval_pred(d, config.keep.keep_pred)]
        cleaned = [deps.core.cleaner(d) for d in kept]
        on_cleaned(cleaned)

    docs_res = deps.reader.read_docs(path)
    if isinstance(docs_res, Ok):
        on_docs(docs_res.value)
Smells:
- Nested callbacks (on_docs, on_cleaned, on_chunks).
- Eager lists mid-chain.
- Hard to compose/test.
Problem: Obscures flow; breaks laziness.
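The laziness cost of eager lists mid-chain can be observed directly. This is a small sketch using only the standard library; clean and the calls log are hypothetical instrumentation, not part of the project code.

```python
from itertools import islice

calls: list[str] = []  # records which items were actually processed


def clean(doc: str) -> str:
    calls.append(doc)  # instrumentation only, to observe evaluation order
    return doc.strip().lower()


docs = ["  A ", " B", "C  ", " D "]

# Eager: the list comprehension cleans every doc before anything downstream runs.
eager_cleaned = [clean(d) for d in docs]
eager_calls = len(calls)

calls.clear()

# Lazy: map() returns an iterator, so only the consumed prefix is cleaned.
lazy_cleaned = map(clean, docs)
first_two = list(islice(lazy_cleaned, 2))
lazy_calls = len(calls)
```

When a consumer needs only the first two results, the eager version still pays for all four docs, while the lazy version touches exactly two.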
4. Refactor to Combinators: Linear Chains with Bound Functions¶
4.1 Combinators (Lazy, Generic)¶
Core combinators:
from funcpipe_rag import ffilter, flatmap, fmap, flow

# `flow` builds a 0-arg pipeline from a producer + iterable→iterable stages.
pipeline = flow(
    lambda: range(5),
    ffilter(lambda x: x % 2 == 0),
    fmap(lambda x: x + 1),
    flatmap(lambda x: (x, x)),
)
assert list(pipeline()) == [1, 1, 3, 3, 5, 5]
Properties:
- Lazy: Iterator-based.
- Generic: Work on any iterable.
- Pure: No effects.
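For readers who want to see what such combinators could look like, here is a minimal sketch that is consistent with the usage above. It is an assumption about shape only; the actual funcpipe_rag implementations may differ in typing, naming, and details.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Any

Stage = Callable[[Iterable[Any]], Iterator[Any]]


def fmap(f: Callable[[Any], Any]) -> Stage:
    """Stage that transforms every item, lazily."""
    return lambda xs: map(f, xs)


def ffilter(pred: Callable[[Any], bool]) -> Stage:
    """Stage that keeps only the items satisfying pred, lazily."""
    return lambda xs: filter(pred, xs)


def flatmap(f: Callable[[Any], Iterable[Any]]) -> Stage:
    """Stage that expands every item into zero or more items, lazily."""
    def stage(xs: Iterable[Any]) -> Iterator[Any]:
        for x in xs:
            yield from f(x)
    return stage


def flow(producer: Callable[[], Iterable[Any]], *stages: Stage) -> Callable[[], Iterator[Any]]:
    """Build a 0-arg pipeline: call the producer, then thread its output through each stage."""
    def run() -> Iterator[Any]:
        stream: Iterable[Any] = producer()
        for stage in stages:
            stream = stage(stream)
        return iter(stream)
    return run


pipeline = flow(
    lambda: range(5),
    ffilter(lambda x: x % 2 == 0),
    fmap(lambda x: x + 1),
    flatmap(lambda x: (x, x)),
)
result = list(pipeline())
```

Taking a producer rather than an iterable means each call to the pipeline starts from a fresh stream, so the same pipeline value can be run more than once.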
Note: While combinators promote expression-oriented code, prioritize readability: If a combinator chain becomes nested or complex (e.g., 3+ layers), refactor to named helper functions or consider a simple loop inside a trivial pure wrapper. Purity matters, but so does maintainability.
4.2 Refactored Pipeline (Combinator Chain in Internal Logic)¶
Bound pure functions:
# Assumption: RawDoc, RagCoreDeps, and Result are exported from funcpipe_rag like the other names.
from funcpipe_rag import (
    Chunk, Observations, Ok, RagConfig, RagCoreDeps, RawDoc, Result,
    eval_pred, ffilter, flatmap, flow, fmap, gen_chunk_doc, structural_dedup_chunks,
)

def _run_core_on_docs(
    docs: list[RawDoc],
    config: RagConfig,
    deps: RagCoreDeps
) -> Result[tuple[list[Chunk], Observations]]:
    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    bound_keep = ffilter(keep_rule)
    bound_clean = fmap(deps.cleaner)
    bound_chunk = flatmap(lambda cd: gen_chunk_doc(cd, config.env))
    bound_embed = fmap(deps.embedder)

    # Metrics pass (pedagogical; duplicates pure work)
    kept_docs = list(bound_keep(docs))
    cleaned = list(bound_clean(kept_docs))

    # Main pipeline
    pipeline = flow(lambda: docs, bound_keep, bound_clean, bound_chunk, bound_embed)
    chunks_iter = pipeline()
    chunks = structural_dedup_chunks(chunks_iter)

    obs = Observations(
        total_docs=len(docs),
        total_chunks=len(chunks),
        kept_docs=len(kept_docs),
        cleaned_docs=len(cleaned),
        sample_doc_ids=tuple(d.doc_id for d in kept_docs[: config.env.sample_size]),
        sample_chunk_starts=tuple(c.start for c in chunks[: config.env.sample_size]),
    )
    return Ok((chunks, obs))
Properties:
- Linear: Clear flow.
- Lazy: Streams until structural_dedup_chunks.
- Config-bound: Via closures over config and deps here (functools.partial works equally well); same semantics as M02C05–M02C06 (only wiring changes).
Note: For Observations we recompute the keep/clean steps on docs; in real code you’d thread the intermediate results or refactor Observations to avoid duplicate work.
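One way to avoid the duplicate work is to materialize the intermediate results once and continue from them. The sketch below assumes a simplified setting with plain strings and a dict of counts; run_core and its parameters are hypothetical and do not mirror the Observations type.

```python
from collections.abc import Callable, Iterable

clean_log: list[str] = []  # instrumentation only: which docs were cleaned


def clean(doc: str) -> str:
    clean_log.append(doc)
    return doc.upper()


def run_core(
    docs: list[str],
    keep: Callable[[str], bool],
    clean_fn: Callable[[str], str],
    chunk: Callable[[str], Iterable[str]],
) -> tuple[list[str], dict[str, int]]:
    """Return (chunks, counts), running keep and clean exactly once per doc."""
    kept = [d for d in docs if keep(d)]          # materialized once for the count
    cleaned = [clean_fn(d) for d in kept]        # reused by the chunking step
    chunks = [c for cd in cleaned for c in chunk(cd)]
    counts = {
        "total_docs": len(docs),
        "kept_docs": len(kept),
        "cleaned_docs": len(cleaned),
        "total_chunks": len(chunks),
    }
    return chunks, counts


chunks, counts = run_core(
    ["keep one", "drop", "keep two"],
    keep=lambda d: d.startswith("keep"),
    clean_fn=clean,
    chunk=str.split,
)
```

The trade-off is explicit: the keep and clean results are held in memory so they can be counted and reused, which gives up streaming for those two steps in exchange for doing the pure work once.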
4.3 Public API (Unchanged from M02C05–M02C06)¶
from funcpipe_rag import full_rag_api_path
res = full_rag_api_path("arxiv_cs_abstracts_10k.csv", config, deps)
Properties:
- Keeps Result; boundaries unchanged.
4.4 Configurator Tie-In (M02C01)¶
Wins: Combinators compose with the partial-based configurators from M02C01, so pipeline variants differ only in the config they bind.
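As an illustration of that tie-in, the following sketch derives a pipeline variant by replacing one field of a frozen config and rebuilding. PipeCfg, long_enough, chunk_text, and build are hypothetical names; the M02C01 configurators themselves are not reproduced here.

```python
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, replace
from functools import partial


@dataclass(frozen=True)
class PipeCfg:  # hypothetical config for this sketch
    min_len: int
    chunk_size: int


def long_enough(text: str, min_len: int) -> bool:
    return len(text) >= min_len


def chunk_text(text: str, size: int) -> Iterator[str]:
    for start in range(0, len(text), size):
        yield text[start:start + size]


def build(cfg: PipeCfg) -> Callable[[Iterable[str]], Iterator[str]]:
    """Bind config to the pure stage functions and return a lazy pipeline."""
    keep = partial(long_enough, min_len=cfg.min_len)
    chunk = partial(chunk_text, size=cfg.chunk_size)

    def run(docs: Iterable[str]) -> Iterator[str]:
        for doc in filter(keep, docs):
            yield from chunk(doc)

    return run


base = PipeCfg(min_len=3, chunk_size=2)
coarse = replace(base, chunk_size=4)  # variant: same rules, larger chunks

docs = ["ab", "abcdef"]
base_out = list(build(base)(docs))
coarse_out = list(build(coarse)(docs))
```

Only the config value changes between variants; the stage functions and the wiring stay identical, which keeps the difference between two pipelines reviewable as a one-field diff.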
5. Equational Reasoning: Substitution Exercise¶
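Hand Exercise: Substitute in fmap/ffilter.
1. Inline bound_keep = ffilter(keep_rule), where keep_rule closes over config.keep.keep_pred → the predicate is fixed once config is fixed.
2. Substitute bound_keep into flow → a stage that depends only on the iterable it is given.
3. Result: For fixed (immutable) config and deps, the whole pipeline is a fixed function of docs.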
Bug Hunt: In callback version, nesting obscures substitution.
Example:
- Callback: Nested lambdas → hard to substitute.
- Combinator: Linear stages → substitutable.
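Substitution can also be checked mechanically on small inputs. The sketch below defines local stand-ins for fmap and ffilter (assumed shapes, not the library code) and compares two-stage chains with their fused single-stage equivalents.

```python
def fmap(f):
    """Local stand-in: lazily apply f to each item."""
    return lambda xs: map(f, xs)


def ffilter(pred):
    """Local stand-in: lazily keep items satisfying pred."""
    return lambda xs: filter(pred, xs)


def inc(x: int) -> int:
    return x + 1


def dbl(x: int) -> int:
    return x * 2


def is_even(x: int) -> bool:
    return x % 2 == 0


def gt2(x: int) -> bool:
    return x > 2


xs = range(6)

# Two mapping stages equal one stage mapping the composed function.
two_maps = list(fmap(dbl)(fmap(inc)(xs)))
fused_map = list(fmap(lambda x: dbl(inc(x)))(xs))

# Two filtering stages equal one stage with the conjoined predicate.
two_filters = list(ffilter(gt2)(ffilter(is_even)(xs)))
fused_filter = list(ffilter(lambda x: is_even(x) and gt2(x))(xs))
```

These equalities hold only because the stage functions are pure; a mapper with side effects would make the order and number of calls observable and break the substitution.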
6. Property-Based Testing: Proving Pipeline Behaviour¶
Use Hypothesis to check, across generated inputs, that the refactor preserves laziness and config-driven behaviour.
6.1 Custom Strategy¶
From capstone/tests/conftest.py (as in Module 1).
6.2 Pipeline Equivalence Property¶
# capstone/tests/test_rag_api.py (equivalence via combinators)
from hypothesis import given
import hypothesis.strategies as st
from funcpipe_rag import (
    RagConfig,
    RagEnv,
    eval_pred,
    ffilter,
    flatmap,
    flow,
    fmap,
    full_rag_api_docs,
    gen_chunk_doc,
    get_deps,
    structural_dedup_chunks,
)
from tests.conftest import doc_list_strategy

@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_pipeline_equivalence(docs, chunk_size):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = get_deps(config)
    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    pipeline = flow(
        lambda: docs,
        ffilter(keep_rule),
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks = structural_dedup_chunks(pipeline())
    expected, _ = full_rag_api_docs(docs, config, deps)
    assert chunks == expected
Note: Tests combinator pipeline matches Module 1 (chunk sequence equivalence; Observations simplified for pedagogy).
6.3 Lazy Prefix Equivalence¶
from itertools import islice

@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024), k=st.integers(0, 50))
def test_lazy_prefix(docs, chunk_size, k):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = get_deps(config)
    keep_rule = lambda d: eval_pred(d, config.keep.keep_pred)
    pipeline = flow(
        lambda: docs,
        ffilter(keep_rule),
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks_prefix = list(islice(pipeline(), k))
    assert chunks_prefix == list(pipeline())[:k]
Note: Verifies that lazily consuming only the first k items yields the same items as the first k of the fully materialized pipeline.
6.4 Idempotence Property¶
from funcpipe_rag import Ok, RagBoundaryDeps, full_rag_api_path

class FakeReader:
    def __init__(self, docs):
        self._docs = docs

    def read_docs(self, path):
        _ = path
        return Ok(self._docs)

@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_pipeline_idempotence(docs, chunk_size):
    config = RagConfig(env=RagEnv(chunk_size))
    deps = RagBoundaryDeps(core=get_deps(config), reader=FakeReader(docs))
    res1 = full_rag_api_path("fake.csv", config, deps)
    res2 = full_rag_api_path("fake.csv", config, deps)
    assert res1 == res2
Note: Two runs over the same inputs must return equal results, so hidden state in the combinator pipeline would surface as a failure.
6.5 Shrinking Demo: Catching a Leaky Bug¶
Bad pipeline with missing filter:
from funcpipe_rag import RulesConfig, StartsWith

@given(docs=doc_list_strategy(), chunk_size=st.integers(128, 1024))
def test_bad_pipeline(docs, chunk_size):
    config = RagConfig(
        env=RagEnv(chunk_size),
        keep=RulesConfig(keep_pred=StartsWith("categories", "cs.")),
    )
    deps = get_deps(config)
    # Missing ffilter(keep_rule)!
    bad_pipeline = flow(
        lambda: docs,
        fmap(deps.cleaner),
        flatmap(lambda cd: gen_chunk_doc(cd, config.env)),
        fmap(deps.embedder),
    )
    chunks = structural_dedup_chunks(bad_pipeline())
    expected, _ = full_rag_api_docs(docs, config, deps)
    assert chunks == expected
Failure Trace (Example):
Falsifying example: test_bad_pipeline(
docs=[RawDoc(doc_id='cs-123', title='Title', abstract='Abstract', categories='invalid')],
chunk_size=128,
)
AssertionError
Analysis: Shrinks to docs failing keep (e.g., invalid category); catches missing filter bug.
7. When Combinators Aren't Worth It¶
Use callbacks/loops only in:
- Trivial one-step operations.
- Legacy integrations wrapping combinators.
Guardrails: Isolate to <10 lines; prefer combinators for pipelines.
Example:
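A hedged illustration of a case where a short loop reads better than a combinator chain: merging adjacent fragments carries a running buffer from one item to the next, which fmap, ffilter, and flatmap do not express directly. The function merge_short is hypothetical and only demonstrates the guardrail of keeping the loop small and wrapped in a pure function.

```python
def merge_short(chunks: list[str], min_len: int) -> list[str]:
    """Pure wrapper around a small loop: glue fragments until each piece reaches min_len."""
    merged: list[str] = []
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        if len(buffer) >= min_len:
            merged.append(buffer)
            buffer = ""
    # A trailing fragment shorter than min_len is kept rather than dropped.
    return merged + ([buffer] if buffer else [])


fragments = ["ab", "c", "defg", "h"]
merged = merge_short(fragments, min_len=3)
```

The mutation is confined to local variables, the input list is left untouched, and the result depends only on the arguments, so the function can still be dropped into a pipeline as a single stage.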
8. Pre-Core Quiz¶
- Nested callbacks? → Use flow.
- Eager list(gen)? → Laziness with fmap.
- Unbound predicate? → Partial with config.
- Effect in mapper? → Seal in stage.
- Prove pipeline? → Hypothesis over outputs/prefixes.
9. Post-Core Reflection & Exercise¶
Reflect: Find a callback chain or loop. Refactor to combinators with bound functions; add Hypothesis for equivalence/idempotence.
Project Exercise: Apply to RAG (e.g., pipeline with fmap/ffilter); run properties.
- Did linearity improve readability?
- Did laziness reduce memory?
- Did binding clarify config?
Continue with: Tiny Function DSLs
Verify all patterns with Hypothesis—examples provided show how to detect impurities like globals or non-determinism.
Further Reading: For more on closures in Python, see 'Fluent Python' by Luciano Ramalho. Explore toolz for advanced partials once comfortable.