Domain State ADTs¶
Page Maps¶
graph LR
family["Python Programming"]
program["Python Functional Programming"]
section["Algebraic Data Modelling Validation"]
page["Domain State ADTs"]
capstone["Capstone evidence"]
family --> program --> section --> page
page -.applies in.-> capstone
flowchart LR
orient["Orient on the page map"] --> read["Read the main claim and examples"]
read --> inspect["Inspect the related code, proof, or capstone surface"]
inspect --> verify["Run or review the verification path"]
verify --> apply["Apply the idea back to the module and capstone"]
State modelling needs to feel concrete. Do not treat “ADTs for status values” as just a nicer syntax. Explicit states and transitions remove entire classes of stuck, partial, and contradictory workflow bugs.
Start With the Lifecycle Smell¶
The first sign that a state model is too weak is usually not a type error. It is a workflow bug: something stays “running” forever, skips required payload, or silently accepts an impossible event.
- If state is a string plus several optional fields, illegal combinations are probably still representable.
- If transitions mutate in place, you cannot easily reason about previous versus next state.
- If terminal states do not have explicit behavior, later code will keep inventing its own rules.
Core question
How do you model domain states like Pending/Running/Done/Failed as algebraic data types in Python — ensuring exhaustive handling, type-safe transitions, and total functions in every FuncPipe pipeline stage?
This lesson extends the product/sum lesson into real state machines:
- make each lifecycle state a first-class variant with the payload it truly owns
- define transitions as pure functions so state movement is reviewable and testable
- keep terminal and invalid-transition behavior explicit rather than scattered through callers
The motivating processing example matters because it shows how “just one status field” quickly turns into a brittle hidden protocol unless the states themselves are modeled directly.
The naïve pattern everyone writes first:
class Job:
def __init__(self):
self.status = "pending"
self.progress = 0
self.error = None
def advance(self, event):
if self.status == "running" and event == "succeed":
self.status = "done" # someone forgets to copy artifact_id
This is the state-machine smell to recognize: silent invalid transitions, mutable corruption, and missing payload at exactly the moment it matters.
The production pattern: every state is a tagged variant with its own payload; every transition is a pure function forced to handle all cases.
ProcessingState = Pending | Running | Done | Failed
def transition(state: ProcessingState, event: Event) -> ProcessingState:
match state, event:
case Pending(), EvStart(started_at=s):
return running(started_at=s, progress_permille=0)
case Running(), EvAdvance(delta_permille=d):
return running(started_at=state.started_at,
progress_permille=min(1000, state.progress_permille + d))
case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
return done(
completed_at=c,
artifact_id=a,
dim=d,
sha256=h,
)
case Running(), EvFail(failed_at=f, code=code, msg=msg, attempt=attempt):
return failed(
failed_at=f,
code=code,
msg=msg,
attempt=attempt,
)
case (Done() | Failed()), _:
return state
raise ValueError("invalid transition")
With explicit state variants and transition functions, the lifecycle becomes something the team can inspect and evolve deliberately instead of a hidden agreement spread across methods.
Use this when you are tired of “stuck in running forever” and “missing failure payload” bugs and want state machines the codebase can actually defend.
Outcome 1. Every flag/field soup replaced with exhaustive, payload-carrying state ADTs. 2. Pure, validated transitions — no mutation, no invalid states. 3. Immutable, serialisable states that survive crashes/refactors without corruption.
Tiny Non-Domain Example – Traffic Light State Machine¶
from dataclasses import dataclass
from typing import Literal
try: # Python ≥3.11
from typing import assert_never
except ImportError: # Python ≤3.10
from typing_extensions import assert_never
@dataclass(frozen=True, slots=True)
class Red:
kind: Literal["red"] = "red"
@dataclass(frozen=True, slots=True)
class Green:
kind: Literal["green"] = "green"
@dataclass(frozen=True, slots=True)
class Yellow:
kind: Literal["yellow"] = "yellow"
TrafficLight = Red | Green | Yellow
def next_light(light: TrafficLight) -> TrafficLight:
match light:
case Red(): return Green()
case Green(): return Yellow()
case Yellow(): return Red()
assert_never(light)
Adding Blinking forces every call site to update — no silent infinite green.
Why State ADTs? (Three bullets every engineer should internalise)¶
- Exhaustiveness: Adding a new state/event breaks every handler/transition until you update it — no silent missing cases.
- Immutability + Value semantics: Frozen + structural eq/hash → safe as dict keys, cache keys, pure function results.
- Validated transitions + payload safety: Smart constructors + pure transition functions make illegal states unrepresentable.
1. Laws & Invariants (machine-checked)¶
| Law | Formal Statement | Enforcement |
|---|---|---|
| Exhaustiveness | Every match/transition handles all state and event variants | mypy --strict + assert_never + tests |
| Immutability | No field can be mutated post-construction | frozen=True, slots=True + tests |
| Structural Equality | x == y iff all fields equal |
Hypothesis equality tests |
| Transition Validity | Only legal transitions allowed; invalid raise early | Property tests on invalid event sequences |
| Progress Monotonic | Progress never decreases; always 0 ≤ progress_permille ≤ 1000 | Property tests + smart constructors |
| Terminal Idempotence | Applying any event to Done | Failed yields a value-equal state |
| JSON Round-Trip | from_dict(to_dict(x)) == x for all instances |
test_processing_state_roundtrip |
2. Decision Table – Enum vs Tagged Union¶
| Need | Payload? | Recommended Construction |
|---|---|---|
| Simple status flags | No | Enum |
| States with data | Yes | Union of tagged dataclasses (what we use here) |
Never use strings or mutable status objects for domain state.
3. Public API (fp/core.py – mypy --strict clean)¶
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Generic, TypeVar, Literal, Mapping, Sequence, TypeAlias
from datetime import datetime, timezone
try: # Python ≥3.11
from typing import assert_never
except ImportError: # Python ≤3.10
from typing_extensions import assert_never
JSONPrimitive: TypeAlias = str | int | float | bool | None
JSON: TypeAlias = JSONPrimitive | Mapping[str, "JSON"] | Sequence["JSON"]
UTC = timezone.utc
T = TypeVar("T")
E = TypeVar("E")
class ErrorCode(str, Enum):
RATE_LIMIT = "RATE_LIMIT"
TIMEOUT = "TIMEOUT"
EMBED_FAIL = "EMBED_FAIL"
INTERNAL = "INTERNAL"
# Events — pure data + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class EvStart:
started_at: datetime
@dataclass(frozen=True, slots=True, kw_only=True)
class EvAdvance:
delta_permille: int # ≥0
@dataclass(frozen=True, slots=True, kw_only=True)
class EvSucceed:
completed_at: datetime
artifact_id: str
dim: int
sha256: str
@dataclass(frozen=True, slots=True, kw_only=True)
class EvFail:
failed_at: datetime
code: ErrorCode
msg: str
attempt: int
Event = EvStart | EvAdvance | EvSucceed | EvFail
def start_event(*, started_at: datetime) -> EvStart:
if started_at.tzinfo is None:
raise ValueError("started_at must be timezone-aware")
return EvStart(started_at=started_at)
def advance_event(*, delta_permille: int) -> EvAdvance:
if delta_permille < 0:
raise ValueError("delta_permille must be ≥0 — progress is monotonic")
return EvAdvance(delta_permille=delta_permille)
def succeed_event(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> EvSucceed:
if completed_at.tzinfo is None:
raise ValueError("completed_at must be timezone-aware")
return EvSucceed(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)
def fail_event(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> EvFail:
if failed_at.tzinfo is None:
raise ValueError("failed_at must be timezone-aware")
if attempt < 1:
raise ValueError("attempt must be ≥1")
return EvFail(failed_at=failed_at, code=code, msg=msg, attempt=attempt)
# Processing state variants + smart constructors
@dataclass(frozen=True, slots=True, kw_only=True)
class Pending:
kind: Literal["pending"] = "pending"
queued_at: datetime
version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Running:
kind: Literal["running"] = "running"
started_at: datetime
progress_permille: int # 0–1000
version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Done:
kind: Literal["done"] = "done"
completed_at: datetime
artifact_id: str
dim: int
sha256: str
version: Literal[1] = 1
@dataclass(frozen=True, slots=True, kw_only=True)
class Failed:
kind: Literal["failed"] = "failed"
failed_at: datetime
code: ErrorCode
msg: str
attempt: int
version: Literal[1] = 1
ProcessingState = Pending | Running | Done | Failed
from funcpipe_rag.result.types import Result, Ok, Err
def pending(*, queued_at: datetime) -> Pending:
if queued_at.tzinfo is None:
raise ValueError("queued_at must be timezone-aware")
return Pending(queued_at=queued_at)
def running(*, started_at: datetime, progress_permille: int) -> Running:
if started_at.tzinfo is None:
raise ValueError("started_at must be timezone-aware")
if not 0 <= progress_permille <= 1000:
raise ValueError("progress_permille must be 0–1000")
return Running(started_at=started_at, progress_permille=progress_permille)
def done(*, completed_at: datetime, artifact_id: str, dim: int, sha256: str) -> Done:
if completed_at.tzinfo is None:
raise ValueError("completed_at must be timezone-aware")
if dim <= 0 or len(sha256) != 64 or not all(c in "0123456789abcdefABCDEF" for c in sha256):
raise ValueError("invalid Done payload")
return Done(completed_at=completed_at, artifact_id=artifact_id, dim=dim, sha256=sha256)
def failed(*, failed_at: datetime, code: ErrorCode, msg: str, attempt: int) -> Failed:
if failed_at.tzinfo is None:
raise ValueError("failed_at must be timezone-aware")
if attempt < 1:
raise ValueError("attempt must be >= 1")
return Failed(failed_at=failed_at, code=code, msg=msg, attempt=attempt)
4. Reference Implementations (continued)¶
4.1 Pure State Transition (match-based, exhaustive)¶
def transition(state: ProcessingState, event: Event) -> ProcessingState:
match state, event:
case Pending(), EvStart(started_at=s):
if s < state.queued_at:
raise ValueError("started_at cannot be earlier than queued_at")
return running(started_at=s, progress_permille=0)
case Running(), EvAdvance(delta_permille=d):
new_p = min(1000, state.progress_permille + d)
return running(started_at=state.started_at, progress_permille=new_p)
case Running(), EvSucceed(completed_at=c, artifact_id=a, dim=d, sha256=h):
if c < state.started_at:
raise ValueError("completed_at cannot be earlier than started_at")
return done(completed_at=c, artifact_id=a, dim=d, sha256=h)
case Running(), EvFail(failed_at=f, code=c, msg=m, attempt=a):
if f < state.started_at:
raise ValueError("failed_at cannot be earlier than started_at")
return failed(failed_at=f, code=c, msg=m, attempt=a)
case (Done() | Failed()), _:
return state
raise ValueError(f"invalid transition {state.kind} ← {event.__class__.__name__}")
4.2 Exhaustive Handling Example¶
def describe(state: ProcessingState) -> str:
match state:
case Pending(queued_at=q):
return f"Pending – queued at {q.isoformat()}"
case Running(started_at=s, progress_permille=p):
return f"Running – {p/10:.1f}% (started {s.isoformat()})"
case Done(completed_at=c, artifact_id=a, dim=d):
return f"Done – artifact {a} ({d} dims) at {c.isoformat()}"
case Failed(failed_at=f, code=c, msg=m, attempt=a):
return f"Failed – {c.value} (attempt {a}): {m}"
assert_never(state)
4.3 JSON Round-Trip (stable, versioned)¶
def processing_state_to_dict(s: ProcessingState) -> dict[str, JSON]:
base: dict[str, JSON] = {"kind": s.kind, "version": s.version}
match s:
case Pending(queued_at=q):
return base | {"queued_at": q.isoformat()}
case Running(started_at=s, progress_permille=p):
return base | {"started_at": s.isoformat(), "progress_permille": p}
case Done(completed_at=c, artifact_id=a, dim=d, sha256=h):
return base | {"completed_at": c.isoformat(), "artifact_id": a, "dim": d, "sha256": h}
case Failed(failed_at=f, code=c, msg=m, attempt=a):
return base | {"failed_at": f.isoformat(), "code": c.value, "msg": m, "attempt": a}
assert_never(s)
def processing_state_from_dict(d: Mapping[str, JSON]) -> ProcessingState:
if d.get("version") != 1:
raise ValueError("unsupported version")
kind = d["kind"]
match kind:
case "pending":
return pending(queued_at=datetime.fromisoformat(d["queued_at"])) # type: ignore
case "running":
return running(
started_at=datetime.fromisoformat(d["started_at"]),
progress_permille=int(d["progress_permille"]),
)
case "done":
return done(
completed_at=datetime.fromisoformat(d["completed_at"]),
artifact_id=str(d["artifact_id"]),
dim=int(d["dim"]),
sha256=str(d["sha256"]),
)
case "failed":
return failed(
failed_at=datetime.fromisoformat(d["failed_at"]),
code=ErrorCode(d["code"]), # type: ignore
msg=str(d["msg"]),
attempt=int(d["attempt"]),
)
raise ValueError(f"unknown kind {kind}")
4.4 Pipeline Integration (state machine is single source of truth)¶
def process_chunk(chunk: Chunk) -> Result[Done, Failed]:
state: ProcessingState = pending(queued_at=datetime.now(UTC))
try:
now = datetime.now(UTC)
state = transition(state, start_event(started_at=now))
# ... actual embedding work ...
completed_at = datetime.now(UTC)
state = transition(
state,
succeed_event(
completed_at=completed_at,
artifact_id="emb-123",
dim=1536,
sha256="0" * 64,
),
)
assert isinstance(state, Done)
return ok(state)
except Exception as exc:
state = transition(
state,
fail_event(
failed_at=datetime.now(UTC),
code=ErrorCode.EMBED_FAIL,
msg=str(exc),
attempt=1,
),
)
assert isinstance(state, Failed)
return err(state)
5. Property-Based Proofs (capstone/tests/test_module_05_c02.py)¶
import dataclasses
import pytest
from hypothesis import given, strategies as st
from datetime import datetime, timezone
UTC = timezone.utc
# Valid aware datetimes only
aware_dt = st.datetimes(timezones=st.just(UTC))
@given(queued_at=aware_dt)
def test_pending_naive_rejected(queued_at):
naive = queued_at.replace(tzinfo=None)
with pytest.raises(ValueError):
pending(queued_at=naive)
@given(delta=st.integers(max_value=-1))
def test_advance_negative_rejected(delta):
with pytest.raises(ValueError):
advance_event(delta_permille=delta)
@given(state=running_st, delta=st.integers(min_value=0, max_value=2000))
def test_progress_monotonic_and_clamped(state, delta):
ev = advance_event(delta_permille=delta)
new_state = transition(state, ev)
assert new_state.progress_permille >= state.progress_permille
assert new_state.progress_permille <= 1000
@given(state=st.one_of(done_st, failed_st))
def test_terminal_idempotent(state):
# any event leaves terminal state unchanged
for ev in [
start_event(started_at=datetime.now(UTC)),
advance_event(delta_permille=100),
succeed_event(completed_at=datetime.now(UTC), artifact_id="x", dim=1, sha256="0"*64),
fail_event(failed_at=datetime.now(UTC), code=ErrorCode.INTERNAL, msg="", attempt=1),
]:
assert transition(state, ev) == state
6. Big-O & Allocation Guarantees¶
| Operation | Time | Heap | Notes |
|---|---|---|---|
| State / event creation | O(1) | O(1) | slots=True → no dict |
| Transition / match | O(1) | O(1) | Fixed number of variants |
| JSON round-trip | O(1) | O(1) | Fixed fields (O(L) in string length) |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| String/enum status flags | Invalid states possible | Tagged union with payloads |
| Mutable state objects | Corruption, races | frozen=True + pure transition functions |
| Negative progress | Confusing UI, bugs | delta_permille ≥0 enforced at event creation |
| Naive datetimes | TypeError on comparison | Smart constructors require tz-aware |
8. Pre-Core Quiz¶
- Simple flags → Enum
- States with data → Tagged union of dataclasses
- Pure transitions → No mutation, return new state
- assert_never → Proves exhaustiveness
- Progress rule → Monotonic non-decreasing, clamped 0–1000
9. Post-Core Exercise¶
- Model your current chunk processing state as
ProcessingState→ implement transition and test invalid paths. - Add a new event (e.g.
EvPause) and new statePaused→ watch mypy break every transition site. - Make a pipeline step return
Result[Done, Failed]wrapping the terminal state produced by the state machine.
Continue with: Functors
You now model every domain state as pure, immutable, exhaustively-handled ADTs — illegal states are unrepresentable, invalid transitions impossible, and the type checker enforces totality. The rest of Module 5 composes these ADTs into functors, applicatives, and monoids for bulletproof pipelines.