SEQ (Sequential Composition)
Import: from stbass import SEQ
Runs processes in order. The output of each process becomes the input to the next.
Constructor
SEQ(*processes, on_failure=FailurePolicy.HALT)| Parameter | Type | Description |
|-----------|------|-------------|
| *processes | Process | Processes to run in order |
| on_failure | FailurePolicy | What to do when a process fails (default: HALT) |
Running
result = await SEQ(step1, step2, step3).run(input_value="start")Each process receives an "input" channel with the previous stage's output and may write to an "output" channel. Two patterns work:
Pattern A — Channel I/O (explicit):
@Process
async def step(ctx: ProcessContext) -> ProcessResult:
data = await ctx.recv("input")
result = transform(data)
await ctx.send("output", result)
return ProcessResult.ok(result)Pattern B — Return value only (simpler):
@Process
async def step(ctx: ProcessContext) -> ProcessResult:
data = await ctx.recv("input")
result = transform(data)
return ProcessResult.ok(result)Both patterns work. If the process writes to "output", that value is used. If not, result.value is used.
Result: SEQResult
| Property | Type | Description |
|----------|------|-------------|
| is_ok | bool | True if all stages succeeded |
| value | Any | Value from the last successful stage |
| results | list[ProcessResult] | All individual results |
| failures | list[Failure] | All failures |
Failure Policies in SEQ
| Policy | Behavior |
|--------|----------|
| FailurePolicy.HALT | Stop pipeline at first failure. Subsequent stages do not run. |
| FailurePolicy.COLLECT | Continue pipeline after failure. Next stage receives None as input. |
Example — three-stage pipeline:
@Process
async def parse(ctx: ProcessContext) -> ProcessResult:
raw = await ctx.recv("input")
return ProcessResult.ok(json.loads(raw))
@Process
async def validate(ctx: ProcessContext) -> ProcessResult:
data = await ctx.recv("input")
if "name" not in data:
raise ValueError("missing name")
return ProcessResult.ok(data)
@Process
async def save(ctx: ProcessContext) -> ProcessResult:
data = await ctx.recv("input")
db.insert(data)
return ProcessResult.ok("saved")
result = await SEQ(parse, validate, save).run(input_value='{"name": "test"}')Context Manager
async with SEQ(step1, step2) as pipeline:
result = await pipeline.run(input_value=data)