Infinite Sequences Safely
This lesson is where lazy iteration stops being only a performance topic and becomes a safety topic. An unfenced iterator is not merely inefficient. In the wrong place, it is a correctness and reliability risk.
Start With the Missing Fence
The dangerous mistake here is subtle: the pipeline still looks elegant because everything is lazy, but nothing guarantees that it stops.
- If the source might be infinite or attacker-controlled, consuming it without a fence is a design bug.
- If the pipeline applies expensive work before bounding demand, the fence is too late to be protective.
- If you cannot say how many upstream pulls a helper performs, you cannot reason about safe termination.
Keep This Question In View
Core question:
How do you safely handle infinite or unbounded sequences in lazy pipelines by using fencing tools like islice, takewhile, and dropwhile to bound demand, short-circuit computation, and prevent resource exhaustion?
This lesson introduces fencing as an explicit safety discipline:
- use islice when the limit is a hard count
- use takewhile and dropwhile when the stop condition is part of the data contract
- place fences where they actually bound expensive work instead of after the damage is done
The running project matters because production pipelines often process sources that are large, malformed, or simply unbounded. Leave this lesson knowing how to make termination a visible design choice.
Use this when you process logs, network feeds, generators, or other streams that may be untrusted or unbounded.
Outcome:
1. Spot any unfenced consumption of an iterator that could be unbounded and know it's a denial-of-service vulnerability.
2. Add a safe fence in ≤ 3 lines.
3. Write a Hypothesis property proving exact demand bounds and short-circuit.
Laws (frozen, used across this core):
- E1 (Prefix equivalence): For any re-iterable finite prefix S and k ≥ 0: list(islice(P(iter(S)), k)) == eager_equiv(S)[:k].
- D1a (islice demand): list(islice(xs, k)) pulls exactly min(k, |xs|) elements from xs; no extra probe.
- D1b (takewhile demand): If the predicate first fails after n yields, takewhile pulls exactly n+1 (the failing element is consumed and discarded); if the source exhausts while the predicate is True, it pulls exactly n.
- D1c (dropwhile demand): To produce its first element, dropwhile pulls exactly m+1 if the predicate first fails after m elements (the failing element is the one yielded); if the source exhausts while the predicate is True, it pulls exactly m. After that it passes elements through one pull per yield.
- F1 (Safe on infinite):
  - islice(xs, k) terminates for any finite k.
  - takewhile(pred, xs) terminates iff xs is finite or pred is eventually False on xs.
  - dropwhile(pred, xs) finishes its skip phase iff xs is finite or pred is eventually False on xs; it never bounds what follows, so draining it terminates only when xs is finite.
- S1 (Single-pass discipline): Pipelines never assume re-iterability unless explicitly fenced or duplicated.
1. Conceptual Foundation
1.1 The One-Sentence Rule
Always fence consumption of any iterator that is not provably finite and small: use islice for a hard cap or takewhile for a predicate cap, and remember that dropwhile only trims a leading prefix and still needs a fence after it. Never trust unbounded sources.
1.2 Fencing in One Precise Sentence
islice bounds to a fixed prefix (exact demand), takewhile yields while the predicate holds (consuming and discarding the first failing element), dropwhile skips while the predicate holds (then yields the first failing element and everything after it); all three are lazy, use O(1) auxiliary memory, and have exactly specified demand.
1.3 Why This Matters Now
Chunking and grouping made streaming richer, but they still assumed data that would eventually end. This lesson removes that assumption. Think about termination as part of the contract: not only what values flow through the pipeline, but also how much upstream work is allowed before the pipeline must stop.
1.4 Fencing in 5 Lines
The next snippet matters because it shows two different kinds of stop rule: one numeric and one predicate-based.
from itertools import islice, count, takewhile
inf = count() # truly infinite
bounded = islice(inf, 10) # hard cap
print(list(bounded)) # [0..9], then stops
inf2 = count() # fresh infinite source
pred_bounded = takewhile(lambda x: x < 10, inf2)
print(list(pred_bounded)) # [0..9], stops on first False (pulls 11th to check)
Safe termination even on infinite sources.
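One detail the snippet above does not show: takewhile has to consume the first failing element in order to test it, and that element is not handed back to the source. A minimal sketch, where src, head, and after are illustrative names only:

```python
from itertools import count, takewhile

src = count()                                  # shared infinite source
head = list(takewhile(lambda x: x < 3, src))   # pulls 0, 1, 2, then 3 to test it
after = next(src)                              # 3 was consumed and discarded

print(head)   # [0, 1, 2]
print(after)  # 4
```

If the boundary element matters to a later stage, takewhile on a shared iterator is the wrong tool; the stop condition has to be expressed in a way that keeps that element.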
2. Mental Model: Unfenced vs Fenced Streams
2.1 One Picture
Unfenced (Dangerous)             Fenced (Safe)
+-----------------------+        +------------------------------+
| inf → for/list/[*]    |        | inf → islice/takewhile       |
|        ↓              |        |        ↓                     |
| hangs / OOM / DoS     |        | bounded termination*         |
| never returns         |        | bounded work                 |
+-----------------------+        +------------------------------+
   ↑ Vulnerability                  ↑ Production-ready default
(* assuming fences and predicates satisfy F1.)
2.2 Behavioral Contract
| Aspect | Unfenced | Fenced (islice/takewhile/dropwhile) |
|---|---|---|
| Termination | Never on infinite | Always with islice; with takewhile iff the predicate eventually fails or the source is finite; dropwhile alone never bounds the tail |
| Demand | Unbounded | Exactly specified (D1a/D1b/D1c) |
| Memory | Risk of explosion | O(1) aux |
| Short-circuit | No | Immediate on bound/predicate |
When Unfenced is Acceptable: Only for provably finite, small, trusted sources (e.g., in-memory config of < 100 items).
Fence Placement Rules (memorise):
1. Cheap filter/map first, then fence, so the cap counts the items you want (this caps output, not upstream pulls).
2. After any fan-out, fence each branch.
3. Before expensive work (embedding, network, disk).
4. At the pipeline sink, always fence user/configurable bounds.
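The first placement rule deserves one caveat: a fence placed after a selective filter bounds how many items come out, but not how many are pulled from the source. The sketch below compares the two placements on an infinite source; counted and is_rare are hypothetical helpers introduced only for this illustration.

```python
from itertools import count, islice

def counted(source, tally):
    """Yield from source, recording each upstream pull in tally[0]."""
    for item in source:
        tally[0] += 1
        yield item

def is_rare(x):
    return x % 1000 == 0

# Fence after the filter: output is capped at 3, upstream demand is not.
late = [0]
late_hits = list(islice(filter(is_rare, counted(count(), late)), 3))

# Fence on the raw source: upstream demand is capped at 100 pulls.
early = [0]
early_hits = list(filter(is_rare, islice(counted(count(), early), 100)))

print(late_hits, late[0])    # [0, 1000, 2000] 2001
print(early_hits, early[0])  # [0] 100
```

If the filter could never match on an untrusted source, the late fence would never fire at all, which is why an untrusted source usually wants a cap on raw pulls in addition to any cap on results.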
Known Pitfalls:
- takewhile/dropwhile pull one extra element to check predicate.
- islice consumes prefix permanently (single-pass).
- Predicate must be pure and fast (called on every element).
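The islice pitfall in the list above is easy to see on a shared iterator. A short sketch (the names it, first, and rest are illustrative):

```python
from itertools import islice

it = iter(range(10))
first = list(islice(it, 3))   # takes 0, 1, 2 from the shared iterator
rest = list(it)               # continues where islice stopped

print(first)  # [0, 1, 2]
print(rest)   # [3, 4, 5, 6, 7, 8, 9]
```

Unlike takewhile, islice loses no element at the boundary, but the prefix it took cannot be replayed without going back to the original source.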
Forbidden Patterns (CI-greppable):
- for x in unbounded_source: without fence
- list(unbounded_source)
- [*unbounded_source]
3. Running Project: Fenced Streams in RAG
3.1 Types (Canonical, Used Throughout)
From previous cores; RagEnv unchanged.
3.2 Unfenced Risk (Anti-Pattern)
# Dangerous: no bound
chunks = gen_rag_chunks(read_docs(path), env) # infinite/malicious docs → hangs forever
list(chunks) # never returns, OOM possible
Smells: Trusts input size; vulnerable to huge/malicious streams.
4. Refactor to Safe: Fencing in RAG
4.1 Defensive Core – Hardened Patterns
from __future__ import annotations

from itertools import islice, takewhile, dropwhile
from collections.abc import Iterable, Iterator

from rag_types import RawDoc, RagEnv, ChunkWithoutEmbedding
from core2 import gen_clean_docs
from core4 import gen_rag_chunks  # overlapping + tail policy


def gen_bounded_chunks(
    docs: Iterable[RawDoc],
    env: RagEnv,
    *,
    max_chunks: int | None = None,
) -> Iterator[ChunkWithoutEmbedding]:
    """
    Hard cap on total chunks produced.
    D1a: pulls at most max_chunks (or less if source exhausts).
    """
    if max_chunks is None:
        # No cap requested: the caller is responsible for bounding demand.
        yield from gen_rag_chunks(docs, env)
        return
    chunked = gen_rag_chunks(docs, env)
    yield from islice(chunked, max_chunks)


def gen_long_docs_only(
    docs: Iterable[RawDoc],
    min_abstract_len: int = 1000,
) -> Iterator[RawDoc]:
    """Skip leading short docs only; useful when short docs are noise at the beginning."""
    return dropwhile(lambda d: len(d.abstract) < min_abstract_len, docs)


def gen_sufficient_context_chunks(
    chunks: Iterable[ChunkWithoutEmbedding],
    min_tokens_per_chunk: int = 100,
) -> Iterator[ChunkWithoutEmbedding]:
    """Stop at the first chunk that is too short; assumes monotone degradation (e.g. tail chunks shrink)."""
    return takewhile(lambda c: len(c.text.split()) >= min_tokens_per_chunk, chunks)


# Composed production pattern
def safe_rag_pipeline(
    docs: Iterable[RawDoc],
    env: RagEnv,
    *,
    max_chunks: int = 10_000,
    min_doc_len: int = 500,
) -> Iterator[ChunkWithoutEmbedding]:
    docs = gen_long_docs_only(docs, min_doc_len)     # drop leading short docs
    chunks = gen_rag_chunks(docs, env)
    chunks = gen_sufficient_context_chunks(chunks)   # takewhile good chunks (monotone assumption)
    yield from islice(chunks, max_chunks)            # hard global cap
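Wins: Bounded output; bounded downstream work; composable defenses. Termination holds as long as the leading dropwhile eventually sees a long-enough doc or the source ends.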
Complexity:
| Fence | Time Bound | Demand Pulls | Notes |
|---|---|---|---|
| islice(xs, k) | O(k) | Exactly k (or less) | Hard cap |
| takewhile(pred) | O(n+1) where n = yield count | n+1 if pred fails; n if exhausts | Predicate-driven stop |
| dropwhile(pred) | O(m+1) where m = skipped | m+1 if pred fails; all if never | Skip leading prefix |
Note: Always fence user-configurable bounds and before expensive stages.
Re-iterability Warning: Fences consume irrevocably; design pipelines to be re-runnable from source if needed.
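Because dropwhile does not bound an endless run of matching items, a pipeline that starts with it can stall before the final islice ever sees a value. One possible hardening, sketched here under simplifying assumptions, is to cap raw pulls before the skip phase. The helper capped_long_docs and its max_docs parameter are hypothetical, and docs are plain strings rather than RawDoc objects.

```python
from itertools import dropwhile, islice, repeat

def capped_long_docs(docs, *, min_len=500, max_docs=1_000):
    """Skip leading short docs, but never pull more than max_docs from the source.

    Hypothetical helper: docs are plain strings here, not RawDoc objects.
    """
    capped = islice(docs, max_docs)                       # hard cap on raw pulls
    return dropwhile(lambda d: len(d) < min_len, capped)  # skip phase is now bounded

# An endless stream of short docs: dropwhile alone would never yield.
endless_short = repeat("too short")
result = list(capped_long_docs(endless_short, max_docs=1_000))
print(result)  # []
```

The trade-off is that a cap on raw documents can cut off legitimate input, so the limit belongs in configuration next to max_chunks rather than hard-coded.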
4.2 Defensive Shell (Production Pattern)
chunks = safe_rag_pipeline(
    read_docs(path),
    env,
    max_chunks=config.max_chunks,  # from CLI/config, default 10_000
    min_doc_len=config.min_doc_len,
)
write_jsonl_atomic(output_path, chunks)  # bounded write → safe
Note: On huge or malicious input the write stays bounded by max_chunks. The case to watch is an endless run of short docs, which stalls the leading dropwhile, so cap the raw document stream too when the source is untrusted.
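5. Equational Reasoning: Substitution Exercise
Hand Exercise: Inline the definition of islice into a small pipeline and confirm by substitution that it bounds demand lazily, without overpull.
Bug Hunt: Unfenced loops over infinite sources never terminate; fenced loops short-circuit safely.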
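One substitution worth checking by hand is that, for a lazy map, the fence commutes with the transformation: islice(map(f, xs), k) and map(f, islice(xs, k)) produce the same values with the same number of calls to f. The sketch below checks one instance; run is a hypothetical harness, not part of the capstone.

```python
from itertools import count, islice

def run(pipeline, k):
    """Build a pipeline over a fresh infinite source and count calls to f."""
    calls = [0]

    def f(x):
        calls[0] += 1
        return x * x

    return list(pipeline(f, count(), k)), calls[0]

fence_after = run(lambda f, xs, k: islice(map(f, xs), k), 5)
fence_before = run(lambda f, xs, k: map(f, islice(xs, k)), 5)

print(fence_after)   # ([0, 1, 4, 9, 16], 5)
print(fence_before)  # ([0, 1, 4, 9, 16], 5)
```

The equation holds because map is lazy and one-to-one. It does not carry over to filter, or to eager stages such as sorted, where the position of the fence changes both the result and the demand.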
6. Property-Based Testing: Proving Fencing Laws
6.1 Custom Strategy
6.2 Full Suite (All Laws Enforced)
from itertools import dropwhile, islice, takewhile

from hypothesis import given, strategies as st


@given(st.lists(st.integers(), max_size=200), st.integers(0, 300))
def test_islice_exact_demand(data, k):
    pulls = 0

    def counted():
        nonlocal pulls
        for x in data:
            pulls += 1
            yield x

    bounded = list(islice(counted(), k))
    assert len(bounded) == min(k, len(data))
    assert pulls == len(bounded)  # D1a: exactly min(k, len(xs)), no extra


@given(st.lists(st.integers(), max_size=200), st.integers(0, 100))
def test_takewhile_demand(data, stop_at):
    pulls = 0

    def counted():
        nonlocal pulls
        for x in data:
            pulls += 1
            yield x

    # leading True prefix length
    n = 0
    for x in data:
        if x < stop_at:
            n += 1
        else:
            break
    has_failure = n < len(data)

    bounded = list(takewhile(lambda x: x < stop_at, counted()))
    assert len(bounded) == n
    assert pulls == n + (1 if has_failure else 0)  # D1b exact


@given(st.lists(st.integers(), max_size=200), st.integers(0, 100))
def test_dropwhile_demand(data, start_at):
    pulls = 0

    def counted():
        nonlocal pulls
        for x in data:
            pulls += 1
            yield x

    # leading True prefix length
    m = 0
    for x in data:
        if x < start_at:
            m += 1
        else:
            break
    has_failure = m < len(data)

    # D1c bounds demand up to the first yield only, so pull a single element;
    # list(...) would drain the whole source and make pulls == len(data).
    missing = object()
    first = next(dropwhile(lambda x: x < start_at, counted()), missing)
    assert (first is not missing) == has_failure
    assert pulls == m + (1 if has_failure else 0)  # D1c exact
6.3 Shrinking Demo: Catching an Overpull Bug
Bad version (manual loop instead of islice):
def bad_bounded(xs, k):
    i = 0
    for x in xs:
        if i >= k:
            break
        yield x
        i += 1
        next(xs)  # BUG: extra pull!
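Property fails: on long input the bad version makes about 2k pulls (2k + 1 for k ≥ 1, counting the final pull before the break) instead of k, and it silently drops every other element.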
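The overpull can also be observed without Hypothesis by wrapping the source in a counting iterator. This is a sketch under stated assumptions: CountingIterator is a hypothetical helper written for this demo, and bad_bounded is repeated here with the extra pull inside the loop so that the block runs on its own.

```python
from itertools import islice

class CountingIterator:
    """Iterator wrapper that records how many items were pulled from it."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self.pulls = 0

    def __iter__(self):
        return self

    def __next__(self):
        item = next(self._it)   # StopIteration propagates when exhausted
        self.pulls += 1
        return item

def bad_bounded(xs, k):
    i = 0
    for x in xs:
        if i >= k:
            break
        yield x
        i += 1
        next(xs)  # BUG: extra pull!

good_src = CountingIterator(range(100))
good = list(islice(good_src, 3))

bad_src = CountingIterator(range(100))
bad = list(bad_bounded(bad_src, 3))

print(good, good_src.pulls)  # [0, 1, 2] 3
print(bad, bad_src.pulls)    # [0, 2, 4] 7
```

The same wrapper works as the counted source in a property test, which lets a failing case shrink to a small k and a short input.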
7. When Fencing Isn't Worth It
Only for provably finite, small, trusted sources (e.g., in-memory config of < 100 items).
Everything else → fence aggressively.
8. Pre-Core Quiz
- islice pulls extra? → Never more than k.
- takewhile on infinite True? → Infinite; always fence after.
- Safe on infinite source? → Only with islice or eventual predicate failure.
- Predicate in takewhile? → Must be pure and fast.
- Dropwhile demand? → Up to and including first False.
9. Post-Core Reflection & Exercise
Reflect: Audit every iterator consumption in your codebase and flag the ones that still need an explicit fence or termination argument.
Project Exercise: Add max_chunks=10_000 and min_doc_len=500 defaults to your RAG config. Verify on a malicious 1 TB simulated input that the pipeline terminates in < 1 second.
Continue with: Reusable Pipeline Stages
Repository Alignment
- Implementation: capstone/src/funcpipe_rag/rag/streaming_rag.py::safe_rag_pipeline, gen_bounded_chunks.
- Tests: capstone/tests/unit/streaming/test_streaming.py::test_make_gen_rag_fn_equivalence and ::test_fork2_lockstep_mismatch_raises for the bounded-iterator guardrails used around the pipeline.
itertools Decision Table – Use This
| Tool | Use When | Memory | Pitfall | Safe? |
|---|---|---|---|---|
| chain | Concat many iterables | O(1) | None | Yes |
| groupby | Group contiguous equal items | O(1) | Must sort first if not contiguous | Yes |
| tee | Multiple consumers of same iterator | O(skew) | Unbounded skew → memory explosion | Careful |
| islice | Take or skip a prefix/slice lazily | O(1) | Consumes the items it skips or takes; no negative indices | Yes |
| accumulate | Running totals/reductions | O(1) | Default op is + | Yes |
| compress | Filter by boolean mask | O(1) | None | Yes |
Further Reading: For the deepest itertools mastery, see the official docs and Dan Bader’s ‘Python Tricks’ chapter on iterators.
You now own the most powerful lazy streaming toolkit in Python. Module 4 will show you how to make even failure and resource cleanup lazy and pure.