Process
Import: from stbass import Process
Also: from stbass.process import ProcessContext
Creating Processes
Decorator — no arguments
@Process
async def agent(ctx: ProcessContext) -> ProcessResult:
return ProcessResult.ok("result")The function name becomes the process name automatically (agent.name == "agent").
Decorator — with arguments
@Process(name="custom_name")
async def agent(ctx: ProcessContext) -> ProcessResult:
return ProcessResult.ok("result")
@Process(placement=Placement(model="opus"))
async def placed_agent(ctx: ProcessContext) -> ProcessResult:
return ProcessResult.ok("result")Decorator parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| name | str \| None | Override the auto-derived process name |
| placement | Placement \| None | Default placement for this process |
Subclass
class Researcher(Process):
def __init__(self, topic: str):
super().__init__()
self.topic = topic
async def run(self, ctx: ProcessContext) -> ProcessResult:
# do research
return ProcessResult.ok({"findings": [...]})Subclasses must implement async def run(self, ctx) -> ProcessResult.
Process Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| run | async (ctx: ProcessContext) -> ProcessResult | The main logic. Override in subclasses. |
| execute | async (ctx: ProcessContext) -> ProcessResult | Wraps run() with timing and error capture. Never override this. |
| optional | () -> Process | Returns a copy marked optional. Optional process failures don't halt PAR. |
| with_channels | (**channels: Chan) -> Process | Returns a copy with named channels bound. |
| with_name | (name: str) -> Process | Returns a copy with a new name. |
Process Properties
| Property | Type | Description |
|----------|------|-------------|
| name | str | The process name (auto-derived or explicit) |
| process_id | str | A fresh UUID on every access |
ProcessContext
Every process receives a ProcessContext that provides access to channels, checkpoints, and metadata.
@dataclass
class ProcessContext:
process_name: str
process_id: str # auto-generated UUID
channels: dict[str, Chan] # named channels
parent: str | None # parent process name
placement: Any | None # placement info
deadline: datetime | None # deadline if setMethods:
| Method | Signature | Description |
|--------|-----------|-------------|
| recv | async (channel_name: str) -> Any | Receive a value from a named channel |
| send | async (channel_name: str, value: Any) -> None | Send a value to a named channel |
| checkpoint | (value: Any) -> None | Save a checkpoint value (captured in Failure if process fails) |
| get_channel | (name: str) -> Chan | Get a channel by name. Raises KeyError if not found. |
Example — using context channels:
@Process
async def transformer(ctx: ProcessContext) -> ProcessResult:
data = await ctx.recv("input")
result = transform(data)
await ctx.send("output", result)
return ProcessResult.ok(result)Example — using checkpoints:
@Process
async def multi_step(ctx: ProcessContext) -> ProcessResult:
ctx.checkpoint("started")
step1 = do_step_1()
ctx.checkpoint("step1_done")
step2 = do_step_2(step1)
ctx.checkpoint("step2_done")
return ProcessResult.ok(step2)If the process fails after step 1, the Failure will have checkpoint="step1_done".