
Async Adapters

Concept Position

flowchart TD
  family["Python Programming"] --> program["Python Functional Programming"]
  program --> module["Module 08: Async Pipelines, Backpressure, and Fairness"]
  module --> concept["Async Adapters"]
  concept --> capstone["Capstone pressure point"]

flowchart TD
  problem["Start with the design or failure question"] --> example["Study the worked example and trade-offs"]
  example --> boundary["Name the boundary this page is trying to protect"]
  boundary --> proof["Carry that question into code review or the capstone"]

Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.

Module 08 – Main Track Core

Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. The pure synchronous core is the crown jewel of the entire system — and it must never be polluted by await, event loops, or any knowledge that it is running asynchronously.

Progression Note

Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.

| Module | Focus | Key Outcomes |
| --- | --- | --- |
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability interfaces, resource-safe effect isolation |
| 8 | Async FuncPipe & Backpressure | Async streams, bounded queues, timeouts/retries, fairness & rate limiting |
| 9 | FP Across Libraries and Frameworks | Stdlib FP, data/ML stacks, web/CLI/distributed integration |
| 10 | Refactoring, Performance, and Future-Proofing | Systematic refactors, performance budgets, governance & evolution |

Core question
How do you keep the pure, synchronous FuncPipe core 100 % untouched and instantly unit-testable while still running it efficiently inside a real async, concurrent, backpressure-aware, resilient production pipeline?

We take the pure RAG core we have built since Module 1 and ask the question every scaling team eventually faces:

“Why did my beautiful pure core turn into an untestable async mess full of await, try/except httpx.TimeoutException, and event-loop dependencies?”

The naïve pattern everyone writes first:

# BEFORE – async creep destroys the core
async def rag_god_async(path: str) -> list[EmbeddedChunk]:
    async with aiofiles.open(path) as f:
        lines = [l async for l in f]
    docs = [parse_sync(l) for l in lines]
    cleaned = [await clean_doc_async(d) for d in docs]      # await in core
    chunks = [c for d in cleaned for c in chunk_sync(d)]
    embeddings = await asyncio.gather(*[model.aencode(c.text) for c in chunks])
    ...

Async everywhere, core untestable without an event loop, impossible to reason about mathematically.

The production pattern: the core is 100 % synchronous and pure (all domain functions return Result). Async edges return descriptions. Lifts turn sync Result-returning functions into async descriptions. CPU-bound core steps run in a thread pool. The shell drives everything.

# AFTER – pure sync core + thin async shell
def pure_rag_core(
    raw_lines: list[str],
    env: RagEnv,
) -> Result[list[EmbeddedChunk], ErrInfo]:
    return (
        Ok(raw_lines)
        .and_then(lambda lines: result_traverse(lines, parse_line_sync))
        .and_then(lambda docs: result_traverse(docs, lambda d: clean_doc_sync(d, env)))
        .and_then(lambda docs: result_traverse(docs, lambda d: chunk_doc_sync(d, env)))   # list[list[Chunk]]
        .and_then(lambda chunk_lists: Ok([chunk for chunks in chunk_lists for chunk in chunks]))  # explicit flatten
        .and_then(lambda chunks: result_traverse(chunks, prepare_embed_sync))
        .and_then(lambda reqs: result_traverse(reqs, embed_sync))  # fake sync embed for core tests
    )

def async_rag_description(
    storage: StoragePort,
    path: str,
    env: RagEnv,
    executor: Executor,
) -> AsyncGen[EmbeddedChunk]:
    raw_lines   = async_fetch_raw_lines(storage, path)
    parsed      = async_gen_map(raw_lines, lift_sync(parse_line_sync))
    cleaned     = async_gen_map(parsed, lambda doc: lift_sync_with_executor(clean_doc_sync, executor)(doc, env))
    chunked     = async_gen_flat_map(cleaned, lambda doc: lift_sync_gen_with_executor(chunk_doc_sync, executor)(doc, env))
    prepared    = async_gen_map(chunked, lift_sync_with_executor(prepare_embed_sync, executor))
    embedded    = async_gen_map(prepared, async_embed_request)
    return async_gen_rate_limited(async_gen_fair_merge([embedded]), rate_policy)

The core stays pure sync forever. The shell stays thin forever. Adding concurrency/retries/rate-limiting/fairness is a one-line policy change in the description.

Audience: Engineers who built a gorgeous pure core and then watched async creep destroy it.

Outcomes

  1. The pure core is 100 % synchronous and instantly unit-testable (no event loop needed).
  2. All async, I/O, concurrency, retries, rate limiting, and fairness live exclusively in thin adapters and combinators.
  3. CPU-bound pure code runs efficiently without blocking the event loop.
  4. The entire pipeline is still a pure description — composable, inspectable, and mathematically lawful.

Tiny Non-Domain Example – Pure Core + Async HTTP

# Pure sync core (binary, Result-returning)
def enrich_user_sync(user: User, posts: list[Post]) -> Result[UserProfile, ErrInfo]:
    top_posts = [p for p in posts if p.score > 50]
    return Ok(UserProfile(user.id, user.name, top_posts))

# Async edges
def async_fetch_user(id: int) -> AsyncAction[User]: ...
def async_fetch_posts(user_id: int) -> AsyncGen[Post]: ...

# Integration
def user_profile_description(user_id: int, executor: Executor) -> AsyncAction[UserProfile]:
    user  = async_fetch_user(user_id)
    posts = async_fetch_posts(user_id)
    enrich = lift_sync_with_executor(enrich_user_sync, executor)
    return async_bind(
        async_tuple(user, async_collect(posts)),
        lambda user_posts: enrich(user_posts[0], user_posts[1]),
    )

Core unchanged. Async completely isolated.
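Because the core is pure and synchronous, it can be exercised with plain assertions and no event loop. A minimal sketch, using hypothetical stand-in dataclasses for User, Post, UserProfile, and Ok (the real types live in the repo):

```python
from dataclasses import dataclass

# Hypothetical stand-ins for the repo's domain types — just enough to run a test.
@dataclass(frozen=True)
class User:
    id: int
    name: str

@dataclass(frozen=True)
class Post:
    score: int

@dataclass(frozen=True)
class UserProfile:
    user_id: int
    name: str
    top_posts: list

@dataclass(frozen=True)
class Ok:
    value: object

def enrich_user_sync(user: User, posts: list[Post]) -> Ok:
    # Same body as the example above: keep only high-scoring posts.
    top_posts = [p for p in posts if p.score > 50]
    return Ok(UserProfile(user.id, user.name, top_posts))

# Plain, synchronous assertions — no @pytest.mark.asyncio, no fakes.
result = enrich_user_sync(User(1, "Ada"), [Post(10), Post(90), Post(60)])
assert result == Ok(UserProfile(1, "Ada", [Post(90), Post(60)]))
```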

Why Keep the Core 100 % Synchronous and Pure? (Three bullets every engineer should internalise)

  • Instant unit tests: No event loop, no fakes, no @pytest.mark.asyncio — just pure functions, Hypothesis, done.
  • Mathematical reasoning: Monad laws, equational reasoning, referential transparency — everything from Module 6 still applies exactly.
  • CPU efficiency: Heavy pure transforms run in ThreadPoolExecutor without starving the event loop.

1. Laws & Invariants (machine-checked)

| Law | Statement | Enforcement |
| --- | --- | --- |
| Core Purity | Every function under domain/core/ is synchronous and pure (no await, no I/O) | mypy + CI grep forbid |
| Lift Equivalence | For any pure, non-raising Result-returning f: lift_sync(f)(*args)() ≡ async_from_result(f(*args))() (logical time) | Hypothesis + fake executor |
| Description Purity | All lifts and combinators return thunks that create fresh coroutines | Static analysis |
| Error Propagation | Exceptions in lifted sync functions → Err(ErrInfo.from_exc(e)) | Property tests |
| No Async in Core | Zero async def or await under domain/core/ | CI forbid list |

async_from_result(r: Result[T, ErrInfo]) -> AsyncAction[T] is the canonical constructor that simply wraps an existing Result in the async layer (no computation).
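The Lift Equivalence law can be spot-checked in a few lines. A sketch with toy implementations (the real lift_sync and async_from_result live in the repo; double is a hypothetical non-raising function):

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable

@dataclass(frozen=True)
class Ok:
    value: Any

def async_from_result(r: Ok) -> Callable[[], Any]:
    # Wrap an existing Result in the async layer — no computation.
    async def _act() -> Ok:
        return r
    return lambda: _act()

def lift_sync(f: Callable[..., Ok]) -> Callable[..., Callable[[], Any]]:
    # Add the async layer around a sync Result-returning function.
    def lifted(*args: Any) -> Callable[[], Any]:
        async def _act() -> Ok:
            return f(*args)
        return lambda: _act()
    return lifted

def double(x: int) -> Ok:
    return Ok(x * 2)

# The law: lift_sync(f)(*args)() ≡ async_from_result(f(*args))()
# Hypothesis would generate the inputs; here we spot-check a few.
for x in [0, 1, -5, 10_000]:
    assert asyncio.run(lift_sync(double)(x)()) == asyncio.run(async_from_result(double(x))())
```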

2. Decision Table – Where Does Code Belong?

| Code Type | Synchronous? | Contains I/O? | Layer | Reason |
| --- | --- | --- | --- | --- |
| Business logic, parsing, chunking, validation | Yes | No | Pure Core | Testable, composable, mathematical |
| HTTP, DB, file reads | No | Yes | Async Adapter | Real concurrency, backpressure |
| CPU-heavy pure transforms | Yes | No | Core + Executor | Prevent event loop blocking |
| Orchestration, policies | No | No | Description | Pure async wiring |

If it can be sync and pure → it must be in the core.

3. Public API – Canonical Types & Lifts

# Repo implementation lives in:
# - funcpipe_rag/domain/effects/async_/plan.py  (AsyncPlan / AsyncAction)
# - funcpipe_rag/domain/effects/async_/stream.py (AsyncGen + chunking + fan-in)
# - funcpipe_rag/domain/effects/async_/lifts.py (lift_sync* helpers)

# Recap from earlier cores
AsyncAction[T] = Callable[[], Awaitable[Result[T, ErrInfo]]]   # thunk → fresh coroutine
AsyncGen[T]    = Callable[[], AsyncIterator[Result[T, ErrInfo]]]

# Domain/core functions return Result; Async adds the second layer (time/async)
def async_from_result(r: Result[T, ErrInfo]) -> AsyncAction[T]:
    async def _act() -> Result[T, ErrInfo]:
        return r
    return lambda: _act()

# Lifts accept Result-returning functions and simply add the async layer
def lift_sync(
    f: Callable[..., Result[T, ErrInfo]]
) -> Callable[..., AsyncAction[T]]:
    def lifted(*args: Any, **kwargs: Any) -> AsyncAction[T]:
        async def _act() -> Result[T, ErrInfo]:
            try:
                return f(*args, **kwargs)
            except Exception as e:
                return Err(ErrInfo.from_exc(e))
        return lambda: _act()
    return lifted

def lift_sync_with_executor(
    f: Callable[..., Result[T, ErrInfo]],
    executor: Executor,
) -> Callable[..., AsyncAction[T]]:
    def lifted(*args: Any, **kwargs: Any) -> AsyncAction[T]:
        async def _act() -> Result[T, ErrInfo]:
            loop = asyncio.get_running_loop()
            try:
                return await loop.run_in_executor(executor, lambda: f(*args, **kwargs))
            except Exception as e:
                return Err(ErrInfo.from_exc(e))
        return lambda: _act()
    return lifted

def lift_sync_gen_with_executor(
    f: Callable[..., Result[list[T], ErrInfo]],
    executor: Executor,
) -> Callable[..., AsyncGen[T]]:
    def lifted(*args: Any, **kwargs: Any) -> AsyncGen[T]:
        async def _gen() -> AsyncIterator[Result[T, ErrInfo]]:
            loop = asyncio.get_running_loop()
            try:
                res = await loop.run_in_executor(executor, lambda: f(*args, **kwargs))
                if isinstance(res, Ok):
                    for item in res.value:
                        yield Ok(item)
                else:
                    yield res
            except Exception as e:
                yield Err(ErrInfo.from_exc(e))
        return lambda: _gen()
    return lifted
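A usage sketch for lift_sync, with toy Ok/Err/ErrInfo stand-ins, showing both the success path and the Error Propagation law (exceptions become Err); parse_int_sync is a hypothetical example function:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable

# Toy stand-ins for the repo's Result/ErrInfo types.
@dataclass(frozen=True)
class Ok:
    value: Any

@dataclass(frozen=True)
class ErrInfo:
    message: str

    @classmethod
    def from_exc(cls, e: Exception) -> "ErrInfo":
        return cls(f"{type(e).__name__}: {e}")

@dataclass(frozen=True)
class Err:
    error: ErrInfo

def lift_sync(f: Callable[..., Any]) -> Callable[..., Callable[[], Any]]:
    # Same shape as above: a thunk that builds a fresh coroutine per call.
    def lifted(*args: Any, **kwargs: Any) -> Callable[[], Any]:
        async def _act() -> Any:
            try:
                return f(*args, **kwargs)
            except Exception as e:
                return Err(ErrInfo.from_exc(e))
        return lambda: _act()
    return lifted

def parse_int_sync(s: str) -> Ok:
    return Ok(int(s))  # raises ValueError on bad input; the lift catches it

assert asyncio.run(lift_sync(parse_int_sync)("42")()) == Ok(42)            # success
assert isinstance(asyncio.run(lift_sync(parse_int_sync)("nope")()), Err)   # → Err
```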

4. Before → After – Pure Core RAG Pipeline

In the core, the chunking function has the explicit domain type:

chunk_doc_sync: Callable[[Doc, RagEnv], Result[list[Chunk], ErrInfo]]
# AFTER – pure sync core + thin async shell
def pure_rag_core(
    raw_lines: list[str],
    env: RagEnv,
) -> Result[list[EmbeddedChunk], ErrInfo]:
    return (
        Ok(raw_lines)
        .and_then(lambda lines: result_traverse(lines, parse_line_sync))
        .and_then(lambda docs: result_traverse(docs, lambda d: clean_doc_sync(d, env)))
        .and_then(lambda docs: result_traverse(docs, lambda d: chunk_doc_sync(d, env)))   # list[list[Chunk]]
        .and_then(lambda chunk_lists: Ok([chunk for chunks in chunk_lists for chunk in chunks]))  # explicit flatten
        .and_then(lambda chunks: result_traverse(chunks, prepare_embed_sync))
        .and_then(lambda reqs: result_traverse(reqs, embed_sync))  # fake sync embed for core tests
    )

def async_rag_description(
    storage: StoragePort,
    path: str,
    env: RagEnv,
    executor: Executor,
) -> AsyncGen[EmbeddedChunk]:
    raw_lines   = async_fetch_raw_lines(storage, path)
    parsed      = async_gen_map(raw_lines, lift_sync(parse_line_sync))
    cleaned     = async_gen_map(parsed, lambda doc: lift_sync_with_executor(clean_doc_sync, executor)(doc, env))
    chunked     = async_gen_flat_map(cleaned, lambda doc: lift_sync_gen_with_executor(chunk_doc_sync, executor)(doc, env))
    prepared    = async_gen_map(chunked, lift_sync_with_executor(prepare_embed_sync, executor))
    embedded    = async_gen_map(prepared, async_embed_request)
    return async_gen_rate_limited(async_gen_fair_merge([embedded]), rate_policy)

5. Property-Based Proofs (all pass in CI)

@given(raw_lines=st.lists(st.text(), max_size=100), env=st.from_type(RagEnv))
def test_pure_core_deterministic(raw_lines, env):
    res1 = pure_rag_core(raw_lines, env)
    res2 = pure_rag_core(raw_lines, env)
    assert res1 == res2

@pytest.mark.asyncio
@given(raw_lines=st.lists(st.text(), max_size=50))
async def test_layered_pipeline_equivalence(raw_lines):
    env = RagEnv(chunk_size=512)
    mock_storage = MockStorage("\n".join(raw_lines))

    # Expected from pure core (using fake sync embed)
    # In this test, async_embed_request is wired to the same deterministic fake as embed_sync
    expected = pure_rag_core(raw_lines, env)

    with ThreadPoolExecutor() as executor:
        desc = async_rag_description(mock_storage, "path", env, executor)
        results: list[Result[EmbeddedChunk, ErrInfo]] = [r async for r in desc()]

    # Compare success values (ignore ordering if needed)
    if isinstance(expected, Ok):
        actual_values = [r.value for r in results if isinstance(r, Ok)]
        assert sorted(actual_values, key=lambda c: c.id) == sorted(expected.value, key=lambda c: c.id)
    else:
        assert any(isinstance(r, Err) for r in results)

6. Runtime Guarantees

| Component | Synchronous? | Blocks Event Loop? | Memory | Notes |
| --- | --- | --- | --- | --- |
| Pure Core | Yes | Never | O(input) | Instant unit tests |
| Async Adapters | No | No | O(1) | Real I/O |
| Lifted CPU steps | Yes (thread) | No | O(1) | ThreadPoolExecutor prevents blocking |
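The "Lifted CPU steps" row can be demonstrated directly: a heartbeat task keeps ticking while a CPU-heavy function runs via run_in_executor. A self-contained sketch (cpu_heavy is a hypothetical stand-in for a heavy pure transform):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_heavy() -> int:
    # Stand-in for a heavy pure transform: busy-work for ~0.2 s.
    deadline = time.perf_counter() + 0.2
    n = 0
    while time.perf_counter() < deadline:
        n += 1
    return n

async def main() -> int:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.02)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    with ThreadPoolExecutor() as executor:
        # The heavy work runs in a worker thread; the loop stays free.
        await loop.run_in_executor(executor, cpu_heavy)
    hb.cancel()
    return ticks

# The heartbeat fired while cpu_heavy ran — the event loop was never starved.
assert asyncio.run(main()) > 0
```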

7. Anti-Patterns & Immediate Fixes

| Anti-Pattern | Symptom | Fix |
| --- | --- | --- |
| await or async def in core | Impossible sync tests | Move to adapters, use lifts |
| Blocking calls in async code | Starved event loop | lift_sync_with_executor |
| Mixed sync/async in same file | Confusion, leaks | Strict layer separation |
| Direct I/O in core | Untestable, impure | Ports + adapters only |

8. Pre-Core Quiz

  1. The pure core is…? → 100 % synchronous and pure
  2. CPU-bound pure code runs in…? → ThreadPoolExecutor via lifts
  3. Async lives in…? → Thin adapters and combinators only
  4. Integration is done via…? → Lifts that turn sync Result-returning functions → async descriptions
  5. The golden rule? → Core never knows it’s running async

9. Post-Core Exercise

  1. Extract your real pipeline’s business logic into pure sync functions returning Result.
  2. Write the async adapters for your real I/O.
  3. Lift everything and compose the description.
  4. Add the equivalence property test between pure core and layered pipeline.
  5. Delete every await that was previously in your core. Celebrate.

Continue with: Async Service Integrations

You now have a pure, synchronous, instantly testable core running efficiently inside a full async, backpressure-safe, resilient, rate-limited, fair pipeline — without a single line of async in the core.

M08C07 is now frozen.