async/await as Descriptions¶
Concept Position¶
```mermaid
flowchart TD
    family["Python Programming"] --> program["Python Functional Programming"]
    program --> module["Module 08: Async Pipelines, Backpressure, and Fairness"]
    module --> concept["async/await as Descriptions"]
    concept --> capstone["Capstone pressure point"]
```

```mermaid
flowchart TD
    problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
    example --> boundary["Name the boundary this page is trying to protect"]
    boundary --> proof["Carry that question into code review or the capstone"]
```
Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe async system treats `async def` as a pure description of steps — never hidden magic.
Progression Note¶
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |
Why this module matters in the course¶
This is where many Python courses accidentally turn concurrency into mysticism. The goal here is the opposite: treat async behavior as explicit description plus explicit driving boundaries, so the reader can still reason about the system locally.
Module 08 matters because it is the place where production pressure is highest: retries, timeouts, queues, fairness, and coordination. If the mental model stays weak here, the course stops being practical.
Questions this module should answer¶
By the end of the module, you should be able to answer:
- What is an async plan describing, and when does actual execution begin?
- Which async concerns belong in the pure description and which belong in the shell?
- How do backpressure and bounded concurrency preserve system behavior instead of merely tuning speed?
- Why is explicit async structure more testable than ad-hoc `await` chains spread across the codebase?
Those answers are what prevent "async functional programming" from becoming hand-wavy branding.
What to inspect in the capstone¶
Keep the FuncPipe capstone open while reading this module and inspect:
- async plan and stream helpers under the async effects packages
- tests that prove replayability, law-like behavior, and deterministic execution
- the shell boundaries that actually drive async plans
- places where retry, timeout, and queue policies are explicit data
The capstone should make one point visible here: concurrency only stays reviewable when its control flow is explicit.
Core question
How do you model asynchronous operations as pure, composable descriptions using coroutines — so that concurrency is explicit data, testable, and never hidden magic?
Symmetry with Module 7
What you now have after M07 + this core
- Pure domain core
- All effects behind ports
- Effectful operations described as pure data (IOPlan for sync, AsyncPlan for async)
- Typed capability protocols
- Reliable resource cleanup
- Pure logging via Writer
- Idempotent effects
- Explicit sessions & transactions
- Async operations as pure descriptions – async def returns a coroutine object (a value), not execution. await inside coroutine bodies is required and allowed. Driving the plan (perform_async, asyncio.run, create_task, top-level await) happens only in shells.
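The "coroutine object is a value" claim above can be checked directly in plain Python, with no capstone helpers. This sketch uses a hypothetical `fetch` coroutine and a side-effect list to show that calling an `async def` function creates a description only, and that nothing runs until a shell drives it:

```python
import asyncio

calls: list[str] = []

async def fetch() -> str:
    # This body runs only when the coroutine is driven, not when it is created.
    calls.append("ran")
    return "data"

coro = fetch()        # a coroutine object – a description, no effect yet
assert calls == []    # nothing has executed

result = asyncio.run(coro)   # the shell drives the description
assert calls == ["ran"]
assert result == "data"
```

This is exactly the property `AsyncPlan` builds on: construction is free of effects, and execution is a separate, shell-owned step.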
What the rest of Module 8 adds
- Async generators & streaming (M08C02)
- Bounded concurrency, backpressure, fairness (M08C03–C05)
- Retry & timeout policies as data
- Deterministic async testing
- Full async RAG pipeline
You are now one step away from production-grade async/concurrency.
1. Algebraic Laws & Architectural Invariants (machine-checked where possible)¶
| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Left Identity | `async_bind(async_pure(x), f) == f(x)` | Hypothesis |
| Algebraic Laws | Right Identity | `async_bind(plan, async_pure) == plan` | Hypothesis |
| Algebraic Laws | Associativity | `async_bind(async_bind(m, f), g) == async_bind(m, lambda x: async_bind(f(x), g))` | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an `AsyncPlan` performs zero side effects. Effects happen only when a plan is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Replayability | The same `AsyncPlan` value can be called multiple times → each call produces a fresh coroutine. | Property tests |
| Architectural Invariants | Interpreter Isolation | Core may `await` inside coroutine bodies, but must never drive a plan (`perform_async`, `asyncio.run`, `create_task`, top-level `await`). Only shells drive plans. | `mypy --strict` + grep-CI + review |
The algebraic laws are true monad laws over Result. The architectural invariants are enforced by tooling and review.
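The three laws can be exercised with a minimal, self-contained stand-in for the capstone's helpers. This sketch drops the `Result` layer for brevity (the real `async_bind` also threads `Ok`/`Err`); the names `async_pure`, `async_bind`, and `run` mirror the table above but are local re-implementations, not the capstone API:

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

A = TypeVar("A")
B = TypeVar("B")

# Minimal stand-in: a plan is a zero-arg callable producing a fresh coroutine.
AsyncPlan = Callable[[], Awaitable[A]]

def async_pure(value: A) -> AsyncPlan[A]:
    async def _coro() -> A:
        return value
    return _coro

def async_bind(plan: AsyncPlan[A], f: Callable[[A], AsyncPlan[B]]) -> AsyncPlan[B]:
    async def _coro() -> B:
        return await f(await plan())()
    return _coro

def run(plan: AsyncPlan[A]) -> A:
    # Shell-only driver for the sketch.
    return asyncio.run(plan())

f = lambda y: async_pure(y + 1)
g = lambda z: async_pure(z * 2)
m = async_pure(3)

# Left identity: binding a pure value is the same as applying f directly.
assert run(async_bind(async_pure(3), f)) == run(f(3))
# Right identity: binding with async_pure changes nothing.
assert run(async_bind(m, async_pure)) == run(m)
# Associativity: grouping of binds does not matter.
assert run(async_bind(async_bind(m, f), g)) == run(async_bind(m, lambda x: async_bind(f(x), g)))
```

The Hypothesis suite in section 5 checks the same laws over randomized inputs and with `Result` in play.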
2. Decision Table – Sync vs Async Description¶
| Scenario | Blocking OK? | Needs Concurrency / Non-blocking I/O? | Recommended Pattern |
|---|---|---|---|
| CPU-bound pure logic | Yes | No | Sync functions |
| I/O-bound single step | Yes | No | IOPlan (Module 7) |
| I/O-bound with dependent steps | No | Yes | AsyncPlan + async_bind |
| Independent parallel I/O | No | Yes | async_gather (M08C03) |
| Streaming over network / DB | No | Yes | Async generator + backpressure (M08C02) |
Rule: Use AsyncPlan exactly when you need non-blocking, dependent I/O steps. Never drive a plan in core.
3. Public API – AsyncPlan (capstone/src/funcpipe_rag/domain/effects/async_/plan.py – mypy --strict clean)¶
```python
# funcpipe_rag/domain/effects/async_/plan.py
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import TypeAlias, TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

A = TypeVar("A")
B = TypeVar("B")

# AsyncPlan[T] = a pure, replayable description of async work that
# eventually yields Result[T, ErrInfo]
AsyncPlan: TypeAlias = Callable[[], Awaitable[Result[A, ErrInfo]]]


def async_pure(value: A) -> AsyncPlan[A]:
    async def _coro() -> Result[A, ErrInfo]:
        return Ok(value)

    return lambda: _coro()


def async_bind(
    plan: AsyncPlan[A],
    f: Callable[[A], AsyncPlan[B]],
) -> AsyncPlan[B]:
    async def _coro() -> Result[B, ErrInfo]:
        res = await plan()
        match res:
            case Ok(value=v):
                return await f(v)()
            case Err(error=e):
                return Err(e)
        assert_never(res)

    return lambda: _coro()


def async_map(
    plan: AsyncPlan[A],
    f: Callable[[A], B],
) -> AsyncPlan[B]:
    return async_bind(plan, lambda x: async_pure(f(x)))


# Lift a parameterless async capability call into a replayable AsyncPlan.
# Precondition: make_coro MUST return a fresh coroutine each call.
#   Correct:   lambda: capability.method(...)
#   Incorrect: coro = capability.method(...); async_lift(lambda: coro)
def async_lift(
    make_coro: Callable[[], Awaitable[Result[A, ErrInfo]]],
) -> AsyncPlan[A]:
    return make_coro
```
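The precondition on `async_lift` is easy to violate and easy to demonstrate. This standalone sketch (using a hypothetical `get_value` coroutine, not the capstone API) shows why the lambda must create a fresh coroutine: a single coroutine object can be awaited only once, and driving it a second time raises `RuntimeError`:

```python
import asyncio

async def get_value() -> int:
    return 42

# Correct: the lambda creates a fresh coroutine on every call → replayable.
fresh_plan = lambda: get_value()
assert asyncio.run(fresh_plan()) == 42
assert asyncio.run(fresh_plan()) == 42   # second drive works fine

# Incorrect: a single coroutine object is consumed by its first drive.
stale = get_value()
asyncio.run(stale)
try:
    asyncio.run(stale)                   # second drive fails
except RuntimeError as exc:
    assert "coroutine" in str(exc)       # "cannot reuse already awaited coroutine"
```

This is exactly the replayability invariant from section 1: an `AsyncPlan` is the *recipe* for a coroutine, never the coroutine itself.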
Shell-only interpreter (capstone/src/funcpipe_rag/infra/adapters/async_runtime.py – never imported into domain)¶
```python
# funcpipe_rag/infra/adapters/async_runtime.py
from typing import TypeVar

from funcpipe_rag import ErrInfo, Result
from funcpipe_rag.domain.effects.async_.plan import AsyncPlan

A = TypeVar("A")


async def perform_async(plan: AsyncPlan[A]) -> Result[A, ErrInfo]:
    """The one interpreter – runs the async description."""
    return await plan()
```
4. Reference Implementations (pure domain code)¶
4.1 Capability protocols (domain/ports.py)¶
```python
# domain/ports.py
from typing import Protocol

from funcpipe_rag import ErrInfo, Result


class HttpClientCap(Protocol):
    async def get_json(self, url: str) -> Result[dict, ErrInfo]: ...


class DbCap(Protocol):
    async def save_user(self, user: UserProfile) -> Result[None, ErrInfo]: ...
```
4.2 Pure description – dependent async steps (domain/user.py)¶
```python
# Factored version – the canonical, readable style
def _fetch(user_id: str, http: HttpClientCap) -> AsyncPlan[dict]:
    return async_lift(
        lambda: http.get_json(f"https://api.example.com/users/{user_id}")
    )


def _validate(raw: dict) -> AsyncPlan[UserProfile]:
    return async_pure(validate_user(raw))


def _save(user: UserProfile, db: DbCap) -> AsyncPlan[UserProfile]:
    return async_bind(
        async_lift(lambda: db.save_user(user)),
        lambda _: async_pure(user),
    )


def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
```
4.3 Before → After (the real transformation)¶
```python
# BEFORE – hidden magic, immediate execution, untestable
async def old_process(user_id: str):
    raw = await http.get_json(f"/users/{user_id}")  # immediate effect
    user = validate_user(raw)
    await db.save_user(user)
    return user


# AFTER – pure description, no execution until shell calls perform_async
def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
```
Shell usage (only place driving happens):
```python
plan = async_user_pipeline("123", real_http, real_db)
result = await perform_async(plan)  # or asyncio.run(perform_async(plan))
```
4.4 Real-world RAG example (domain/rag_async.py)¶
```python
from collections.abc import Iterator
from dataclasses import replace

from funcpipe_rag.domain.effects.async_ import async_gather


def _embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_bind(
        async_lift(lambda: embedder.embed(chunk.text.content)),
        lambda vector: async_pure(
            replace(chunk, embedding=Embedding(vector, embedder.model_name))
        ),
    )


def async_rag_pipeline(
    docs: Iterator[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncPlan[list[EmbeddedChunk]]:
    return async_bind(
        async_pure(list(gen_clean_and_chunk(docs, env))),
        lambda chunks: async_map(
            async_gather(  # M08C03 – bounded parallel embedding
                [_embed_chunk(c, embedder) for c in chunks],
                concurrency=env.embedding_concurrency,
            ),
            structural_dedup_chunks,
        ),
    )
```
5. Property-Based Proofs (all pass in CI)¶
```python
import pytest
from hypothesis import given
from hypothesis import strategies as st


@given(x=st.integers())
@pytest.mark.asyncio
async def test_async_bind_laws(x: int):
    f = lambda y: async_pure(y + 1)
    g = lambda z: async_pure(z * 2)
    a = async_pure(x)
    # Associativity
    assert await perform_async(async_bind(async_bind(a, f), g)) == \
        await perform_async(async_bind(a, lambda y: async_bind(f(y), g)))
    # Right identity
    assert await perform_async(async_bind(a, async_pure)) == \
        await perform_async(a)
    # Left identity
    assert await perform_async(async_bind(async_pure(x), f)) == \
        await perform_async(f(x))


@pytest.mark.asyncio
async def test_replayability():
    plan = async_lift(lambda: http_mock.get_json("/test"))  # mock is deterministic
    r1 = await perform_async(plan)
    r2 = await perform_async(plan)  # fresh coroutine each call
    assert r1 == r2
```
6. Big-O & Allocation Guarantees¶
| Operation | Time | Python call-stack | Heap allocation until interpretation |
|---|---|---|---|
| async_pure | O(1) | O(1) | O(1) |
| async_bind | O(1) | O(1) | O(1) closure |
| async_lift | O(1) | O(1) | O(1) |
| perform_async | O(chain) | O(chain) | O(chain) closures |
Composition is O(1) per layer — no work until interpretation.
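The "no work until interpretation" guarantee can be observed with a standalone sketch. The `bind` and `step` here are local stand-ins (not the capstone API, and without the `Result` layer); a global counter proves that composing fifty layers allocates closures but performs zero effects until the plan is driven:

```python
import asyncio
from collections.abc import Awaitable, Callable

effects = 0

async def step(x: int) -> int:
    # The only effectful operation in the sketch.
    global effects
    effects += 1
    return x + 1

def bind(plan: Callable[[], Awaitable[int]],
         f: Callable[[int], Callable[[], Awaitable[int]]]) -> Callable[[], Awaitable[int]]:
    async def _coro() -> int:
        return await f(await plan())()
    return _coro

plan: Callable[[], Awaitable[int]] = lambda: step(0)
for _ in range(49):
    plan = bind(plan, lambda x: (lambda: step(x)))

assert effects == 0            # 50 layers composed, zero work done
result = asyncio.run(plan())   # interpretation: all 50 steps run now
assert effects == 50
assert result == 50
```

Each `bind` call is O(1): it builds one closure and returns immediately, which is why deep pipelines are cheap to describe and only pay at `perform_async` time.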
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Top-level `await` / `perform_async` / `create_task` in core | Immediate effects, untestable | Return `AsyncPlan`, drive only in shell |
| Re-using a single coroutine object with `async_lift` | Replayability broken | Always pass a lambda that creates a fresh coroutine |
| `asyncio.create_task` in core | Hidden concurrency, leaks | Use explicit `async_gather` / bounded queues (later cores) |
| Direct `httpx.AsyncClient()` in core | Concrete dependency | Inject capability protocol |
8. Pre-Core Quiz¶
- `async def` returns…? → A coroutine object – a pure description
- `await` inside coroutine bodies is…? → Required and allowed – it is part of the description
- Driving a plan (`perform_async`, `asyncio.run`, `create_task`) is…? → Only in shells – never in core
- Compose async descriptions with…? → `async_bind` / `async_map` / `async_lift`
- Real power comes from…? → Concurrency is explicit data, not magic – exactly like `IOPlan`
9. Post-Core Exercise¶
- Convert one real I/O-bound pipeline in your codebase to return an `AsyncPlan`.
- Chain ≥3 dependent async steps (preferably factored into small private helpers).
- Write a Hypothesis property proving the bind laws for your pipeline.
- Run the same `AsyncPlan` twice in a test and assert identical results (for idempotent cases).
- Celebrate – you now have async without magic.
Continue with: Async Generators
You now model async operations as pure, composable descriptions — exactly symmetrical to IOPlan. Your core remains pure, your shell controls execution, and concurrency is finally explicit data, not hidden magic.