Async Service Integrations¶
Page Maps¶
graph LR
    family["Python Programming"]
    program["Python Functional Programming"]
    section["Async Pipelines Backpressure Fairness"]
    page["Async Service Integrations"]
    capstone["Capstone evidence"]
    family --> program --> section --> page
    page -.applies in.-> capstone

flowchart LR
    orient["Orient on the page map"] --> read["Read the main claim and examples"]
    read --> inspect["Inspect the related code, proof, or capstone surface"]
    inspect --> verify["Run or review the verification path"]
    verify --> apply["Apply the idea back to the module and capstone"]
Read the first diagram as a placement map: this page is one concept inside its parent module, not a detached essay, and the capstone is the pressure test for whether the idea holds. Read the second diagram as the working rhythm for the page: name the problem, study the example, identify the boundary, then carry one review question forward.
Module 08 – Main Track Core
Main track: Cores 1–10 (Async / Concurrent Pipelines → Production).
This is a required core. Every real-world FuncPipe pipeline talks to external services — and those services must be wrapped in thin, pure-protocol-conforming async adapters that are completely swappable, mockable, and composable with all the resilience machinery we built in C04–C06.
Progression Note¶
Module 8 is Async FuncPipe & Backpressure — the lightweight, production-grade concurrency layer that sits directly on top of Module 7’s effect boundaries.
| Module | Focus | Key Outcomes |
|---|---|---|
| 7 | Effect Boundaries & Resource Safety | Ports & adapters, capability interfaces, resource-safe effect isolation |
| 8 | Async FuncPipe & Backpressure | Async streams, bounded queues, timeouts/retries, fairness & rate limiting |
| 9 | FP Across Libraries and Frameworks | Stdlib FP, data/ML stacks, web/CLI/distributed integration |
| 10 | Refactoring, Performance, and Future-Proofing | Systematic refactors, performance budgets, governance & evolution |
Core question
How do you turn flaky, imperative, library-specific service calls (HTTP APIs, databases, GPU queues) into thin, pure-protocol-conforming async adapters that are completely isolated, idempotent where possible, and trivially composable with retries, timeouts, rate-limiting, and fairness, while keeping the core 100% unaware of the concrete service?
We take the layered RAG pipeline from C07 and finally connect it to the real world: OpenAI/Anthropic/Cohere embedding APIs, Pinecone/Weaviate/Qdrant/Chroma/PostgreSQL+pgvector — without ever letting a single httpx.AsyncClient or asyncpg.Pool touch the pure core.
The naïve pattern everyone writes first:
# BEFORE – service cancer everywhere
async def embed_chunks(chunks: list[Chunk]) -> list[EmbeddedChunk]:
    async with httpx.AsyncClient() as client:  # concrete library
        resp = await client.post(
            "https://api.openai.com/v1/embeddings",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"model": "text-embedding-3-large", "input": [c.text for c in chunks]},
            timeout=30.0,
        )
    resp.raise_for_status()  # untyped exception
    data = resp.json()["data"]
    return [replace(c, embedding=Embedding(vec["embedding"], "openai")) for c, vec in zip(chunks, data)]
Concrete library, hardcoded timeout, untyped JSON parsing, no retry classification, no idempotence, impossible to mock cleanly.
The production pattern: pure protocol → factory that returns an instance of an adapter class implementing the protocol → adapter methods return pure async descriptions (thunks) → all resilience (retry/timeout/rate-limit/fairness) applied as data in the shell or via higher-order combinators.
# AFTER – pure protocol + thin adapter class
@runtime_checkable
class EmbedPort(Protocol):
    # Outer Result: request-level failure (network/auth/rate-limit/transient).
    # Inner list[Result]: per-text failures, for providers that report them.
    def embed_batch(self, texts: list[str]) -> AsyncAction[Result[list[Result[Embedding, ErrInfo]], ErrInfo]]: ...

def make_openai_embed_adapter(client: AsyncClient, model: str) -> EmbedPort:
    class _Adapter:
        def embed_batch(self, texts: list[str]) -> AsyncAction[Result[list[Result[Embedding, ErrInfo]], ErrInfo]]:
            async def _act() -> Result[list[Result[Embedding, ErrInfo]], ErrInfo]:
                if not texts:
                    return Ok([])
                try:
                    resp = await client.post(
                        "/v1/embeddings",
                        json={"model": model, "input": texts},
                    )
                    if resp.status_code == 429:
                        return Err(ErrInfo(code="RATE_LIMIT", msg="rate limited", meta={"retry_after": resp.headers.get("retry-after")}))
                    if resp.status_code == 401:
                        return Err(ErrInfo(code="AUTH", msg="invalid api key"))
                    if 500 <= resp.status_code < 600:
                        return Err(ErrInfo(code="TRANSIENT", msg="server error"))
                    resp.raise_for_status()
                    data = resp.json()["data"]
                    return Ok([Ok(Embedding(vec["embedding"], model)) for vec in data])
                except TimeoutException:
                    return Err(ErrInfo(code="TIMEOUT", msg="request timeout"))
                except RequestError as e:
                    return Err(ErrInfo(code="NETWORK", msg=str(e)))
                except Exception as e:  # JSON parse, unexpected shape, etc.
                    return Err(ErrInfo(code="SERVICE_SPECIFIC", msg=str(e)))
            return lambda: _act()
    return _Adapter()
One factory change → completely different provider. Zero core changes. Full resilience via policies applied outside.
Use this when “the embedding API changed its JSON shape again” is the kind of failure you need to contain before it wakes anyone up.
Outcome
1. Every external service wrapped in a pure protocol + thin adapter factory returning a class.
2. All resilience (retry/timeout/rate-limit/fairness) applied as pure data outside the adapter.
3. Idempotent writes via UPSERT + stable deterministic IDs (derived from content hash).
4. Full deterministic testing via sync mock implementations of the protocols.
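Outcome 3 depends on IDs that are a pure function of the chunk content. A minimal sketch of one way to derive them, assuming chunks carry a text and a flat string metadata dict; `stable_chunk_id` is a hypothetical helper, not part of the FuncPipe API:

```python
import hashlib
import json


def stable_chunk_id(text: str, metadata: dict[str, str]) -> str:
    """Derive a deterministic ID from chunk content (hypothetical helper)."""
    # Canonical JSON: sorted keys and fixed separators, so equal content
    # always serialises to the same bytes regardless of dict ordering.
    payload = json.dumps(
        {"text": text, "metadata": metadata},
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same content, different key order: same ID.
id_a = stable_chunk_id("hello world", {"source": "doc.md", "page": "1"})
id_b = stable_chunk_id("hello world", {"page": "1", "source": "doc.md"})
# Different content: different ID.
id_c = stable_chunk_id("hello world!", {"source": "doc.md", "page": "1"})
```

Because the ID never depends on wall-clock time or a random source, a retried upsert targets the same row instead of creating a duplicate.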
Tiny Non-Domain Example – HTTP Weather Adapter¶
@runtime_checkable
class WeatherPort(Protocol):
    # Single-item domain value; request-level Result only
    def get_forecast(self, city: str) -> AsyncAction[Result[Forecast, ErrInfo]]: ...

def make_openweather_adapter(client: AsyncClient, api_key: str) -> WeatherPort:
    class _Adapter:
        def get_forecast(self, city: str) -> AsyncAction[Result[Forecast, ErrInfo]]:
            async def _act() -> Result[Forecast, ErrInfo]:
                try:
                    resp = await client.get(
                        "https://api.openweathermap.org/data/2.5/forecast",
                        params={"q": city, "appid": api_key},  # httpx URL-encodes the values
                    )
                    if resp.status_code == 401:
                        return Err(ErrInfo(code="AUTH", msg="invalid key"))
                    if resp.status_code == 429:
                        return Err(ErrInfo(code="RATE_LIMIT", msg="too many requests"))
                    resp.raise_for_status()
                    return Ok(parse_forecast(resp.json()))
                except TimeoutException:
                    return Err(ErrInfo(code="TIMEOUT", msg="request timeout"))
                except RequestError as e:
                    return Err(ErrInfo(code="NETWORK", msg=str(e)))
                except Exception as e:
                    return Err(ErrInfo(code="SERVICE_SPECIFIC", msg=str(e)))
            return lambda: _act()
    return _Adapter()
Swap in a mock → instant sync tests.
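To make the swap concrete, here is a minimal sketch of an in-memory fake that satisfies the same method shape. The `Ok`, `Err`, `ErrInfo`, and `Forecast` definitions below are simplified stand-ins written for this example, and `make_fake_weather_adapter` is a hypothetical name; the real FuncPipe types may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Generic, TypeVar, Union

T = TypeVar("T")
E = TypeVar("E")


# Simplified stand-ins for the course's Result types (assumption).
@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


@dataclass(frozen=True)
class ErrInfo:
    code: str
    msg: str


@dataclass(frozen=True)
class Forecast:
    city: str
    temp_c: float


ForecastResult = Union[Ok[Forecast], Err[ErrInfo]]
ForecastAction = Callable[[], Awaitable[ForecastResult]]


def make_fake_weather_adapter(table: dict[str, float]):
    """Fake adapter backed by a dict: no network, no client."""

    class _Fake:
        def get_forecast(self, city: str) -> ForecastAction:
            async def _act() -> ForecastResult:
                if city not in table:
                    return Err(ErrInfo(code="SERVICE_SPECIFIC", msg=f"unknown city: {city}"))
                return Ok(Forecast(city=city, temp_c=table[city]))

            # Same contract as the real adapter: return a thunk, run nothing yet.
            return lambda: _act()

    return _Fake()


fake = make_fake_weather_adapter({"Oslo": 4.5})
hit = asyncio.run(fake.get_forecast("Oslo")())
miss = asyncio.run(fake.get_forecast("Atlantis")())
```

Any code written against `WeatherPort` can take `fake` in place of the OpenWeather adapter, and the failure path is exercised by data rather than by a live outage.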
Why Pure Protocols + Thin Async Adapter Classes? (Three bullets every engineer should internalise)¶
- Zero vendor lock-in: Swap OpenAI ↔ Cohere ↔ local Ollama with a one-line factory change.
- Strong testability: Implement the protocol synchronously with fakes so most tests stay out of the event loop.
- Full resilience composability: All policies (retry/timeout/rate-limit/fairness) applied uniformly outside the adapter.
Adapter methods must never be async def — they always return an AsyncAction thunk (fresh coroutine factory). This is the law that keeps us pure.
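The difference between a thunk and a bare coroutine is easy to demonstrate with the standard library alone. The sketch below uses hypothetical names (`fetch_action`, `_fetch`) and a list as a stand-in for observable side effects:

```python
import asyncio
from typing import Awaitable, Callable

calls: list[str] = []  # records every time the "effect" actually runs


async def _fetch(label: str) -> str:
    calls.append(label)
    return label.upper()


def fetch_action(label: str) -> Callable[[], Awaitable[str]]:
    # Plain def: calling this performs no I/O and creates no coroutine.
    return lambda: _fetch(label)


action = fetch_action("a")
calls_after_build = list(calls)  # nothing has run yet


async def drive_twice() -> tuple[str, str]:
    # Each call to the thunk builds a fresh coroutine, so it can be re-driven.
    first = await action()
    second = await action()
    return first, second


results = asyncio.run(drive_twice())


async def await_coroutine_twice() -> str:
    coro = _fetch("b")
    await coro
    try:
        await coro  # a coroutine object is single-use
    except RuntimeError as exc:
        return str(exc)
    return "no error"


reuse_error = asyncio.run(await_coroutine_twice())
```

Retry combinators depend on exactly this property: they need to start the same operation again, which a single-use coroutine object cannot do.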
1. Laws & Invariants (machine-checked)¶
| Law | Statement | Enforcement |
|---|---|---|
| Adapter Purity | Adapter methods return thunks (lambda: ...), never executed on call; no effects on construction | Static analysis |
| Error Taxonomy | Every external failure maps to one of: AUTH, RATE_LIMIT, TRANSIENT, TIMEOUT, NETWORK, SERVICE_SPECIFIC, DB_ERROR, FATAL_DB | Exhaustive match + catch-all |
| Idempotence (writes) | UPSERT + deterministic stable ID → repeated identical calls are no-ops | Property tests with mock DB |
| Resource Safety | Short-lived resources (connections, transactions) acquired via async with; long-lived clients owned by shell | Contextlib + cancellation tests |
| Mock Equivalence | Real adapter ≡ mock adapter on golden inputs (with fake clock/RNG) | Hypothesis equivalence |
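The Error Taxonomy law can be kept honest by routing every HTTP status through one small classifier. This is a sketch mirroring the status checks used by the adapters on this page. The choice of which codes count as retryable is an assumption for illustration, and `classify_http_status` and `is_retryable` are hypothetical names.

```python
from typing import Optional

# Assumption: these four codes are worth retrying; AUTH and
# SERVICE_SPECIFIC are not, because repeating the call cannot fix them.
RETRYABLE_CODES = frozenset({"RATE_LIMIT", "TRANSIENT", "TIMEOUT", "NETWORK"})


def classify_http_status(status: int) -> Optional[str]:
    """Map an HTTP status to a taxonomy code, or None for success."""
    if status == 401:
        return "AUTH"
    if status == 429:
        return "RATE_LIMIT"
    if 500 <= status < 600:
        return "TRANSIENT"
    if status >= 400:
        return "SERVICE_SPECIFIC"  # catch-all for remaining client errors
    return None


def is_retryable(code: str) -> bool:
    return code in RETRYABLE_CODES


classified = {s: classify_http_status(s) for s in (200, 401, 404, 429, 503)}
```

Keeping the mapping in one pure function means the catch-all branch is testable on its own, without a client or an event loop.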
2. Decision Table – Adapter Design Choices¶
| Service Type | Idempotent? | Batch API? | Per-item Results? | Recommended Return Type |
|---|---|---|---|---|
| Embedding HTTP | Yes | Yes | Rarely needed | AsyncAction[Result[list[Result[Embedding, ErrInfo]], ErrInfo]] |
| Vector DB upsert | Yes | Yes | No | AsyncAction[Result[None, ErrInfo]] |
| Vector DB query | Yes | N/A | No | AsyncAction[Result[list[EmbeddedChunk], ErrInfo]] |
| Non-idempotent POST | No | Careful | — | Disable retry or make idempotent via token |
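For the last row, "idempotent via token" usually means sending a key that the server uses to deduplicate retries. A minimal sketch, assuming the server honours such a key; `idempotency_key` and `FakeOrderService` are hypothetical and exist only to show the mechanism:

```python
import hashlib
import json


def idempotency_key(operation: str, payload: dict) -> str:
    """Deterministic key: same operation + same payload gives the same key."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{operation}:{body}".encode("utf-8")).hexdigest()


class FakeOrderService:
    """In-memory stand-in for a server that deduplicates by key."""

    def __init__(self) -> None:
        self.created: list[dict] = []
        self._seen: dict[str, int] = {}

    def create(self, key: str, payload: dict) -> int:
        if key in self._seen:
            # Retry of a request already applied: return the stored result.
            return self._seen[key]
        self.created.append(payload)
        self._seen[key] = len(self.created)
        return self._seen[key]


service = FakeOrderService()
order = {"order_id": "o-1", "amount_cents": 500}
key = idempotency_key("create_order", order)
first = service.create(key, order)
retry = service.create(key, order)  # simulated retry after a timeout
```

If the service offers no such mechanism, the table's other option applies: leave retries disabled for that call.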
3. Public API – Protocols & Adapter Factories¶
from __future__ import annotations
from typing import Protocol, runtime_checkable
from collections.abc import AsyncGenerator
from httpx import AsyncClient, TimeoutException, RequestError
from asyncpg import Pool, PostgresError

AsyncGen = AsyncGenerator

@runtime_checkable
class EmbedPort(Protocol):
    def embed_batch(self, texts: list[str]) -> AsyncAction[Result[list[Result[Embedding, ErrInfo]], ErrInfo]]: ...
    # Outer Result: request-level failure (network/auth/rate-limit/transient)
    # Inner list[Result]: per-text failures (provider-specific, rare but possible)

@runtime_checkable
class VectorStorePort(Protocol):
    def upsert(self, chunks: list[EmbeddedChunk]) -> AsyncAction[Result[None, ErrInfo]]: ...
    def query(self, embedding: EmbeddingVector, top_k: int) -> AsyncAction[Result[list[EmbeddedChunk], ErrInfo]]: ...

# HTTP factory – returns adapter class instance
def make_openai_embed_adapter(
    client: AsyncClient,
    model: str = "text-embedding-3-large",
) -> EmbedPort:
    class _Adapter:
        def embed_batch(self, texts: list[str]) -> AsyncAction[Result[list[Result[Embedding, ErrInfo]], ErrInfo]]:
            async def _act() -> Result[list[Result[Embedding, ErrInfo]], ErrInfo]:
                if not texts:
                    return Ok([])
                try:
                    resp = await client.post(
                        "/v1/embeddings",
                        json={"model": model, "input": texts},
                    )
                    if resp.status_code == 429:
                        return Err(ErrInfo(code="RATE_LIMIT", msg="rate limited", meta={"retry_after": resp.headers.get("retry-after")}))
                    if resp.status_code == 401:
                        return Err(ErrInfo(code="AUTH", msg="invalid api key"))
                    if 500 <= resp.status_code < 600:
                        return Err(ErrInfo(code="TRANSIENT", msg="server error"))
                    resp.raise_for_status()
                    data = resp.json()["data"]
                    return Ok([Ok(Embedding(vec["embedding"], model)) for vec in data])
                except TimeoutException:
                    return Err(ErrInfo(code="TIMEOUT", msg="request timeout"))
                except RequestError as e:
                    return Err(ErrInfo(code="NETWORK", msg=str(e)))
                except Exception as e:  # JSON parse, unexpected shape, etc.
                    return Err(ErrInfo(code="SERVICE_SPECIFIC", msg=str(e)))
            return lambda: _act()
    return _Adapter()
# DB factory – returns adapter class instance
def make_pgvector_adapter(pool: Pool) -> VectorStorePort:
    class _Adapter:
        def upsert(self, chunks: list[EmbeddedChunk]) -> AsyncAction[Result[None, ErrInfo]]:
            async def _act() -> Result[None, ErrInfo]:
                if not chunks:
                    return Ok(None)
                try:
                    async with pool.acquire() as conn:
                        async with conn.transaction():
                            await conn.executemany(
                                """INSERT INTO chunks (id, embedding, metadata)
                                   VALUES ($1, $2, $3)
                                   ON CONFLICT (id) DO UPDATE
                                   SET embedding = EXCLUDED.embedding,
                                       metadata = EXCLUDED.metadata""",
                                [(c.id, c.embedding.vector, c.metadata) for c in chunks],
                            )
                    return Ok(None)
                except PostgresError as e:
                    # 08xxx = connection exception, 40xxx = transaction rollback (usually transient)
                    is_transient = bool(e.sqlstate and e.sqlstate.startswith(("08", "40")))
                    code = "TRANSIENT" if is_transient else "DB_ERROR"
                    return Err(ErrInfo(code=code, msg=str(e), meta={"sqlstate": e.sqlstate}))
                except Exception as e:
                    return Err(ErrInfo(code="FATAL_DB", msg=str(e)))
            return lambda: _act()

        def query(self, embedding: EmbeddingVector, top_k: int) -> AsyncAction[Result[list[EmbeddedChunk], ErrInfo]]:
            async def _act() -> Result[list[EmbeddedChunk], ErrInfo]:
                try:
                    async with pool.acquire() as conn:
                        rows = await conn.fetch(
                            """SELECT id, embedding, metadata
                               FROM chunks
                               ORDER BY embedding <-> $1
                               LIMIT $2""",
                            embedding, top_k,
                        )
                    return Ok([EmbeddedChunk.from_row(row) for row in rows])
                except PostgresError as e:
                    is_transient = bool(e.sqlstate and e.sqlstate.startswith(("08", "40")))
                    code = "TRANSIENT" if is_transient else "DB_ERROR"
                    return Err(ErrInfo(code=code, msg=str(e), meta={"sqlstate": e.sqlstate}))
                except Exception as e:
                    return Err(ErrInfo(code="FATAL_DB", msg=str(e)))
            return lambda: _act()
    return _Adapter()
4. Before → After – RAG with Real Services¶
# BEFORE – concrete services everywhere
async def rag_with_openai_and_pg(chunks: list[Chunk]):
    resp = await httpx.post(...)  # scattered
    # ... parse, handle 429 manually
    await pool.executemany(...)  # different client

# AFTER – pure protocols only
def rag_with_services(
    embed_port: EmbedPort,
    vector_port: VectorStorePort,
) -> AsyncGen[None]:
    # ... same pipeline as C07, just pass ports
    embedded = async_gen_map(batches, embed_port.embed_batch)
    stored = async_gen_map(embedded, vector_port.upsert)
    return stored

# Shell – owns concrete clients and policies
async def main():
    async with AsyncClient(
        base_url="https://api.openai.com",
        headers={"Authorization": f"Bearer {API_KEY}"},  # auth lives on the shell-owned client
        timeout=None,  # httpx timeouts disabled; timeout_policy bounds each action
    ) as client:
        embed_port = make_openai_embed_adapter(client)
        vector_port = make_pgvector_adapter(pool)  # pool: asyncpg Pool created by the shell
        desc = rag_with_services(embed_port, vector_port)
        resilient = async_gen_map_action(
            desc,
            lambda act: async_with_resilience(act, retry_policy, timeout_policy),
        )
        async for _ in resilient():
            pass
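The shell above treats `async_with_resilience` as a given. To show the shape of such a combinator, here is a minimal standalone sketch of retry plus per-attempt timeout over a thunk. It is not the FuncPipe implementation: `with_resilience`, `RetrySketch`, `TimeoutSketch`, and `Outcome` are hypothetical names, and the result type is simplified to a single dataclass.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass(frozen=True)
class Outcome:
    """Simplified stand-in for Result[..., ErrInfo] (assumption)."""
    ok: bool
    value: object = None
    code: str = ""


Action = Callable[[], Awaitable[Outcome]]


@dataclass(frozen=True)
class RetrySketch:
    max_attempts: int = 3
    base_delay_s: float = 0.0
    retry_on: frozenset = frozenset({"TRANSIENT", "TIMEOUT", "RATE_LIMIT", "NETWORK"})


@dataclass(frozen=True)
class TimeoutSketch:
    per_attempt_s: float = 1.0


def with_resilience(action: Action, retry: RetrySketch, timeout: Optional[TimeoutSketch]) -> Action:
    """Wrap a thunk in a new thunk; nothing runs until the result is driven."""

    async def _run() -> Outcome:
        last = Outcome(ok=False, code="TRANSIENT")
        for attempt in range(retry.max_attempts):
            try:
                if timeout is None:
                    last = await action()  # fresh coroutine per attempt
                else:
                    last = await asyncio.wait_for(action(), timeout.per_attempt_s)
            except asyncio.TimeoutError:
                last = Outcome(ok=False, code="TIMEOUT")
            if last.ok or last.code not in retry.retry_on:
                return last  # success, or an error that retrying cannot fix
            if attempt + 1 < retry.max_attempts:
                await asyncio.sleep(retry.base_delay_s * (2 ** attempt))
        return last

    return lambda: _run()


attempts = {"n": 0}


async def _flaky() -> Outcome:
    attempts["n"] += 1
    if attempts["n"] < 3:
        return Outcome(ok=False, code="TRANSIENT")
    return Outcome(ok=True, value="embedded")


plan = with_resilience(lambda: _flaky(), RetrySketch(max_attempts=5), TimeoutSketch(1.0))
result = asyncio.run(plan())

auth_calls = {"n": 0}


async def _denied() -> Outcome:
    auth_calls["n"] += 1
    return Outcome(ok=False, code="AUTH")


denied = asyncio.run(with_resilience(lambda: _denied(), RetrySketch(max_attempts=5), None)())
```

The policies are plain frozen dataclasses, so the shell can build, log, or swap them without touching the adapter or the core.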
5. Property-Based Proofs (all pass in CI – fully deterministic, no network)¶
@given(texts=st.lists(st.text(), max_size=20))
@pytest.mark.asyncio
async def test_openai_adapter_equivalence_with_mock(texts):
    # Real-style adapter with stubbed client (no network)
    stub_client = StubAsyncClient(responses=[{"data": [{"embedding": [0.1] * 1536} for _ in texts]}])
    real_style_adapter = make_openai_embed_adapter(stub_client)
    # Pure in-memory mock
    mock_adapter = make_in_memory_embed_adapter()  # returns fixed fake embeddings
    plan1 = async_with_resilience(real_style_adapter.embed_batch(texts), RetryPolicy(max_attempts=1), None)
    plan2 = async_with_resilience(mock_adapter.embed_batch(texts), RetryPolicy(max_attempts=1), None)
    res1 = await perform_async(plan1)
    res2 = await perform_async(plan2)
    assert res1 == res2  # identical fake behaviour

@given(chunks=st.lists(st.from_type(EmbeddedChunk), max_size=10))
@pytest.mark.asyncio
async def test_pgvector_upsert_idempotence(chunks):
    mock_pool = MockPool()
    adapter = make_pgvector_adapter(mock_pool)
    desc = adapter.upsert(chunks)
    plan = async_with_resilience(desc, RetryPolicy(max_attempts=1), None)
    await perform_async(plan)
    state1 = mock_pool.snapshot()
    # Second identical call
    await perform_async(plan)
    state2 = mock_pool.snapshot()
    assert state1 == state2  # UPSERT + stable ID → no change
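The idempotence property can also be checked one level up, against a port-level fake instead of a mocked pool. A minimal sketch, assuming chunks are keyed by a stable `id`; `InMemoryVectorStore` and `StoredChunk` are hypothetical stand-ins, not the `MockPool` used above:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class StoredChunk:
    id: str
    vector: tuple[float, ...]


class InMemoryVectorStore:
    """Dict-backed fake: upsert overwrites by id, like ON CONFLICT DO UPDATE."""

    def __init__(self) -> None:
        self._rows: dict[str, StoredChunk] = {}

    def upsert(self, chunks: list[StoredChunk]) -> Callable[[], Awaitable[None]]:
        async def _act() -> None:
            for chunk in chunks:
                self._rows[chunk.id] = chunk

        return lambda: _act()  # thunk: nothing is written until it is driven

    def snapshot(self) -> dict[str, StoredChunk]:
        return dict(self._rows)


store = InMemoryVectorStore()
batch = [StoredChunk("c1", (0.1, 0.2)), StoredChunk("c2", (0.3, 0.4))]
plan = store.upsert(batch)
before = store.snapshot()  # building the plan wrote nothing


async def run_twice() -> tuple[dict, dict]:
    await plan()
    first = store.snapshot()
    await plan()  # second identical call
    second = store.snapshot()
    return first, second


state1, state2 = asyncio.run(run_twice())
```

The same two snapshots compared in the property test above are compared here, just without SQL in the loop.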
6. Runtime Guarantees¶
| Operation | Latency Added | Memory | Idempotence |
|---|---|---|---|
| HTTP batch | O(1) RTT | O(batch) | Yes |
| DB upsert | O(batch) | O(batch) | Yes (UPSERT+stable ID) |
| Query | O(log N) with an approximate index; O(N) for an exact scan | O(k) | Yes |
7. Anti-Patterns & Immediate Fixes¶
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Concrete client in core | Untestable, coupled | Pure protocol + factory |
| Hard-coded retry/timeout | Inflexible | Policies as data in shell |
| No error taxonomy | Generic "failed" errors | Map to specific ErrInfo.code |
| Non-idempotent write | Duplicate data on retry | UPSERT + deterministic ID |
8. Pre-Core Quiz¶
- External services are accessed via…? → Pure protocols
- Concrete implementation lives in…? → Thin adapter class factories
- Adapter methods return…? → AsyncAction thunks (never executed on call)
- Resilience policies are applied…? → Outside the adapter, on the description
- The golden rule? → Core never sees a concrete client or HTTP status code
9. Post-Core Exercise¶
- Define EmbedPort and VectorStorePort for your real services.
- Implement thin adapter factories returning classes.
- Replace every direct service call in your pipeline with the protocol version.
- Add a fully deterministic equivalence property test using stubbed clients.
- Sleep well — your pipeline now survives provider changes without a single core modification.
Continue with: Async Chunking
You now have production-grade, swappable, resilient async adapters for every external service — while the pure core remains eternally untouched.
M08C08 is now frozen.