Process

Import: from stbass import Process Also: from stbass.process import ProcessContext

Creating Processes

Decorator — no arguments
@Process
async def agent(ctx: ProcessContext) -> ProcessResult:
    return ProcessResult.ok("result")

The function name becomes the process name automatically (agent.name == "agent").

Decorator — with arguments
@Process(name="custom_name")
async def agent(ctx: ProcessContext) -> ProcessResult:
    return ProcessResult.ok("result")
 
@Process(placement=Placement(model="opus"))
async def placed_agent(ctx: ProcessContext) -> ProcessResult:
    return ProcessResult.ok("result")

Decorator parameters:

| Parameter | Type | Description | |-----------|------|-------------| | name | str \| None | Override the auto-derived process name | | placement | Placement \| None | Default placement for this process |

Subclass
class Researcher(Process):
    def __init__(self, topic: str):
        super().__init__()
        self.topic = topic
 
    async def run(self, ctx: ProcessContext) -> ProcessResult:
        # do research
        return ProcessResult.ok({"findings": [...]})

Subclasses must implement async def run(self, ctx) -> ProcessResult.

Process Methods

| Method | Signature | Description | |--------|-----------|-------------| | run | async (ctx: ProcessContext) -> ProcessResult | The main logic. Override in subclasses. | | execute | async (ctx: ProcessContext) -> ProcessResult | Wraps run() with timing and error capture. Never override this. | | optional | () -> Process | Returns a copy marked optional. Optional process failures don't halt PAR. | | with_channels | (**channels: Chan) -> Process | Returns a copy with named channels bound. | | with_name | (name: str) -> Process | Returns a copy with a new name. |

Process Properties

| Property | Type | Description | |----------|------|-------------| | name | str | The process name (auto-derived or explicit) | | process_id | str | A fresh UUID on every access |

ProcessContext

Every process receives a ProcessContext that provides access to channels, checkpoints, and metadata.

@dataclass
class ProcessContext:
    process_name: str
    process_id: str           # auto-generated UUID
    channels: dict[str, Chan] # named channels
    parent: str | None        # parent process name
    placement: Any | None     # placement info
    deadline: datetime | None # deadline if set

Methods:

| Method | Signature | Description | |--------|-----------|-------------| | recv | async (channel_name: str) -> Any | Receive a value from a named channel | | send | async (channel_name: str, value: Any) -> None | Send a value to a named channel | | checkpoint | (value: Any) -> None | Save a checkpoint value (captured in Failure if process fails) | | get_channel | (name: str) -> Chan | Get a channel by name. Raises KeyError if not found. |

Example — using context channels:

@Process
async def transformer(ctx: ProcessContext) -> ProcessResult:
    data = await ctx.recv("input")
    result = transform(data)
    await ctx.send("output", result)
    return ProcessResult.ok(result)

Example — using checkpoints:

@Process
async def multi_step(ctx: ProcessContext) -> ProcessResult:
    ctx.checkpoint("started")
    step1 = do_step_1()
    ctx.checkpoint("step1_done")
    step2 = do_step_2(step1)
    ctx.checkpoint("step2_done")
    return ProcessResult.ok(step2)

If the process fails after step 1, the Failure will have checkpoint="step1_done".