PAR_FOR and SEQ_FOR (Replicators)

Import: from stbass.replicator import PAR_FOR, SEQ_FOR

Also re-exported at the top level: from stbass import PAR_FOR, SEQ_FOR

Dynamic fan-out: create N instances of a process from a factory function.

PAR_FOR

Runs N instances concurrently:

result = await PAR_FOR(
    count=10,
    factory=my_worker,
    on_failure=FailurePolicy.COLLECT,
    deadline=30.0,
).run()

| Parameter | Type | Description |
|-----------|------|-------------|
| count | int \| Callable[[], int] | Number of instances, or a callable returning the count |
| factory | Callable[[int, ProcessContext], Awaitable[ProcessResult]] | Function that creates work for each index |
| on_failure | FailurePolicy | How to handle failures (default: HALT) |
| deadline | float \| None | Maximum seconds |

Factory signature:

async def my_worker(index: int, ctx: ProcessContext) -> ProcessResult:
    # index is 0, 1, 2, ... count-1
    return ProcessResult.ok(f"result_{index}")
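To make the factory contract concrete, here is a minimal sketch of concurrent fan-out written with plain asyncio rather than stbass. StubContext, StubResult, and fan_out are hypothetical stand-ins invented for this illustration; they are not the library's ProcessContext, ProcessResult, or PAR_FOR, and the real implementation may differ. The sketch only shows the shape of the idea: the factory is called once per index from 0 to count-1, and the resulting awaitables run concurrently.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class StubContext:
    """Hypothetical stand-in for ProcessContext (not the stbass class)."""
    index: int


@dataclass
class StubResult:
    """Hypothetical stand-in for ProcessResult (not the stbass class)."""
    value: Any


async def my_worker(index: int, ctx: StubContext) -> StubResult:
    # Yield to the event loop once so the instances can interleave.
    await asyncio.sleep(0)
    return StubResult(f"result_{index}")


async def fan_out(
    count: int,
    factory: Callable[[int, StubContext], Awaitable[StubResult]],
) -> list[StubResult]:
    # Call the factory once per index 0..count-1 and await all of them together.
    # asyncio.gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(factory(i, StubContext(i)) for i in range(count)))


if __name__ == "__main__":
    results = asyncio.run(fan_out(3, my_worker))
    print([r.value for r in results])  # ['result_0', 'result_1', 'result_2']
```

The sketch omits failure policies and deadlines, which the table above attributes to the real PAR_FOR.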

Dynamic count (pass a zero-argument callable instead of an int):

items = fetch_items()
result = await PAR_FOR(
    count=lambda: len(items),
    factory=lambda i, ctx: process_item(items[i], ctx),
).run()
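The difference between a fixed and a callable count can be sketched without the library. resolve_count below is a hypothetical helper written for this illustration, not an stbass function, and it assumes the callable is evaluated when the replicator actually runs; the source does not say when stbass evaluates it, so treat the timing as an assumption.

```python
from typing import Callable, Union

# Mirrors the documented type of the count parameter: int | Callable[[], int].
CountSpec = Union[int, Callable[[], int]]


def resolve_count(count: CountSpec) -> int:
    # A callable is invoked at resolution time; a plain int is used as-is.
    return count() if callable(count) else count


items = ["a", "b"]
fixed = len(items)             # evaluated immediately: 2
deferred = lambda: len(items)  # evaluated only when resolved

items.append("c")              # the list grows after both counts were specified

if __name__ == "__main__":
    print(resolve_count(fixed))     # 2
    print(resolve_count(deferred))  # 3
```

Under that assumption, a callable count tracks the collection as it stands when the work starts, while an int freezes the size at the moment it was computed.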

SEQ_FOR

Runs N instances sequentially (one at a time, in order):

result = await SEQ_FOR(
    count=5,
    factory=my_worker,
    on_failure=FailurePolicy.HALT,
).run()

Returns a SEQResult with results in order.
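The practical difference between sequential and concurrent execution shows up in the order of side effects. The following sketch uses plain asyncio, not stbass; run_sequential, run_concurrent, and logging_worker are hypothetical names made up for this illustration, and the ctx argument is passed as None because no real ProcessContext is available here.

```python
import asyncio

# Shared log of side effects, used to observe execution order.
log: list[str] = []


async def logging_worker(index: int, ctx: object = None) -> str:
    log.append(f"start {index}")
    await asyncio.sleep(0)  # yield to the event loop once
    log.append(f"end {index}")
    return f"result_{index}"


async def run_sequential(count: int, factory) -> list[str]:
    # Each instance is awaited to completion before the next one starts.
    results = []
    for i in range(count):
        results.append(await factory(i, None))
    return results


async def run_concurrent(count: int, factory) -> list[str]:
    # All instances are scheduled together, so their steps can interleave.
    return list(await asyncio.gather(*(factory(i, None) for i in range(count))))


if __name__ == "__main__":
    asyncio.run(run_sequential(3, logging_worker))
    print(log)  # ['start 0', 'end 0', 'start 1', 'end 1', 'start 2', 'end 2']
```

In the sequential case every "end i" entry precedes "start i+1", which is the ordering guarantee that matters when a later instance depends on the effects of an earlier one.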

Nesting Replicators

Replicators can nest, so subagents can spawn subagents of their own:

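async def inner_worker(idx, ctx):
    return ProcessResult.ok(idx)

async def outer_worker(idx, ctx):
    # Each outer instance fans out to 3 inner instances of its own.
    # The inner result is awaited but not used in this minimal example.
    result = await PAR_FOR(count=3, factory=inner_worker).run()
    return ProcessResult.ok(f"outer_{idx}")

# 4 outer instances x 3 inner instances each = 12 inner invocations in total.
result = await PAR_FOR(count=4, factory=outer_worker).run()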
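The arithmetic of nested fan-out (outer count times inner count) can be checked with a small stand-alone sketch. It uses asyncio.gather in place of PAR_FOR, and the names inner, outer, nested, and inner_calls are hypothetical, introduced only for this illustration. Unlike the example above, the outer function here folds the inner results into its own return value, which is one way a nested replicator's output might be used.

```python
import asyncio

# Records every (outer index, inner index) pair that actually runs.
inner_calls: list[tuple[int, int]] = []


async def inner(outer_idx: int, inner_idx: int) -> int:
    inner_calls.append((outer_idx, inner_idx))
    await asyncio.sleep(0)  # yield so sibling instances can interleave
    return inner_idx


async def outer(outer_idx: int, inner_count: int) -> str:
    # Each outer instance launches its own group of inner instances.
    inner_results = await asyncio.gather(
        *(inner(outer_idx, j) for j in range(inner_count))
    )
    return f"outer_{outer_idx}:{sum(inner_results)}"


async def nested(outer_count: int, inner_count: int) -> list[str]:
    return list(
        await asyncio.gather(*(outer(i, inner_count) for i in range(outer_count)))
    )


if __name__ == "__main__":
    results = asyncio.run(nested(4, 3))
    print(len(inner_calls))  # 12
    print(results)           # ['outer_0:3', 'outer_1:3', 'outer_2:3', 'outer_3:3']
```

Because the total number of leaf instances grows multiplicatively with each level of nesting, it is worth keeping the per-level counts small or bounded.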