
CLI and Config Pipelines

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 09: Ecosystem Interop and Boundary Discipline"]
  module --> concept["CLI and Config Pipelines"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Module 09

Core question:
How do you add command-line entry points to FuncPipe without letting framework choice take over the architecture, while keeping config loading, overrides, and pipeline assembly explicit and testable?

In this core, the shipped learner route is deliberately boring: a stdlib argparse shell at the repository edge, pure override helpers in pipelines/cli.py, and pipeline assembly that stays outside the shell. That is the canonical implementation because it proves the architectural point without extra framework noise. Typer and Click still matter, but here they are optional extension seams that should preserve the same contract rather than redefine it. The real lesson is thin shell adapters, config-driven execution, override precedence, explicit exit-code mapping, and tests that compare CLI behavior against the same core pipeline logic.

Motivation Bug: Hard-coded scripts mix I/O with logic, leading to untestable entry points; a CLI boundary with config-driven execution separates those concerns so FuncPipe stays reusable and testable.

Delta from Core 5 (Module 09): Data/ML pipelines are internal; this core adds CLI entry points with config loading for user-driven execution.

CLI Protocol (Contract, Entry/Exit Criteria):

- Thin Adapters: Commands parse/build RunSpec, call run_from_spec (pure except boundaries), handle render/exit centrally; no policy in CLI.
- Config-Driven: Load from file/env with CLI overrides (precedence: CLI > env > file > defaults); deep merge + re-validate; discovery via --config/env/XDG (see the precedence sketch after this list).
- Composability: Subcommands for modular pipelines; groups for related funcs.
- Purity: Core funcs pure; effects in CLI (e.g., print, file I/O); runtime coercion for primitives.
- Semantics: Laws like determinism (fixed spec deterministic under pinned env/artifacts, no timestamps/random); equivalence (CLI call == direct func up to I/O/formatting); verified via properties/invoke tests.
- Integration: Keep the shipped route stdlib-first; optional Typer or Click shells should reuse the same pure override/config helpers instead of forking the design.
- Mypy Config: --strict on the shipped helpers and stdlib shell; optional framework shells may stay import-guarded.
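
As a concrete reading of the precedence rule, here is a minimal sketch with plain dicts; deep_merge mirrors the helper defined in the Public API section below, and the chunk size/overlap keys are illustrative assumptions rather than the capstone's real schema.

# Precedence sketch: CLI > env > file > defaults, applied key-by-key via a deep merge.
from functools import reduce
from typing import Any, Dict

def deep_merge(base: Dict[str, Any], over: Dict[str, Any]) -> Dict[str, Any]:
    merged = base.copy()
    for k, v in over.items():
        merged[k] = deep_merge(merged[k], v) if isinstance(v, dict) and isinstance(merged.get(k), dict) else v
    return merged

defaults = {"chunk": {"size": 512, "overlap": 32}}
file_cfg = {"chunk": {"size": 384}}       # from --config or discovered config file
env_cfg  = {"chunk": {"overlap": 16}}     # from an environment override
cli_cfg  = {"chunk": {"size": 256}}       # from --set flags

# Later layers win per key; keys they do not mention survive from earlier layers.
final = reduce(deep_merge, [defaults, file_cfg, env_cfg, cli_cfg])
assert final == {"chunk": {"size": 256, "overlap": 16}}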

Audience: Developers exposing FuncPipe as tools/scripts and needing one honest CLI boundary before deciding whether extra framework features are worth the dependency.

Outcome:

  1. Build a thin CLI shell that delegates to pure FuncPipe helpers instead of mixing policy into argument parsing.
  2. Make the capstone config-driven via explicit override and merge rules.
  3. Judge when a richer CLI framework is warranted without changing the underlying contract.


1. Laws & Invariants

| Law | Description | Enforcement |
|-----|-------------|-------------|
| Determinism Law | Same spec yields same outputs (conditional on pure core, pinned env/artifacts, no timestamps/random). | Invoke tests with preconditions |
| Equivalence Law | CLI response == direct core call (up to I/O/formatting). | Hypothesis/invoke |
| Idempotence Invariant | Repeated calls return the same result when the underlying logic is idempotent. | Property tests |
| Config Invariant | Overrides applied consistently per precedence; re-validated post-merge. | Config tests |

These laws ensure the CLI layer doesn't break FuncPipe's core properties.
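
What the Determinism Law means as a test can be sketched directly against the core entry point; this assumes the rag_pipeline, PipelineConfig, and StepConfig names used in the Public API section below, that FPResult values compare by value, and an illustrative in-memory doc list.

# Determinism Law as a test: same validated config + same in-memory inputs => same result.
# Preconditions from the table above: pure core, pinned artifacts, no timestamps/randomness.
from funcpipe_rag import rag_pipeline, PipelineConfig, StepConfig

def test_fixed_spec_is_deterministic() -> None:
    config = PipelineConfig(steps=[StepConfig(name='chunk', params={'size': 256})])
    docs = ["alpha", "beta"]   # illustrative inputs; no file I/O crosses the boundary here
    artifacts = {}             # pinned (empty) artifact handles
    first = rag_pipeline(config, docs, artifacts)
    second = rag_pipeline(config, docs, artifacts)
    assert first == second     # violating any precondition voids the law, not the test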


2. Decision Table

| Scenario | Type Safety | Subcommands Needed | Recommended |
|----------|-------------|--------------------|-------------|
| Canonical learner route | Sufficient | No | stdlib argparse |
| Type-driven shell with framework help | High | No | Typer |
| Complex callback-heavy groups | Medium | Yes | Click |
| Config overrides with the same pure helper split | Any | Any | Keep the repo's stdlib helper layer, then wrap as needed |

Start with stdlib argparse; reach for Typer or Click only when the shell genuinely needs more than the default proof route.


3. Public API (CLI Commands & Config Loaders)

Commands are thin adapters. Start with the shipped shell, then compare optional wrappers against it.

Repo alignment note (end-of-Module-09):

- This repo ships a stdlib argparse CLI at capstone/src/funcpipe_rag/boundaries/shells/cli.py.
- Override parsing/merge lives in capstone/src/funcpipe_rag/pipelines/cli.py.
- A minimal optional Typer shell exists at capstone/src/funcpipe_rag/boundaries/shells/typer_cli.py (import-guarded).

Canonical learner route:

- Open boundaries/shells/cli.py first.
- Open pipelines/cli.py second.
- Treat the Typer block below as an optional extension sketch, not as a competing production path.
- If an example here and the repo disagree, the shipped stdlib shell wins.

Exit Code Mapping (from FPResult):

| Result | Code | Example |
|--------|------|---------|
| Ok | 0 | Success |
| Err(domain) | 2 | Invalid input |
| Err(infra) | 3 | File not found |
| Err(unexpected) | 1 | Runtime error |
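
Because the shipped learner route is stdlib argparse, a minimal sketch of such a shell is worth seeing before the framework version. It reuses the build_spec_from_cli, run_from_spec, and handle_result helpers sketched in the Typer block below; the flags and prog name here are simplified assumptions, not a copy of the repo's cli.py.

# Minimal stdlib argparse shell (sketch): parse, delegate to pure helpers, map result to exit code.
import argparse
import sys
from pathlib import Path
from typing import Optional

def main(argv: Optional[list[str]] = None) -> int:
    parser = argparse.ArgumentParser(prog="funcpipe-rag")
    parser.add_argument("input_path", type=Path)
    parser.add_argument("--config", dest="config_path", type=Path, default=None)
    parser.add_argument("--set", dest="overrides", action="append", default=[],
                        help="Override step.params.key=value")
    parser.add_argument("--seed", type=int, default=None)
    parser.add_argument("--format", dest="output_format", default="json")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args(argv)

    # No policy here: the shell only assembles a RunSpec and renders the outcome.
    spec = build_spec_from_cli(args.input_path, args.config_path, args.overrides,
                               args.seed, args.output_format)
    if args.dry_run:
        print("Dry run: would process with spec above")
        return 0
    return handle_result(run_from_spec(spec), spec.output_format)

if __name__ == "__main__":
    sys.exit(main())

The Typer-based sketch that follows keeps exactly this helper split; only the shell framework changes.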

from typing import Optional, Dict, Any, TypeVar, List
from pathlib import Path
from functools import reduce
import os
import json
import yaml
import typer
from pydantic import BaseModel
from funcpipe_rag import rag_pipeline
from funcpipe_rag import PipelineConfig, FPResult, Ok, ErrInfo, StepConfig

Out = TypeVar('Out')

app = typer.Typer()


class RunSpec(BaseModel):
    input_path: Path
    config: PipelineConfig
    artifacts: Dict[str, Any]  # Model handles, etc.
    seed: Optional[int] = None  # For determinism
    output_format: str = 'json'  # json/text/etc.


def deep_merge(base: Dict[str, Any], over: Dict[str, Any]) -> Dict[str, Any]:
    merged = base.copy()  # Non-mutating
    for k, v in over.items():
        if isinstance(v, dict) and isinstance(merged.get(k), dict):
            merged[k] = deep_merge(merged[k], v)
        else:
            merged[k] = v
    return merged


def load_and_override(cli_config_overrides: Dict[str, Any], env_config_overrides: Dict[str, Any],
                      file_path: Optional[Path] = None) -> PipelineConfig:
    # Precedence: CLI > env > file > defaults for config-level
    if file_path is None:
        file_path = find_config()
    if file_path and file_path.exists():
        with open(file_path, 'r') as f:
            if file_path.suffix in ('.yaml', '.yml'):
                base_dict = yaml.safe_load(f)
            else:
                base_dict = json.load(f)
        base_dict = PipelineConfig.model_validate(base_dict).model_dump()
    else:
        base_dict = PipelineConfig().model_dump()  # Defaults
    merged = reduce(deep_merge, [base_dict, env_config_overrides, cli_config_overrides])
    return PipelineConfig.model_validate(merged)  # Re-validate post-merge


def find_config() -> Optional[Path]:
    if env_path := os.getenv('FUNCPipe_CONFIG'):
        return Path(env_path)
    xdg = Path(os.getenv('XDG_CONFIG_HOME', '~/.config')).expanduser() / 'funcpipe'
    for ext in ['.json', '.yaml', '.yml']:
        candidate = xdg / ('config' + ext)
        if candidate.exists():
            return candidate
    return None


def parse_override(path_value: str) -> Dict[str, Any]:
    path, value = path_value.split('=', 1)
    keys = path.split('.')
    d = {keys[-1]: coerce(value)}
    for k in reversed(keys[:-1]):
        d = {k: d}
    return d


def coerce(v: str) -> Any:
    try:
        return json.loads(v)  # numbers, booleans, null, quoted strings
    except json.JSONDecodeError:
        return v  # fall back to the raw string


def apply_overrides(cfg: PipelineConfig, overrides: Dict[str, Any]) -> PipelineConfig:
    cfg2 = cfg.model_copy(deep=True)
    new_steps: List[StepConfig] = []
    seen: set[str] = set()
    for step in cfg2.steps:
        if step.name in overrides:
            seen.add(step.name)
            over = overrides[step.name]
            new_params = deep_merge(step.params, over.get('params', {}))
            step = step.model_copy(update={"params": new_params})
        new_steps.append(step)
    unknown = set(overrides.keys()) - seen
    if unknown:
        raise ValueError(f"Unknown step override(s): {sorted(unknown)}")
    return cfg2.model_copy(update={"steps": new_steps})


def run_from_spec(spec: RunSpec) -> FPResult[Out, ErrInfo]:
    if spec.seed is not None:
        set_seed(spec.seed)  # Example boundary hook; the shipped shell keeps determinism in config and fixtures.
    docs = load_docs(spec.input_path)  # Boundary I/O enters here before delegating into the pipeline builder.
    return rag_pipeline(spec.config, docs, spec.artifacts)


def handle_result(result: FPResult[Out, ErrInfo], output_format: str) -> int:
    if isinstance(result, Ok):
        out = format_result(result.value, output_format)  # Placeholder; json.dumps for json
        typer.echo(out)
        return 0
    else:
        if output_format == 'json':
            err_out = json.dumps({
                "error": {"kind": result.error.kind, "code": result.error.code, "message": result.error.msg}
            }, ensure_ascii=False)
        else:
            err_out = f"Error [{result.error.kind}]: {result.error.msg} (code: {result.error.code})"
        typer.echo(err_out, err=True)
        code_map = {'domain': 2, 'infra': 3, 'unexpected': 1}
        return code_map.get(result.error.kind, 1)


def build_spec_from_cli(
        input_path: Path,
        config_path: Optional[Path],
        override: Optional[list[str]],
        seed: Optional[int],
        output_format: str,
) -> RunSpec:
    cli_config_overrides: Dict[str, Any] = {}  # Config-level
    cli_step_overrides: Dict[str, Any] = {}  # Step-level
    if override:
        for ov in override:
            parsed = parse_override(ov)
            if 'steps' in parsed:
                cli_config_overrides = deep_merge(cli_config_overrides, parsed)
            else:
                cli_step_overrides = deep_merge(cli_step_overrides, parsed)
    env_config_overrides = parse_override(os.getenv('FUNCPipe_OVERRIDE', '')) if os.getenv(
        'FUNCPipe_OVERRIDE') else {}  # Config-level example
    config = load_and_override(cli_config_overrides, env_config_overrides, config_path)
    config = apply_overrides(config, cli_step_overrides)
    config = PipelineConfig.model_validate(config.model_dump())  # Re-validate after apply
    return RunSpec(input_path=input_path, config=config, artifacts=load_artifacts(), seed=seed,
                   output_format=output_format)  # Example artifact injection seam.


@app.command()
def rag_process(
        input_path: Path,
        config_path: Optional[Path] = typer.Option(None, '--config'),
        override: Optional[list[str]] = typer.Option(None, '--set', help="Override step.params.key=value"),
        seed: Optional[int] = typer.Option(None, '--seed'),
        output_format: str = typer.Option('json', '--format'),
        dry_run: bool = typer.Option(False, '--dry-run'),
        print_config: bool = typer.Option(False, '--print-config'),
):
    if print_config:
        config = load_and_override({}, {}, config_path)
        typer.echo(config.model_dump_json(indent=2))
        raise typer.Exit(0)
    spec = build_spec_from_cli(input_path, config_path, override, seed, output_format)
    if dry_run:
        typer.echo("Dry run: would process with spec above")
        raise typer.Exit(0)
    result = run_from_spec(spec)
    exit_code = handle_result(result, spec.output_format)
    raise typer.Exit(exit_code)

4. Reference Implementations

4.1 Click Basics in FP

import click
from pathlib import Path  # needed for path_type=Path in the options below

@click.group()
def cli():
    pass

@cli.command()
@click.argument('input_path', type=click.Path(exists=True, path_type=Path))
@click.option('--config-path', type=click.Path(exists=True, path_type=Path))
@click.option('--set', 'overrides', multiple=True, help="Override step.params.key=value")
@click.option('--seed', type=int)
@click.option('--format', 'output_format', default='json')
@click.option('--dry-run', is_flag=True)
@click.option('--print-config', is_flag=True)
def rag_process(input_path, config_path, overrides, seed, output_format, dry_run, print_config):
    if print_config:
        config = load_and_override({}, {}, config_path)
        click.echo(config.model_dump_json(indent=2))
        raise click.exceptions.Exit(0)
    spec = build_spec_from_cli(input_path, config_path, overrides, seed, output_format)
    if dry_run:
        click.echo("Dry run: would process with spec above")
        raise click.exceptions.Exit(0)
    result = run_from_spec(spec)
    exit_code = handle_result(result, spec.output_format)  # Use click.echo in handle
    raise click.exceptions.Exit(exit_code)

4.2 Optional Typer Extension Sketch

# See Public API above. The shipped repo keeps argparse as the canonical learner route
# and offers the Typer shell as an optional extension seam with the same helper split.

4.3 Config Loading/Overrides

# See load_and_override and parse_override above.
# The shipped repo supports dotted dict overrides and keeps list-index syntax out of the public contract.
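
A small worked example of the dotted override shape, using the parse_override and coerce helpers from the sketch above; the step and parameter names are illustrative.

# --set payloads become nested dicts keyed by step name; coerce() tries JSON first.
assert parse_override("chunk.params.size=256") == {"chunk": {"params": {"size": 256}}}
# Non-JSON values stay plain strings (e.g. a model name), so no quoting is needed on the shell.
assert parse_override("embed.params.model=minilm") == {"embed": {"params": {"model": "minilm"}}}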

4.4 Integration in RAG

# capstone/pyproject.toml for entrypoints (this repo uses a stdlib argparse shell)
[project.scripts]
funcpipe-rag = "funcpipe_rag.boundaries.shells.cli:main"

4.5 Before/After Refactor

# Before: Script
config = load_config('default.json')
result = rag_pipeline(config, docs)
# After: CLI entry point installed via [project.scripts]
$ funcpipe-rag rag-process input.txt --set chunk.params.size=256

5. Property-Based Proofs (repo tests)

Runnable tests live in capstone/tests/unit/pipelines/test_cli_overrides.py.

from pathlib import Path
from typer.testing import CliRunner
from hypothesis import given
import hypothesis.strategies as st
import os
import pytest
from unittest.mock import patch
from funcpipe_rag import PipelineConfig, StepConfig
# RunSpec, app, deep_merge, run_from_spec, build_spec_from_cli, and handle_result are
# assumed importable from the sketches above (or the repo's pipelines/cli.py equivalents).


# Property tests on pure layers
@given(st.dictionaries(st.text(), st.text()))
def test_deep_merge_precedence(base):
    env = {'a': 'env'}
    cli = {'a': 'cli', 'b': 'cli'}
    merged = deep_merge(deep_merge(base, env), cli)
    assert merged['a'] == 'cli'
    assert 'b' in merged


@given(st.integers(min_value=100, max_value=1000))
def test_runner_equiv(chunk_size):
    config = PipelineConfig(steps=[StepConfig(name='chunk', params={'size': chunk_size})])
    spec = RunSpec(input_path=Path('test.txt'), config=config, artifacts={}, seed=42, output_format='json')
    # In this repo, the CLI shell lives in `funcpipe_rag.boundaries.shells.cli`.
    # Patch boundary I/O (storage) rather than core pipeline building.
    with patch('funcpipe_rag.infra.adapters.file_storage.FileStorage.read_docs') as mock_load:
        mock_load.return_value = []  # Mock boundary
        result = run_from_spec(spec)
    assert result.is_ok()  # Or check value


# CLI example-based
def test_cli_dry_run():
    runner = CliRunner()
    with runner.isolated_filesystem():
        Path('test.txt').write_text('data')
        os.environ['FUNCPipe_CONFIG'] = ''  # Pin
        result = runner.invoke(app, ['rag-process', 'test.txt', '--dry-run'])
        assert result.exit_code == 0
        assert "Dry run" in result.output

6. Runtime Preservation Guarantee

The CLI shell adds only argument parsing, config loading, and validation at startup; the core pipeline runs the same functions it would when called directly, so runtime behavior and performance are preserved.


7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|--------------|---------|-----|
| Logic in CLI | Untestable entry points | Delegate to pure runner |
| No overrides | Rigid; config edits for every run | Add options/env/merge |
| No validation | Bad args reach the core | Use typer/click types + Pydantic |
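
A before/after sketch of the first row: policy buried in the command body versus a thin handler that delegates to the pure runner. Both handlers reuse names from the Public API sketch; the chunking rule in the "before" version is deliberately made up.

# Before: business rules (minimum size, chunking, rendering) tangled into the shell.
@app.command()
def rag_process_tangled(input_path: Path, size: int = 512):
    text = input_path.read_text()
    size = max(size, 64)  # policy hidden where no unit test can reach it
    chunks = [text[i:i + size] for i in range(0, len(text), size)]
    typer.echo(json.dumps({"chunks": len(chunks)}))

# After: the shell only parses, builds a spec via the pure helpers, and maps the result.
@app.command()
def rag_process_thin(input_path: Path, size: int = 512):
    spec = build_spec_from_cli(input_path, None, [f"chunk.params.size={size}"], None, "json")
    raise typer.Exit(handle_result(run_from_spec(spec), spec.output_format))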

8. Pre-Core Quiz

  1. Click for…? → Flexible CLIs
  2. Typer for…? → Type-driven
  3. Config for…? → Reproducibility
  4. Thin adapters? → Testability
  5. Benefit? → User-friendly entry

9. Post-Core Exercise

  1. Add CLI to RAG pipeline.
  2. Test equivalence with invoke.

Pipeline Usage (Idiomatic)

@app.command()
def handler(arg: Path):  # schematic signature; real commands take the full option set shown above
    spec = build_spec_from_cli(arg)  # schematic call; the real helper takes config/override/seed/format
    result = run_from_spec(spec)
    raise typer.Exit(handle_result(result, spec.output_format))

Continue with: Distributed Dataflow