Serialization Beyond Pydantic¶
Persistence is a modeling problem, not just a library feature. Once a value crosses time or process boundaries, “just dump the dataclass” is no longer a harmless shortcut. It becomes a schema contract whether the team admits it or not.
Start With the Drift Problem¶
Schema drift usually appears long after the first dump/load helper looked convenient, so this lesson pulls that future failure into view first.
- If the serialized form has no explicit tag or version, change will become guesswork later.
- If a persisted sum type has no stable discriminator, decoding logic will eventually become ambiguous.
- If you cannot explain how old data becomes new data, the serialization story is still incomplete.
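To make the second bullet concrete, here is a minimal stdlib-only sketch. It uses two hypothetical stand-in variants (not the real funcpipe_rag types) that happen to share a field shape. Dumped without a tag they are indistinguishable; wrapped in a tag/ver/payload object they are not. The helper name tagged is illustrative and not part of this lesson's API.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


# Hypothetical stand-ins: two variants from different sum types, same field shape.
@dataclass(frozen=True)
class Some:
    value: int


@dataclass(frozen=True)
class Ok:
    value: int


# Untagged dumps: both variants collapse to the same JSON text.
untagged_some = json.dumps(asdict(Some(42)))
untagged_ok = json.dumps(asdict(Ok(42)))


def tagged(tag: str, ver: int, payload: dict[str, Any]) -> str:
    """Wrap a payload in a tag/ver/payload object (illustrative helper)."""
    return json.dumps({"tag": tag, "ver": ver, "payload": payload})


# Tagged dumps: the discriminator survives, so a decoder can dispatch on it.
tagged_some = tagged("option", 1, {"kind": "some", "value": 42})
tagged_ok = tagged("result", 1, {"kind": "ok", "value": 42})
```

With the untagged form, a reader can only guess which type produced the text; with the tagged form, the decision is a dictionary lookup on tag and kind.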
Core question
How do you define stable, versioned, round-trippable serialization contracts for your core ADTs — using only plain dataclasses and lightweight codecs — so that persistence never silently breaks when you evolve the schema?
This lesson introduces serialization as an explicit contract around domain values:
- wrap persisted values in a stable envelope
- pair each persisted ADT with explicit encoders and decoders
- make versioning and migration rules reviewable instead of accidental
The motivating persistence failure matters because it is the most common reason teams discover too late that serialization was already part of the model.
The naïve pattern everyone writes first:
# BEFORE – unstable, unversioned, fragile
serialized = json.dumps(asdict(chunk)) # field order changes, no tag for sums, no version
chunk = Chunk(**json.loads(serialized)) # missing required fields → TypeError, defaulted fields fill in silently, wrong types → crash later
This is the drift problem to recognize before you ship it.
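A small stdlib-only sketch of how the naïve pattern fails once the schema moves. The ChunkV1 and ChunkV2 classes are hypothetical shapes invented for illustration; the real Chunk fields are not shown in this lesson.

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical shapes: the same type at two points in its history.
@dataclass(frozen=True)
class ChunkV1:
    text: str


@dataclass(frozen=True)
class ChunkV2Strict:
    text: str
    source: str  # new required field


@dataclass(frozen=True)
class ChunkV2Lenient:
    text: str
    source: str = ""  # new field with a default


old_blob = json.dumps(asdict(ChunkV1(text="hello")))  # written by the old release

# Required field added: old data fails at load time.
try:
    ChunkV2Strict(**json.loads(old_blob))
    strict_outcome = "loaded"
except TypeError:
    strict_outcome = "TypeError"

# Defaulted field added: old data loads, and nothing records that it was v1.
lenient = ChunkV2Lenient(**json.loads(old_blob))

# Rollback case: data written by the new release breaks the old reader.
new_blob = json.dumps(asdict(ChunkV2Lenient(text="hello", source="crawl")))
try:
    ChunkV1(**json.loads(new_blob))
    rollback_outcome = "loaded"
except TypeError:
    rollback_outcome = "TypeError"
```

None of the three outcomes is recorded anywhere in the data itself, which is the gap a version field closes.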
The production pattern turns persistence into an intentional format with explicit structure and migration behavior.
# AFTER – stable, versioned, migratable
enc = enc_chunk() # factory → Encoder[Chunk]
serialized = to_json(core_chunk, enc) # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates automatically
That explicitness is the whole point: changes stop being silent and start becoming governed.
Use this when “just add a field” persistence changes have burned you and you want durable contracts instead of lucky dumps.
Outcome
1. Every core ADT gains explicit encoder/decoder factories and an Envelope.
2. All serialization is property-tested to be round-trippable and migratable.
3. Serde stays dependency-light (stdlib json, optional msgpack), near-linear, and streamable.
Tiny Non-Domain Example – Versioned Option[T]¶
# v1
opt_v1 = Some(value=42)
serialized_v1 = to_json(opt_v1, enc_option())
# {"tag":"option","ver":1,"payload":{"kind":"some","value":42}}
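# v2 reader: here dec_option stands for the v2 decoder, and an ("option", 1) migrator supplies the new metadata field
opt_v2 = from_json(serialized_v1, dec_option())
# Some(value=42, metadata={})
Adding a field does not break old data, as long as a registered migrator supplies the new field before decoding. (The dec_option listed in section 3 is still the v1 decoder; this example imagines its v2 successor.)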
Why Explicit Serialization Contracts? (Three bullets every engineer should internalise)¶
- Stability: Tagged envelope + explicit payload → field order, missing fields, and sum variants never surprise.
- Versioning + migration: ver field + MIGRATORS → schema evolution is explicit and automated.
- Zero magic: Plain functions, no reflection → predictable, fast, dependency-light (msgpack is the only third-party import).
Pydantic is for edges only (C06). Core persistence uses explicit contracts.
1. Laws & Invariants (machine-checked)¶
| Invariant | Description | Enforcement |
|---|---|---|
| Round-Trip | from_x(to_x(x)) == x | Hypothesis on all ADTs |
| Migration Round-Trip | Old data → new decoder yields correct migrated value | Property tests with old payloads |
| Envelope Shape | Every serialized value has tag/ver/payload | _check_env guard |
| Non-empty VFailure | Validation failures always have ≥1 error | Decoder enforcement |
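The round-trip law has one well-known edge worth seeing directly: NaN. The _JSON_KW settings in section 3 pass allow_nan=False, and the stdlib-only sketch below shows what that choice appears to protect against. It is an illustration of standard json behaviour, not a statement about how the lesson's test suite handles floats.

```python
import json

nan = float("nan")

# NaN is never equal to itself, so decode(encode(x)) == x cannot hold for it.
nan_equals_itself = nan == nan

# By default the stdlib emits the non-standard token NaN, which strict JSON parsers reject.
default_text = json.dumps({"score": nan})

# With allow_nan=False the encoder refuses instead, surfacing the problem at write time.
try:
    json.dumps({"score": nan}, allow_nan=False)
    strict_outcome = "encoded"
except ValueError:
    strict_outcome = "ValueError"

# Finite floats round-trip exactly, because the encoder writes their shortest repr.
x = 0.1 + 0.2
finite_roundtrips = json.loads(json.dumps(x, allow_nan=False)) == x
```

Failing at encode time keeps a value that cannot satisfy the law from ever reaching storage.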
2. Decision Table – When to Use What Serde¶
| Need | Validation? | Binary? | Versioning? | Recommended |
|---|---|---|---|---|
| Simple JSON, no validation | No | No | Yes | Plain envelope + json |
| Schema + validation | Yes | No | Yes | Marshmallow OneOfSchema |
| Compact / fast | No | Yes | Yes | MessagePack + envelope |
| Streaming large datasets | No | Yes/No | Yes | iter_ndjson / iter_msgpack |
3. Public API (boundaries/serde.py – mypy --strict clean)¶
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Mapping, Protocol, TypeVar, Tuple
import json

import msgpack
from typing_extensions import assert_never

from funcpipe_rag.fp.core import Option, Some, NoneVal, Result, Ok, Err, Validation, VSuccess, VFailure, ErrInfo

__all__ = [
    "Envelope",
    "Encoder", "Decoder",
    "enc_option", "dec_option",
    "enc_result", "dec_result",
    "enc_validation", "dec_validation",
    "to_json", "from_json",
    "to_msgpack", "from_msgpack",
    "from_json_safe",
    "MIGRATORS", "migrate",
    "iter_ndjson", "iter_msgpack",
]

T = TypeVar("T")
E = TypeVar("E")

JSON = str | int | float | bool | None | list["JSON"] | dict[str, "JSON"]

@dataclass(frozen=True, slots=True)
class Envelope:
    tag: str  # e.g. "option", "result", "chunk"
    ver: int
    payload: dict[str, JSON]

class Encoder(Protocol[T]):
    def __call__(self, x: T) -> Envelope: ...

class Decoder(Protocol[T]):
    def __call__(self, env: Envelope) -> T: ...

# Default JSON value codecs (identity for JSON-serializable values)
def json_encoder(x: JSON) -> JSON:
    return x

def json_decoder(j: JSON) -> JSON:
    return j
# Option factories
def enc_option(enc_val: Callable[[T], JSON] | None = None) -> Encoder[Option[T]]:
    ev = enc_val or json_encoder  # type: ignore[arg-type]  # assumes T is JSON-compatible if no codec supplied

    def _enc(x: Option[T]) -> Envelope:
        match x:
            case Some(value=v):
                return Envelope(tag="option", ver=1, payload={"kind": "some", "value": ev(v)})
            case NoneVal():
                return Envelope(tag="option", ver=1, payload={"kind": "none"})
            case other:
                assert_never(other)

    return _enc

def dec_option(dec_val: Callable[[JSON], T] | None = None) -> Decoder[Option[T]]:
    dv = dec_val or json_decoder  # type: ignore[arg-type]

    def _dec(env: Envelope) -> Option[T]:
        if env.tag != "option":
            raise ValueError(f"expected tag 'option', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "some":
            return Some(dv(env.payload["value"]))
        if kind == "none":
            return NoneVal()
        raise ValueError(f"invalid kind {kind!r}")

    return _dec
# Result factories (ErrInfo codecs default to a {"code", "msg"} mapping; pass enc_err/dec_err to override)
def enc_result(enc_val: Callable[[T], JSON] | None = None,
               enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Result[T, ErrInfo]]:
    ev = enc_val or json_encoder  # type: ignore[arg-type]
    ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg})  # safe default for ErrInfo

    def _enc(x: Result[T, ErrInfo]) -> Envelope:
        match x:
            case Ok(value=v):
                return Envelope(tag="result", ver=1, payload={"kind": "ok", "value": ev(v)})
            case Err(error=e):
                return Envelope(tag="result", ver=1, payload={"kind": "err", "error": ee(e)})
            case other:
                assert_never(other)

    return _enc

def dec_result(dec_val: Callable[[JSON], T] | None = None,
               dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Result[T, ErrInfo]]:
    dv = dec_val or json_decoder  # type: ignore[arg-type]
    de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))

    def _dec(env: Envelope) -> Result[T, ErrInfo]:
        if env.tag != "result":
            raise ValueError(f"expected tag 'result', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "ok":
            return Ok(dv(env.payload["value"]))
        if kind == "err":
            return Err(de(env.payload["error"]))
        raise ValueError(f"invalid kind {kind!r}")

    return _dec
# Validation factories (similar pattern)
def enc_validation(enc_val: Callable[[T], JSON] | None = None,
                   enc_err: Callable[[ErrInfo], JSON] | None = None) -> Encoder[Validation[T, ErrInfo]]:
    ev = enc_val or json_encoder
    ee = enc_err or (lambda e: {"code": e.code, "msg": e.msg})

    def _enc(x: Validation[T, ErrInfo]) -> Envelope:
        match x:
            case VSuccess(value=v):
                return Envelope(tag="validation", ver=1, payload={"kind": "v_success", "value": ev(v)})
            case VFailure(errors=es):
                return Envelope(tag="validation", ver=1, payload={"kind": "v_failure", "errors": [ee(e) for e in es]})
            case other:
                assert_never(other)

    return _enc

def dec_validation(dec_val: Callable[[JSON], T] | None = None,
                   dec_err: Callable[[JSON], ErrInfo] | None = None) -> Decoder[Validation[T, ErrInfo]]:
    dv = dec_val or json_decoder
    de = dec_err or (lambda j: ErrInfo(code=j["code"], msg=j["msg"]))

    def _dec(env: Envelope) -> Validation[T, ErrInfo]:
        if env.tag != "validation":
            raise ValueError(f"expected tag 'validation', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        kind = env.payload.get("kind")
        if kind == "v_success":
            return VSuccess(dv(env.payload["value"]))
        if kind == "v_failure":
            errs = [de(e) for e in env.payload["errors"]]
            if not errs:
                raise ValueError("VFailure requires non-empty errors")
            return VFailure(errors=tuple(errs))
        raise ValueError(f"invalid kind {kind!r}")

    return _dec
# JSON / MessagePack
_JSON_KW = dict(ensure_ascii=False, allow_nan=False, separators=(",", ":"))
_MP_PACK = dict(use_bin_type=True)
_MP_UNPACK = dict(raw=False)

def _check_env(obj: Any) -> None:  # Any: the input is untrusted, freshly parsed data
    if not isinstance(obj, dict):
        raise ValueError("invalid envelope: not a dict")
    required = {"tag", "ver", "payload"}
    missing = required - set(obj)
    if missing:
        raise ValueError(f"invalid envelope: missing {sorted(missing)}")
    if not isinstance(obj["tag"], str):
        raise ValueError("tag must be str")
    # bool is a subclass of int, so reject True/False explicitly
    if isinstance(obj["ver"], bool) or not isinstance(obj["ver"], int):
        raise ValueError("ver must be int")
    if not isinstance(obj["payload"], dict):
        raise ValueError("payload must be dict")

def to_json(x: T, enc: Encoder[T]) -> str:
    env = enc(x)
    return json.dumps({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_JSON_KW)

def from_json(s: str, dec: Decoder[T]) -> T:
    obj = json.loads(s)
    _check_env(obj)
    env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
    return dec(migrate(env))  # migrate first, so decoders only ever see the current version

def to_msgpack(x: T, enc: Encoder[T]) -> bytes:
    env = enc(x)
    return msgpack.packb({"tag": env.tag, "ver": env.ver, "payload": env.payload}, **_MP_PACK)

def from_msgpack(b: bytes, dec: Decoder[T]) -> T:
    obj = msgpack.unpackb(b, **_MP_UNPACK)
    _check_env(obj)
    env = Envelope(tag=obj["tag"], ver=obj["ver"], payload=obj["payload"])
    return dec(migrate(env))
# Safe decode + versioning
@dataclass(frozen=True, slots=True)
class DecodeErr:
    path: Tuple[str, ...] = ()
    msg: str = ""

def from_json_safe(s: str, dec: Decoder[T]) -> Validation[T, DecodeErr]:
    try:
        return VSuccess(from_json(s, dec))
    except Exception as exc:
        return VFailure((DecodeErr(msg=str(exc)),))

MIGRATORS: dict[tuple[str, int], Callable[[Envelope], Envelope]] = {}
MAX_MIGRATION_STEPS = 32

def migrate(env: Envelope) -> Envelope:
    key = (env.tag, env.ver)
    steps = 0
    seen: set[tuple[str, int]] = set()
    while key in MIGRATORS:
        if key in seen:
            raise RuntimeError(f"migration cycle detected at {key}")
        seen.add(key)
        steps += 1
        if steps > MAX_MIGRATION_STEPS:
            raise RuntimeError("migration step limit exceeded")
        env = MIGRATORS[key](env)
        key = (env.tag, env.ver)
    return env
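# Streaming
from collections.abc import Iterable, Iterator  # for the annotations below; belongs with the imports at the top of the module
from typing import BinaryIO

def iter_ndjson(fp: Iterable[str], dec: Decoder[T]) -> Iterator[T]:
    for line in fp:
        line = line.strip()
        if line:  # skip blank lines
            yield from_json(line, dec)

def iter_msgpack(fp: BinaryIO, dec: Decoder[T]) -> Iterator[T]:
    unpacker = msgpack.Unpacker(fp, **_MP_UNPACK)
    for obj in unpacker:
        _check_env(obj)
        yield dec(migrate(Envelope(obj["tag"], obj["ver"], obj["payload"])))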
4. Reference Implementations (continued)¶
4.1 Before vs After – Chunk Persistence¶
# BEFORE – unstable dict → JSON
serialized = json.dumps(asdict(core_chunk)) # field order changes, no version
core_chunk = Chunk(**json.loads(serialized)) # missing fields → TypeError or silent default, crash later
# AFTER – stable envelope
enc = enc_chunk() # factory
serialized = to_json(core_chunk, enc) # {"tag":"chunk","ver":1,"payload":{...}}
core_chunk = from_json(serialized, dec_chunk()) # migrates if needed, raises clear error if invalid
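enc_chunk and dec_chunk are used above but not defined on this page. One way they could look is sketched below, assuming a Chunk with only text and embedding fields (inferred from the v1 test payload in section 5; the real Chunk may well differ). A local stand-in Envelope is defined so the sketch runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Envelope:  # stand-in with the same shape as the Envelope in section 3
    tag: str
    ver: int
    payload: dict[str, Any]


@dataclass(frozen=True)
class Chunk:  # hypothetical fields, assumed for illustration only
    text: str
    embedding: tuple[float, ...]


def enc_chunk() -> Callable[[Chunk], Envelope]:
    def _enc(c: Chunk) -> Envelope:
        # Tuples become lists: JSON has no tuple type, so the decoder converts back.
        payload = {"text": c.text, "embedding": list(c.embedding)}
        return Envelope(tag="chunk", ver=1, payload=payload)

    return _enc


def dec_chunk() -> Callable[[Envelope], Chunk]:
    def _dec(env: Envelope) -> Chunk:
        if env.tag != "chunk":
            raise ValueError(f"expected tag 'chunk', got {env.tag}")
        if env.ver != 1:
            raise ValueError(f"unknown version {env.ver}")
        p = env.payload
        return Chunk(text=p["text"], embedding=tuple(p["embedding"]))

    return _dec


chunk = Chunk(text="hello", embedding=(0.1, 0.2))
env = enc_chunk()(chunk)
restored = dec_chunk()(env)
```

The encoder names every persisted field explicitly, so renaming an attribute on Chunk cannot silently change the stored format.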
4.2 RAG Integration – Persistent Chunk Store¶
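# Write stream (UTF-8 explicitly, because to_json emits non-ASCII text with ensure_ascii=False)
with open("chunks.ndjson", "w", encoding="utf-8") as f:
    enc = enc_chunk()
    for core_chunk in pipeline_chunks:
        f.write(to_json(core_chunk, enc) + "\n")

# Read stream with migration (wrapped in a generator function, since yield is only valid inside a def)
def read_chunks(path="chunks.ndjson"):  # hypothetical helper name
    with open(path, encoding="utf-8") as f:
        dec = dec_chunk()
        for core_chunk in iter_ndjson(f, dec):
            yield core_chunk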
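The line-per-record framing can be tried without the rest of the library. The sketch below uses plain dicts in place of Envelope and an in-memory buffer in place of a file; write_ndjson and read_ndjson are illustrative names, not part of the serde module.

```python
import io
import json
from typing import Any, Iterable, Iterator


def write_ndjson(fp: io.TextIOBase, envelopes: Iterable[dict[str, Any]]) -> None:
    """Write one compact JSON document per line."""
    for env in envelopes:
        fp.write(json.dumps(env, separators=(",", ":")) + "\n")


def read_ndjson(fp: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Lazily parse one record per non-blank line."""
    for line in fp:
        line = line.strip()
        if line:
            yield json.loads(line)


buf = io.StringIO()
write_ndjson(
    buf,
    ({"tag": "chunk", "ver": 1, "payload": {"text": f"doc {i}"}} for i in range(3)),
)
buf.seek(0)

reader = read_ndjson(buf)
first = next(reader)  # only the first line has been read and parsed at this point
rest = list(reader)   # the remaining records are parsed on demand
```

Line framing is safe because json.dumps escapes newlines inside strings, so a record never spans more than one physical line.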
4.3 Versioning & Migration Example (Chunk v1 → v2)¶
# Suppose v1 Chunk had no metadata field
def migrate_chunk_v1_to_v2(env: Envelope) -> Envelope:
    if env.ver != 1 or env.tag != "chunk":
        return env
    payload = env.payload.copy()  # copy, so the incoming envelope is left untouched
    payload.setdefault("metadata", {})
    return Envelope(tag="chunk", ver=2, payload=payload)

MIGRATORS[("chunk", 1)] = migrate_chunk_v1_to_v2
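Because migrators are keyed by (tag, ver), several single-step migrators chain into a multi-version upgrade. The sketch below illustrates that with a local registry and a simplified migrate (no step limit) so it runs standalone. The v2 → v3 step, which renames text to content, is a hypothetical change invented for this example.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Envelope:  # stand-in with the same shape as the Envelope in section 3
    tag: str
    ver: int
    payload: dict[str, Any]


Migrator = Callable[[Envelope], Envelope]


def v1_to_v2(env: Envelope) -> Envelope:
    payload = dict(env.payload)
    payload.setdefault("metadata", {})
    return Envelope(env.tag, 2, payload)


def v2_to_v3(env: Envelope) -> Envelope:
    # Hypothetical change: rename the "text" field to "content".
    payload = dict(env.payload)
    payload["content"] = payload.pop("text")
    return Envelope(env.tag, 3, payload)


registry: dict[tuple[str, int], Migrator] = {
    ("chunk", 1): v1_to_v2,
    ("chunk", 2): v2_to_v3,
}


def migrate(env: Envelope, migrators: dict[tuple[str, int], Migrator]) -> Envelope:
    """Apply single-step migrators until no migrator is registered for the current key."""
    seen: set[tuple[str, int]] = set()
    key = (env.tag, env.ver)
    while key in migrators:
        if key in seen:
            raise RuntimeError(f"migration cycle detected at {key}")
        seen.add(key)
        env = migrators[key](env)
        key = (env.tag, env.ver)
    return env


old = Envelope("chunk", 1, {"text": "hello", "embedding": [0.1, 0.2]})
new = migrate(old, registry)
```

Each migrator only needs to know about one version step, and a migrator that forgets to bump ver is caught by the cycle check instead of looping forever.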
5. Property-Based Proofs (capstone/tests/test_serde.py)¶
from hypothesis import given, strategies as st
# Assumes the serde names from section 3 (to_json, from_json, enc_option, Envelope, migrate, ...)
# and the ADTs from funcpipe_rag.fp.core are imported into this test module.

# Integer range that MessagePack can represent (signed 64-bit minimum to unsigned 64-bit maximum)
msgpack_int = st.integers(min_value=-(2**63), max_value=2**64 - 1)

@given(opt=st.one_of(st.builds(Some, value=st.integers()), st.just(NoneVal())))
def test_option_json_roundtrip(opt):
    enc = enc_option()
    dec = dec_option()
    s = to_json(opt, enc)
    back = from_json(s, dec)
    assert back == opt

@given(res=st.one_of(st.builds(Ok, value=msgpack_int),
                     st.builds(Err, error=st.builds(ErrInfo, code=st.text(), msg=st.text()))))
def test_result_msgpack_roundtrip(res):
    enc = enc_result()
    dec = dec_result()
    b = to_msgpack(res, enc)
    back = from_msgpack(b, dec)
    assert back == res

# Migration example test (requires the ("chunk", 1) migrator from 4.3 to be registered)
def test_chunk_v1_migration():
    v1_payload = {"text": "hello", "embedding": [0.1, 0.2]}
    env_v1 = Envelope(tag="chunk", ver=1, payload=v1_payload)
    env_v2 = migrate(env_v1)
    assert env_v2.ver == 2
    assert env_v2.payload["metadata"] == {}
6. Big-O & Allocation Guarantees¶
| Operation | Time | Heap | Notes |
|---|---|---|---|
| encode/decode | O(#fields) | O(#fields) | One envelope and one payload dict per value |
| JSON/MessagePack | O(N) | O(N) | Linear in payload size N |
| Streaming iter | O(size of item) per item | O(size of largest item) | One item resident at a time; total work stays linear |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Raw asdict → json | Unstable order, no tags | Explicit Envelope + enc/dec |
| No version field | Schema drift breaks old data | ver + MIGRATORS |
| Ad-hoc JSON handling | Silent corruption | _check_env + safe decode |
| Pydantic for core persistence | Heavy deps, slow | Plain codecs for core |
| No migration path | Production data loss | migrate + versioned envelope |
8. Pre-Core Quiz¶
- Envelope contains…? → tag, ver, payload
- Round-trip law → from(to(x)) == x
- For versioning? → MIGRATORS dict
- Streaming with…? → iter_ndjson / iter_msgpack
- Beyond Pydantic for…? → Core persistence
9. Post-Core Exercise¶
- Implement Encoder/Decoder for one core ADT → test round-trip.
- Add a v2 migrator for an existing type → test old payload loads.
- Replace one json.dumps(asdict(...)) with envelope serde.
- Add streaming persistence for a pipeline stage.
Continue with: Compositional Domain Models
You now serialize every core ADT with stable, versioned, migratable contracts, so schema evolution is explicit and reviewable rather than accidental. The rest of Module 5 adds compositional domain models and performance guidance for heavy ADTs.