PAR_FOR and SEQ_FOR (Replicators)
Import: from stbass.replicator import PAR_FOR, SEQ_FOR
Also re-exported from: from stbass import PAR_FOR, SEQ_FOR
Dynamic fan-out: create N instances of a process from a factory function.
PAR_FOR
Runs N instances concurrently:
result = await PAR_FOR(
count=10,
factory=my_worker,
on_failure=FailurePolicy.COLLECT,
deadline=30.0,
).run()| Parameter | Type | Description |
|-----------|------|-------------|
| count | int \| Callable[[], int] | Number of instances, or a callable returning the count |
| factory | Callable[[int, ProcessContext], Awaitable[ProcessResult]] | Function that creates work for each index |
| on_failure | FailurePolicy | How to handle failures (default: HALT) |
| deadline | float \| None | Maximum seconds |
Factory signature:
async def my_worker(index: int, ctx: ProcessContext) -> ProcessResult:
# index is 0, 1, 2, ... count-1
return ProcessResult.ok(f"result_{index}")Dynamic count:
items = fetch_items()
result = await PAR_FOR(
count=lambda: len(items),
factory=lambda i, ctx: process_item(items[i], ctx),
).run()SEQ_FOR
Runs N instances sequentially (one at a time, in order):
result = await SEQ_FOR(
count=5,
factory=my_worker,
on_failure=FailurePolicy.HALT,
).run()Returns a SEQResult with results in order.
Nesting Replicators
Replicators can nest — subagents spawning subagents:
async def inner_worker(idx, ctx):
return ProcessResult.ok(idx)
async def outer_worker(idx, ctx):
result = await PAR_FOR(count=3, factory=inner_worker).run()
return ProcessResult.ok(f"outer_{idx}")
result = await PAR_FOR(count=4, factory=outer_worker).run()