SEQ (Sequential Composition)

Import: from stbass import SEQ

Runs processes in order. The output of each process becomes the input to the next.

Constructor

SEQ(*processes, on_failure=FailurePolicy.HALT)

| Parameter | Type | Description | |-----------|------|-------------| | *processes | Process | Processes to run in order | | on_failure | FailurePolicy | What to do when a process fails (default: HALT) |

Running

result = await SEQ(step1, step2, step3).run(input_value="start")

Each process receives an "input" channel with the previous stage's output and may write to an "output" channel. Two patterns work:

Pattern A — Channel I/O (explicit):

@Process
async def step(ctx: ProcessContext) -> ProcessResult:
    data = await ctx.recv("input")
    result = transform(data)
    await ctx.send("output", result)
    return ProcessResult.ok(result)

Pattern B — Return value only (simpler):

@Process
async def step(ctx: ProcessContext) -> ProcessResult:
    data = await ctx.recv("input")
    result = transform(data)
    return ProcessResult.ok(result)

Both patterns work. If the process writes to "output", that value is used. If not, result.value is used.

Result: SEQResult

| Property | Type | Description | |----------|------|-------------| | is_ok | bool | True if all stages succeeded | | value | Any | Value from the last successful stage | | results | list[ProcessResult] | All individual results | | failures | list[Failure] | All failures |

Failure Policies in SEQ

| Policy | Behavior | |--------|----------| | FailurePolicy.HALT | Stop pipeline at first failure. Subsequent stages do not run. | | FailurePolicy.COLLECT | Continue pipeline after failure. Next stage receives None as input. |

Example — three-stage pipeline:

@Process
async def parse(ctx: ProcessContext) -> ProcessResult:
    raw = await ctx.recv("input")
    return ProcessResult.ok(json.loads(raw))
 
@Process
async def validate(ctx: ProcessContext) -> ProcessResult:
    data = await ctx.recv("input")
    if "name" not in data:
        raise ValueError("missing name")
    return ProcessResult.ok(data)
 
@Process
async def save(ctx: ProcessContext) -> ProcessResult:
    data = await ctx.recv("input")
    db.insert(data)
    return ProcessResult.ok("saved")
 
result = await SEQ(parse, validate, save).run(input_value='{"name": "test"}')

Context Manager

async with SEQ(step1, step2) as pipeline:
    result = await pipeline.run(input_value=data)