
async/await as Descriptions

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 08: Async Pipelines, Backpressure, and Fairness"]
  module --> concept["async/await as Descriptions"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every production FuncPipe async system treats async def as a pure description of steps — never hidden magic.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability protocols, idempotent effects, explicit sessions |
| 8 | Async FuncPipe & Backpressure | Async FuncPipe, non-blocking pipelines, backpressure, basic fairness |
| 9 | FP Across Libraries and Frameworks | FuncPipe style in Pandas/Polars/Dask, FastAPI, CLI, distributed systems |
| 10 | Refactoring, Performance, Future-Proofing | Systematic refactors, performance budgets, governance and long-term evolution |

Why this module matters in the course

This is where many Python courses accidentally turn concurrency into mysticism. The goal here is the opposite: treat async behavior as explicit description plus explicit driving boundaries, so the reader can still reason about the system locally.

Module 08 matters because it is the place where production pressure is highest: retries, timeouts, queues, fairness, and coordination. If the mental model stays weak here, the course stops being practical.

Questions this module should answer

By the end of the module, you should be able to answer:

  • What is an async plan describing, and when does actual execution begin?
  • Which async concerns belong in the pure description and which belong in the shell?
  • How do backpressure and bounded concurrency preserve system behavior instead of merely tuning speed?
  • Why is explicit async structure more testable than ad-hoc await chains spread across the codebase?

Those answers are what prevent "async functional programming" from becoming hand-wavy branding.

What to inspect in the capstone

Keep the FuncPipe capstone open while reading this module and inspect:

  • async plan and stream helpers under the async effects packages
  • tests that prove replayability, law-like behavior, and deterministic execution
  • the shell boundaries that actually drive async plans
  • places where retry, timeout, and queue policies are explicit data

The capstone should make one point visible here: concurrency only stays reviewable when its control flow is explicit.

Core question
How do you model asynchronous operations as pure, composable descriptions using coroutines — so that concurrency is explicit data, testable, and never hidden magic?

Symmetry with Module 7

IOPlan[T]    = () -> Result[T, ErrInfo]
AsyncPlan[T] = () -> Awaitable[Result[T, ErrInfo]]
AsyncPlan is exactly the same idea as IOPlan, only non-blocking.

What you now have after M07 + this core

- Pure domain core
- All effects behind ports
- Effectful operations described as pure data (IOPlan for sync, AsyncPlan for async)
- Typed capability protocols
- Reliable resource cleanup
- Pure logging via Writer
- Idempotent effects
- Explicit sessions & transactions
- Async operations as pure descriptions: async def returns a coroutine object (a value), not execution. await inside coroutine bodies is required and allowed. Driving the plan (perform_async, asyncio.run, create_task, top-level await) happens only in shells.
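The claim that async def returns a value, not execution, can be checked directly. Here is a minimal, self-contained sketch (step and calls are illustrative names, not capstone code):

```python
import asyncio

calls: list[str] = []

async def step() -> int:
    # The body runs only when the coroutine is driven, not when step() is called.
    calls.append("ran")
    return 42

coro = step()                 # a coroutine object: a value, no execution yet
assert calls == []            # nothing has run so far
result = asyncio.run(coro)    # the shell drives the description
assert calls == ["ran"] and result == 42
```

Calling step() builds the description; asyncio.run is the shell-side act of driving it.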

What the rest of Module 8 adds

- Async generators & streaming (M08C02)
- Bounded concurrency, backpressure, fairness (M08C03–C05)
- Retry & timeout policies as data
- Deterministic async testing
- Full async RAG pipeline

You are now one step away from production-grade async/concurrency.

1. Algebraic Laws & Architectural Invariants (machine-checked where possible)

| Category | Name | Description | Enforcement |
|---|---|---|---|
| Algebraic Laws | Left Identity | async_bind(async_pure(x), f) == f(x) | Hypothesis |
| Algebraic Laws | Right Identity | async_bind(plan, async_pure) == plan | Hypothesis |
| Algebraic Laws | Associativity | async_bind(async_bind(m, f), g) == async_bind(m, lambda x: async_bind(f(x), g)) | Hypothesis |
| Architectural Invariants | Description Purity | Constructing an AsyncPlan performs zero side effects; effects happen only when a plan is driven in the shell. | Mock tests + static analysis |
| Architectural Invariants | Replayability | The same AsyncPlan value can be called multiple times; each call produces a fresh coroutine. | Property tests |
| Architectural Invariants | Interpreter Isolation | Core may await inside coroutine bodies, but must never drive a plan (perform_async, asyncio.run, create_task, top-level await). Only shells drive plans. | mypy --strict + grep-CI + review |

The algebraic laws are true monad laws over Result. The architectural invariants are enforced by tooling and review.

2. Decision Table – Sync vs Async Description

| Scenario | Blocking OK? | Needs Concurrency / Non-blocking I/O? | Recommended Pattern |
|---|---|---|---|
| CPU-bound pure logic | Yes | No | Sync functions |
| I/O-bound single step | Yes | No | IOPlan (Module 7) |
| I/O-bound with dependent steps | No | Yes | AsyncPlan + async_bind |
| Independent parallel I/O | No | Yes | async_gather (M08C03) |
| Streaming over network / DB | No | Yes | Async generator + backpressure (M08C02) |

Rule: Use AsyncPlan exactly when you need non-blocking, dependent I/O steps. Never drive a plan in core.

3. Public API – AsyncPlan (capstone/src/funcpipe_rag/domain/effects/async_/plan.py – mypy --strict clean)

# funcpipe_rag/domain/effects/async_/plan.py
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import TypeAlias, TypeVar

from funcpipe_rag import Result, Ok, Err, ErrInfo, assert_never

A = TypeVar("A")
B = TypeVar("B")

# AsyncPlan[T] = a pure, replayable description of async work that eventually yields Result[T, ErrInfo]
AsyncPlan: TypeAlias = Callable[[], Awaitable[Result[A, ErrInfo]]]

def async_pure(value: A) -> AsyncPlan[A]:
    async def _coro() -> Result[A, ErrInfo]:
        return Ok(value)
    return lambda: _coro()

def async_bind(
    plan: AsyncPlan[A],
    f: Callable[[A], AsyncPlan[B]],
) -> AsyncPlan[B]:
    async def _coro() -> Result[B, ErrInfo]:
        res = await plan()
        match res:
            case Ok(value=v):
                return await f(v)()
            case Err(error=e):
                return Err(e)
        assert_never(res)
    return lambda: _coro()

def async_map(
    plan: AsyncPlan[A],
    f: Callable[[A], B],
) -> AsyncPlan[B]:
    return async_bind(plan, lambda x: async_pure(f(x)))

# Lift a parameterless async capability call into a replayable AsyncPlan
# Precondition: make_coro MUST return a fresh coroutine each call.
# Correct:   lambda: capability.method(...)
# Incorrect: coro = capability.method(...); async_lift(lambda: coro)
def async_lift(
    make_coro: Callable[[], Awaitable[Result[A, ErrInfo]]],
) -> AsyncPlan[A]:
    return make_coro

Shell-only interpreter (capstone/src/funcpipe_rag/infra/adapters/async_runtime.py – never imported into domain)

# funcpipe_rag/infra/adapters/async_runtime.py
async def perform_async(plan: AsyncPlan[A]) -> Result[A, ErrInfo]:
    """The one interpreter – runs the async description."""
    return await plan()
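The async_lift precondition (a fresh coroutine per call) can be demonstrated without any capstone code. In this self-contained sketch, fetch is an illustrative stand-in for a capability call such as http.get_json(...):

```python
import asyncio

async def fetch() -> int:
    # Stand-in for an async capability call; deterministic for the demo.
    return 7

# Correct: the plan is a function that makes a fresh coroutine on every drive.
plan = lambda: fetch()
first = asyncio.run(plan())
second = asyncio.run(plan())      # replay works: a new coroutine each time
assert (first, second) == (7, 7)

# Incorrect: lifting a single coroutine object breaks replayability.
coro = fetch()
broken = lambda: coro
asyncio.run(broken())             # first drive succeeds
try:
    asyncio.run(broken())         # second drive fails
    replay_error = None
except RuntimeError as exc:
    replay_error = str(exc)       # "cannot reuse already awaited coroutine"
assert replay_error is not None
```

This is exactly the failure mode the Replayability invariant and the async_lift comment guard against.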

4. Reference Implementations (pure domain code)

4.1 Capability protocols (domain/ports.py)

# domain/ports.py
from typing import Protocol

class HttpClientCap(Protocol):
    async def get_json(self, url: str) -> Result[dict, ErrInfo]: ...

class DbCap(Protocol):
    async def save_user(self, user: UserProfile) -> Result[None, ErrInfo]: ...

4.2 Pure description – dependent async steps (domain/user.py)

# Factored version – the canonical, readable style
def _fetch(user_id: str, http: HttpClientCap) -> AsyncPlan[dict]:
    return async_lift(lambda: http.get_json(f"https://api.example.com/users/{user_id}"))

def _validate(raw: dict) -> AsyncPlan[UserProfile]:
    return async_pure(validate_user(raw))

def _save(user: UserProfile, db: DbCap) -> AsyncPlan[UserProfile]:
    return async_bind(
        async_lift(lambda: db.save_user(user)),
        lambda _: async_pure(user),
    )

def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )
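To see why descriptions like this are testable, here is a self-contained sketch that drives a simplified fetch-then-save plan against in-memory fakes. FakeHttp, FakeDb, the minimal Ok/Err classes, and the inlined plumbing are illustrative stand-ins for the capstone's protocols and combinators, not its real API:

```python
import asyncio
from dataclasses import dataclass

# Minimal stand-ins for the capstone's Result types.
@dataclass(frozen=True)
class Ok:
    value: object

@dataclass(frozen=True)
class Err:
    error: object

class FakeHttp:
    """Deterministic in-memory stand-in for HttpClientCap."""
    async def get_json(self, url: str):
        return Ok({"id": url.rsplit("/", 1)[-1], "name": "Ada"})

class FakeDb:
    """Records saves instead of touching a database."""
    def __init__(self) -> None:
        self.saved: list[dict] = []
    async def save_user(self, user: dict):
        self.saved.append(user)
        return Ok(None)

def pipeline(user_id: str, http: FakeHttp, db: FakeDb):
    # Simplified async_user_pipeline: fetch -> save, returned as a description.
    async def _coro():
        res = await http.get_json(f"https://api.example.com/users/{user_id}")
        if isinstance(res, Err):
            return res
        save = await db.save_user(res.value)
        return res if isinstance(save, Ok) else save
    return lambda: _coro()

db = FakeDb()
plan = pipeline("123", FakeHttp(), db)
assert db.saved == []          # building the plan performed no effects
result = asyncio.run(plan())   # test-shell drives the description
assert isinstance(result, Ok)
assert db.saved == [{"id": "123", "name": "Ada"}]
```

Because the plan is a value, the test constructs it, asserts that nothing happened, then drives it once and inspects the fakes.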

4.3 Before → After (the real transformation)

# BEFORE – hidden magic, immediate execution, untestable
async def old_process(user_id: str):
    raw = await http.get_json(f"/users/{user_id}")   # immediate effect
    user = validate_user(raw)
    await db.save_user(user)
    return user

# AFTER – pure description, no execution until shell calls perform_async
def async_user_pipeline(
    user_id: str,
    http: HttpClientCap,
    db: DbCap,
) -> AsyncPlan[UserProfile]:
    return async_bind(
        _fetch(user_id, http),
        lambda raw: async_bind(
            _validate(raw),
            lambda user: _save(user, db),
        ),
    )

Shell usage (only place driving happens):

plan = async_user_pipeline("123", real_http, real_db)
result = await perform_async(plan)   # or asyncio.run(perform_async(plan))

4.4 Real-world RAG example (domain/rag_async.py)

from collections.abc import Iterator
from dataclasses import replace

from funcpipe_rag.domain.effects.async_ import async_gather

def _embed_chunk(
    chunk: ChunkWithoutEmbedding,
    embedder: EmbeddingCap,
) -> AsyncPlan[EmbeddedChunk]:
    return async_bind(
        async_lift(lambda: embedder.embed(chunk.text.content)),
        lambda vector: async_pure(
            replace(chunk, embedding=Embedding(vector, embedder.model_name))
        ),
    )

def async_rag_pipeline(
    docs: Iterator[RawDoc],
    env: RagEnv,
    embedder: EmbeddingCap,
) -> AsyncPlan[list[EmbeddedChunk]]:
    return async_bind(
        async_pure(list(gen_clean_and_chunk(docs, env))),
        lambda chunks: async_map(
            async_gather(  # M08C03 – bounded parallel embedding
                [_embed_chunk(c, embedder) for c in chunks],
                concurrency=env.embedding_concurrency,
            ),
            structural_dedup_chunks,
        ),
    )

5. Property-Based Proofs (all pass in CI)

import pytest
from hypothesis import given, strategies as st

from funcpipe_rag.domain.effects.async_.plan import async_bind, async_lift, async_pure
from funcpipe_rag.infra.adapters.async_runtime import perform_async

@given(x=st.integers())
@pytest.mark.asyncio
async def test_async_bind_laws(x: int):
    f = lambda y: async_pure(y + 1)
    g = lambda z: async_pure(z * 2)

    a = async_pure(x)

    # Associativity
    assert await perform_async(async_bind(async_bind(a, f), g)) == \
           await perform_async(async_bind(a, lambda y: async_bind(f(y), g)))

    # Right identity
    assert await perform_async(async_bind(a, async_pure)) == \
           await perform_async(a)

    # Left identity
    assert await perform_async(async_bind(async_pure(x), f)) == \
           await perform_async(f(x))

@pytest.mark.asyncio
async def test_replayability():
    plan = async_lift(lambda: http_mock.get_json("/test"))  # mock is deterministic
    r1 = await perform_async(plan)
    r2 = await perform_async(plan)  # fresh coroutine each call
    assert r1 == r2
6. Big-O & Allocation Guarantees

| Operation | Time | Python call-stack | Heap allocation until interpretation |
|---|---|---|---|
| async_pure | O(1) | O(1) | O(1) |
| async_bind | O(1) | O(1) | O(1) closure |
| async_lift | O(1) | O(1) | O(1) |
| perform_async | O(chain) | O(chain) | O(chain) closures |

Composition is O(1) per layer — no work until interpretation.
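The "no work until interpretation" guarantee can be observed directly. This self-contained sketch stacks 50 bind-like layers by hand; the closure plumbing is illustrative, not the capstone's async_bind:

```python
import asyncio

effects: list[str] = []

async def _base() -> int:
    effects.append("effect")
    return 0

plan = lambda: _base()

# Stack 50 layers of composition; building the description does no work.
for _ in range(50):
    prev = plan
    async def _layer(p=prev) -> int:   # p=prev freezes the current tail
        return (await p()) + 1
    plan = (lambda f: lambda: f())(_layer)

assert effects == []                   # composition performed zero effects
assert asyncio.run(plan()) == 50       # work happens only at interpretation
assert effects == ["effect"]           # the base effect ran exactly once
```

Each layer is an O(1) closure allocation; the O(chain) cost is paid once, when the shell drives the plan.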

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Top-level await / perform_async / create_task in core | Immediate effects, untestable | Return AsyncPlan, drive only in shell |
| Re-using a single coroutine object with async_lift | Replayability broken | Always pass a lambda that creates a fresh coroutine |
| asyncio.create_task in core | Hidden concurrency, leaks | Use explicit async_gather / bounded queues (later cores) |
| Direct httpx.AsyncClient() in core | Concrete dependency | Inject capability protocol |

8. Pre-Core Quiz

  1. async def returns…? → A coroutine object – a pure description
  2. await inside coroutine bodies is…? → Required and allowed – it is part of the description
  3. Driving a plan (perform_async, asyncio.run, create_task) is…? → Only in shells – never in core
  4. Compose async descriptions with…? → async_bind / async_map / async_lift
  5. Real power comes from…? → Concurrency is explicit data, not magic – exactly like IOPlan

9. Post-Core Exercise

  1. Convert one real I/O-bound pipeline in your codebase to return an AsyncPlan.
  2. Chain ≥3 dependent async steps (preferably factored into small private helpers).
  3. Write a Hypothesis property proving the bind laws for your pipeline.
  4. Run the same AsyncPlan twice in a test and assert identical results (for idempotent cases).
  5. Celebrate – you now have async without magic.

Continue with: Async Generators

You now model async operations as pure, composable descriptions — exactly symmetrical to IOPlan. Your core remains pure, your shell controls execution, and concurrency is finally explicit data, not hidden magic.