
Configurable Pipelines

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 06: Monadic Flow and Explicit Context"]
  module --> concept["Configurable Pipelines"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Progression Note

Module 6 shifts from pure data modelling to effect-aware composition.
We now treat failure, absence, observability, config, and state as first-class effects that propagate automatically through pipelines, replacing nested conditionals with straight-line composition (see the short sketch after the module table).

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 5 | Algebraic Data Modelling | ADTs, exhaustive pattern matching, total functions, refined types |
| 6 | Monadic Flows as Composable Pipelines | bind/and_then, Reader/State-like patterns, error-typed flows |
| 7 | Effect Boundaries & Resource Safety | Dependency injection, boundaries, testing, evolution |
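
As a concrete reminder of what "propagate automatically" means, here is a minimal, self-contained sketch with a toy Result (illustrative types, not the capstone implementation): once a step returns Err, and_then skips every later step.

# Toy sketch of effect propagation (illustrative types, not the capstone Result):
# a failing step short-circuits the rest of the chain, so no nested ifs are needed.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Union

@dataclass(frozen=True)
class Ok:
    value: int
    def and_then(self, f: Callable[[int], "Union[Ok, Err]"]) -> "Union[Ok, Err]":
        return f(self.value)

@dataclass(frozen=True)
class Err:
    error: str
    def and_then(self, f: Callable[[int], "Union[Ok, Err]"]) -> "Union[Ok, Err]":
        return self  # the error propagates unchanged; later steps never run

def parse(s: str) -> Union[Ok, Err]:
    return Ok(int(s)) if s.isdigit() else Err(f"not a number: {s}")

def halve(n: int) -> Union[Ok, Err]:
    return Ok(n // 2) if n % 2 == 0 else Err(f"odd value: {n}")

assert parse("42").and_then(halve) == Ok(21)
assert parse("7").and_then(halve) == Err("odd value: 7")
assert parse("x").and_then(halve) == Err("not a number: x")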

Core question
How do you make a single monadic pipeline fully configurable at runtime — toggling validation, logging, metrics, and other cross-cutting concerns — using only pure higher-order combinators and Reader, with zero duplication, zero globals, and zero runtime if statements inside the pipeline itself?

This is the true capstone of Module 6. You now have every tool required to ship real-world pipelines that are pure, composable, observable, refactor-safe, and fully configurable with a single config value.

Audience: Engineers who want their beautiful monadic pipelines to actually run in production with different behaviours for dev/prod/test — without compromising on purity or testability.

Outcome

  1. You will toggle validation, logging, and metrics with small, composable combinators.
  2. You will build one pipeline that behaves completely differently under different configs — with zero duplication.
  3. You will have mechanical proof for the featured toggle laws across the modeled enabled and disabled toggle states, so the supported paths stay refactor-safe.

Why Higher-Order Combinators + Reader Fit This Core Best

| Pattern | Duplication | Purity | Testability | Reconfigurability | Refactor Safety |
| --- | --- | --- | --- | --- | --- |
| Global flags | None | No | Bad | Medium | Bad |
| Duplicated pipelines | High | Yes | Good | Bad | Bad |
| Runtime if inside pipeline | Medium | No | Medium | Good | Bad |
| Combinators + Reader | None | Yes | Strong | Strong | Strong |

Higher-order combinators + Reader are the best fit for the explicit configuration, testability, and refactor-safety pressures this core is modeling.
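
This page leans on the Reader built earlier in Module 6. The only shape relied on below is a wrapper around a function from config to value, with run supplying the config once, at the edge. A minimal sketch of that assumed interface (not the capstone source):

# Minimal sketch of the Reader shape assumed on this page (not the capstone source):
# wrap a function from config to value, and supply the config once, at the edge.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

Cfg = TypeVar("Cfg")
A = TypeVar("A")
B = TypeVar("B")

@dataclass(frozen=True)
class Reader(Generic[Cfg, A]):
    _build: Callable[[Cfg], A]

    def run(self, cfg: Cfg) -> A:
        return self._build(cfg)

    def map(self, f: Callable[[A], B]) -> "Reader[Cfg, B]":
        return Reader(lambda cfg: f(self._build(cfg)))

# Usage mirrors build_pipeline() below: configuration is an explicit argument,
# never a global.
greeting = Reader(lambda cfg: f"hello from {cfg['env']}")
assert greeting.run({"env": "dev"}) == "hello from dev"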

1. Laws & Invariants (machine-checked in CI)

| Law | Formal Statement | Why it matters |
| --- | --- | --- |
| Identity (disabled) | toggle_validation(False, validate, pipeline) == pipeline | Disabled validation is a no-op |
| Projection Equivalence | fst(toggle_metrics(enabled, measure, zero, p)(x)) == p(x) | Metrics wrapping does not change the computed value |
| Endomorphic Composition Preservation | toggle_validation(True, validate, pipeline) == (lambda x: validate(x).and_then(pipeline)) | Enabled validation composes without inventing a second control-flow shape |
| Writer Law Compatibility | fst(toggle_logging(enabled, p, mk_msg)(x).run()) == p(x) | Logging preserves the payload while accumulating logs |

The capstone verifies these claims in capstone/tests/unit/fp/test_configurable.py across enabled and disabled toggle states, and it relies on the dedicated Writer and Result law suites under capstone/tests/unit/fp/laws/ for the container-level guarantees. The proof here is intentionally bounded to the toggle combinators and test strategies the repository models.
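
The logging toggle below leans on the Writer and tell from earlier in the module. The only operations relied on here are construction from a thunk returning a (value, logs) pair, map, tell, and run. A minimal sketch of that assumed shape (not the capstone source):

# Minimal sketch of the Writer shape assumed by toggle_logging (not the capstone
# source): a deferred computation yielding (value, logs), logs being a tuple of str.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")

@dataclass(frozen=True)
class Writer(Generic[A]):
    _thunk: Callable[[], tuple[A, tuple[str, ...]]]

    def run(self) -> tuple[A, tuple[str, ...]]:
        return self._thunk()

    def map(self, f: Callable[[A], B]) -> "Writer[B]":
        def thunk() -> tuple[B, tuple[str, ...]]:
            value, logs = self._thunk()
            return f(value), logs
        return Writer(thunk)

def tell(msg: str) -> Writer[None]:
    return Writer(lambda: (None, (msg,)))

# Attaching a log line keeps the payload intact, which is exactly the
# Writer Law Compatibility row above.
assert tell("processing 3").map(lambda _: 6).run() == (6, ("processing 3",))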

2. Public API – Three combinators (that's all you need)

# capstone/src/funcpipe_rag/fp/effects/configurable.py – end-of-Module-06 (mypy --strict clean target)

from __future__ import annotations
from typing import Callable, TypeVar
from funcpipe_rag.result.types import Result
from funcpipe_rag.fp.effects import Writer, tell

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")
A = TypeVar("A")   # arbitrary payload for logging / metrics

# 1. Validation toggle – endomorphic (preserves exact type)
def toggle_validation(
    enabled: bool,
    validate: Callable[[T], Result[T, E]],
    pipeline: Callable[[T], Result[U, E]],
) -> Callable[[T], Result[U, E]]:
    if not enabled:
        return pipeline
    return lambda x: validate(x).and_then(pipeline)

# 2. Logging toggle – shape-changing (injects Writer)
def toggle_logging(
    enabled: bool,
    pipeline: Callable[[T], A],
    mk_msg: Callable[[T, A], str] | None = None,
) -> Callable[[T], Writer[A]]:
    if not enabled:
        return lambda x: Writer(lambda: (pipeline(x), ()))

    mk_msg = mk_msg or (lambda x, _: f"processing {x}")

    def wrapped(x: T) -> Writer[A]:
        value = pipeline(x)
        return tell(mk_msg(x, value)).map(lambda _: value)

    return wrapped

# 3. Metrics toggle – shape-changing (adds metrics pair, single evaluation)
M = TypeVar("M")

def toggle_metrics(
    enabled: bool,
    measure: Callable[[T, A], M],
    zero: M,
    pipeline: Callable[[T], A],
) -> Callable[[T], tuple[A, M]]:
    if not enabled:
        return lambda x: (pipeline(x), zero)

    def wrapped(x: T) -> tuple[A, M]:
        value = pipeline(x)
        return value, measure(x, value)

    return wrapped

Three combinators. Zero boilerplate. Everything else is composition.
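
Before the full RAG example, a toy composition shows how the three toggles stack. The sketch below imports the combinators from the module above and uses a small stand-in Result so it runs on its own; it also assumes the capstone Writer's run() returns a (value, logs) pair, as in the tests later on this page.

# Toy composition of all three toggles (sketch: stand-in Ok/Err, combinators
# imported from the module defined above, Writer interface as used elsewhere here).
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Union

from funcpipe_rag.fp.effects.configurable import (
    toggle_logging,
    toggle_metrics,
    toggle_validation,
)

@dataclass(frozen=True)
class Ok:
    value: str
    def and_then(self, f: Callable[[str], "Union[Ok, Err]"]) -> "Union[Ok, Err]":
        return f(self.value)

@dataclass(frozen=True)
class Err:
    error: str
    def and_then(self, f: Callable[[str], "Union[Ok, Err]"]) -> "Union[Ok, Err]":
        return self

def validate_nonempty(s: str) -> Union[Ok, Err]:
    return Ok(s) if s else Err("EMPTY")

def shout(s: str) -> Union[Ok, Err]:
    return Ok(s.upper())

def build(validation: bool, log: bool, metrics: bool):
    step = toggle_validation(validation, validate_nonempty, shout)
    step = toggle_metrics(metrics, lambda s, _out: len(s), 0, step)
    return toggle_logging(log, step, lambda s, pair: f"in={s} out={pair}")

# Metrics on, logging off: same value, a metric alongside it, no log lines.
(result, n_chars), logs = build(True, False, True)("hi").run()
assert result == Ok("HI") and n_chars == 2 and logs == ()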

3. Real-World Example – Full RAG Pipeline with Runtime Toggles

from dataclasses import dataclass, replace

# Chunk, EmbeddedChunk, Embedding, ErrInfo, tokenize, model, pure, Ok, Err, and
# Reader are the capstone types and helpers built earlier in this module
# (their imports are omitted here for brevity).

@dataclass(frozen=True)
class PipelineConfig:
    strict_validation: bool
    enable_logging: bool
    enable_metrics: bool

# Core pipeline (pure, no config)
def embed_chunk_core(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]:
    return (
        pure(chunk.text.content)
        .and_then(tokenize)
        .and_then(model.encode)
        .map(lambda vec: replace(chunk, embedding=Embedding(vec, "unknown")))
    )

# Validation step
def validate_chunk(chunk: Chunk) -> Result[Chunk, ErrInfo]:
    return Ok(chunk) if len(chunk.text.content) < 10_000 else Err(ErrInfo("TOO_LONG", ...))

# Metrics
@dataclass(frozen=True)
class Metrics:
    chunks: int = 0
    tokens: int = 0

def measure_chunk(chunk: Chunk, _: Result[EmbeddedChunk, ErrInfo]) -> Metrics:
    # NOTE: in real code you'd reuse token count from upstream; here we re-compute for illustration
    return Metrics(chunks=1, tokens=len(tokenize(chunk.text.content)))

# Build the final configurable pipeline
def build_pipeline() -> Reader[PipelineConfig, Callable[[Chunk], Writer[tuple[Result[EmbeddedChunk, ErrInfo], Metrics]]]]:
    def build(cfg: PipelineConfig):
        step = embed_chunk_core

        step = toggle_validation(cfg.strict_validation, validate_chunk, step)

        step = toggle_metrics(cfg.enable_metrics, measure_chunk, Metrics(), step)

        def mk_msg(chunk: Chunk, pair: tuple[Result[EmbeddedChunk, ErrInfo], Metrics]) -> str:
            res, metrics = pair
            status = "ok" if isinstance(res, Ok) else "err"
            return f"chunk={chunk.id} status={status} tokens={metrics.tokens}"

        return toggle_logging(cfg.enable_logging, step, mk_msg)

    return Reader(build)

# Usage – completely different behaviour with different configs
dev_pipeline  = build_pipeline().run(PipelineConfig(strict_validation=True,  enable_logging=True,  enable_metrics=True))
prod_pipeline = build_pipeline().run(PipelineConfig(strict_validation=False, enable_logging=False, enable_metrics=True))

# Same code, different behaviour — zero duplication

4. Before → After – The Same Pipeline, Three Different Behaviours

# BEFORE – three completely duplicated pipelines
def embed_chunk_dev(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]: ...
def embed_chunk_prod(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]: ...
def embed_chunk_test(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]: ...

# AFTER – one pipeline, many configs
pipeline = build_pipeline().run(current_config)   # dev / prod / test — one line change

Zero duplication. Full testability. Instant reconfiguration.
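
The testability claim is easy to demonstrate without the RAG domain types. A sketch of the kind of test this enables (toy integer steps; it assumes the combinators above and a Writer whose run() returns a (value, logs) pair):

# Sketch: one pipeline body, several configs, several observable behaviours
# (toy integer steps; combinators imported from the module defined above).
from funcpipe_rag.fp.effects.configurable import toggle_logging, toggle_metrics

def double(x: int) -> int:
    return x * 2

def test_same_pipeline_different_configs() -> None:
    def measure(inp: int, out: int) -> int:
        return out - inp

    # Config A: everything off. The value passes through untouched.
    plain = toggle_metrics(False, measure, 0, double)
    assert plain(3) == (6, 0)

    # Config B: metrics on. Same value, a metric computed alongside it.
    measured = toggle_metrics(True, measure, 0, double)
    assert measured(3) == (6, 3)

    # Config C: metrics and logging on. Same value, plus a log entry.
    logged = toggle_logging(True, measured, lambda inp, pair: f"{inp}->{pair[0]}")
    assert logged(3).run() == ((6, 3), ("3->6",))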

5. Property-Based Proofs (capstone/tests/unit/fp/test_configurable.py)

from hypothesis import given, strategies as st

# validate_positive and base_step are assumed to be module-level helpers defined
# alongside these properties in the test file.

@given(x=st.integers(), enabled=st.booleans())
def test_toggle_validation_identity_when_disabled(x, enabled):
    if not enabled:
        toggled = toggle_validation(enabled, validate_positive, base_step)
        assert toggled(x) == base_step(x)

@given(x=st.integers())
def test_toggle_validation_preserves_behaviour_when_enabled(x):
    toggled = toggle_validation(True, validate_positive, base_step)
    expected = validate_positive(x).and_then(base_step)
    assert toggled(x) == expected

@given(x=st.integers(), enabled=st.booleans())
def test_toggle_metrics_projection_equivalence(x, enabled):
    def base_step(v: int) -> int:
        return v * 2

    def measure(v: int, y: int) -> int:
        return v + y

    toggled = toggle_metrics(enabled, measure, zero=0, pipeline=base_step)
    value, metric = toggled(x)

    # Projection equivalence
    assert value == base_step(x)

    # Identity on metrics when disabled
    if not enabled:
        assert metric == 0

@given(x=st.integers(), enabled=st.booleans())
def test_toggle_logging_projection_equivalence(x, enabled):
    def base_step(v: int) -> int:
        return v * 3

    def mk_msg(inp: int, out: int) -> str:
        return f"{inp}->{out}"

    toggled = toggle_logging(enabled, base_step, mk_msg)
    writer = toggled(x)
    value, logs = writer.run()

    # Projection equivalence
    assert value == base_step(x)

    # Identity on logs when disabled
    if not enabled:
        assert logs == ()

6. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| Duplicated pipelines | Copy-paste hell | One pipeline + Reader[Config] |
| Global feature flags | Hidden behaviour, untestable | Configurable via combinators |
| Runtime if inside pipeline | Impure, breaks referential transparency | Pure higher-order combinators |
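
For the two flag-shaped rows the mechanical fix is always the same move: lift the decision out of the step and into a combinator applied once at build time. A before/after sketch reusing the names from section 3 (shown for shape, not as capstone source):

# BEFORE – global flag read inside the step: a hidden input, so every test must
# patch module state and the step is no longer referentially transparent.
ENABLE_VALIDATION = True

def embed_before(chunk: Chunk) -> Result[EmbeddedChunk, ErrInfo]:
    if ENABLE_VALIDATION:  # runtime branch inside the pipeline
        return validate_chunk(chunk).and_then(embed_chunk_core)
    return embed_chunk_core(chunk)

# AFTER – the decision is taken once, at build time, from an explicit config value.
def build_embed(cfg: PipelineConfig) -> Callable[[Chunk], Result[EmbeddedChunk, ErrInfo]]:
    return toggle_validation(cfg.strict_validation, validate_chunk, embed_chunk_core)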

7. Pre-Core Quiz

  1. How do you toggle a feature? → Higher-order combinator + Reader[Config]
  2. Shape-changing toggles are for? → Logging (Writer) and metrics
  3. Endomorphic toggles are for? → Validation (same input/output type)
  4. Where do you apply toggles? → In the pipeline builder
  5. The golden rule? → One pipeline, many configs — zero duplication

8. Post-Core Exercise

  1. Add a toggle_strict_tokenization combinator to your real embedding pipeline.
  2. Write a test that runs the same pipeline code under three different configs and asserts different behaviour.
  3. Sleep well — Module 6 is complete.

End of Module 6

You have now completed the entire effect-encoding toolbox:
Result, Option, Validation, Reader, State, Writer, layering, boundaries, and configurable combinators.

Every production pipeline you write from this point forward can be pure, composable, observable, refactor-safe, and backed by machine-checked laws for the paths it models.

Module 7 begins the architectural layer: boundaries, resources, and real-world deployment.