Placement and PLACED_PAR
Import: from stbass import Placement, PLACED_PAR
Also: from stbass.placement import Placement, PLACED_PAR, BackendRegistry, ExecutionBackend, LocalBackend
Placement
Describes where a process should execute:
opus = Placement(model="claude-opus-4-5-20250414", endpoint="anthropic")
haiku = Placement(model="claude-haiku-4-5-20250414", endpoint="anthropic")
local = Placement(model="llama-3", endpoint="local:8080", config={"temperature": 0.7})| Field | Type | Description |
|-------|------|-------------|
| model | str \| None | Model name |
| endpoint | str \| None | Endpoint identifier |
| config | dict \| None | Arbitrary backend configuration |
| Property | Type | Description |
|----------|------|-------------|
| summary | str | Human-readable summary |
PLACED_PAR
Like PAR, but each process is paired with a Placement:
result = await PLACED_PAR(
(reasoning_agent, Placement(model="opus", endpoint="anthropic")),
(classifier_agent, Placement(model="haiku", endpoint="anthropic")),
(embed_agent, Placement(model="nomic", endpoint="local:8080")),
on_failure=FailurePolicy.COLLECT,
deadline=30.0,
).run()| Parameter | Type | Description |
|-----------|------|-------------|
| *pairs | tuple[Process, Placement] | Process-placement pairs |
| on_failure | FailurePolicy | Failure handling |
| deadline | float \| None | Timeout in seconds |
| registry | BackendRegistry \| None | Custom registry (uses global default if omitted) |
Returns a PARResult. Placement info is captured in Failure.placement on failure.
ExecutionBackend, LocalBackend, BackendRegistry
Import: from stbass.placement import ExecutionBackend, LocalBackend, BackendRegistry
ExecutionBackend (Abstract Base Class)
Implement this to create custom execution backends:
class MyCloudBackend(ExecutionBackend):
async def execute(self, process, ctx):
# route to cloud
return await process.execute(ctx)
async def health_check(self):
return True
@property
def concurrency_limit(self):
return 100
@property
def name(self):
return "my-cloud"| Method/Property | Description |
|-----------------|-------------|
| execute(process, ctx) | Execute a process (async) |
| health_check() | Check if backend is available (async) |
| concurrency_limit | Max concurrent processes (property) |
| name | Backend identifier (property) |
LocalBackend
The default backend. Runs processes directly via asyncio:
backend = LocalBackend()
result = await backend.execute(my_process, ctx)BackendRegistry
Maps placement specs to backends:
registry = BackendRegistry()
registry.register("anthropic", my_anthropic_backend)
registry.register("local", my_local_backend)
backend = registry.get(Placement(endpoint="anthropic")) # returns my_anthropic_backend
backend = registry.get(Placement(endpoint="unknown")) # returns default (LocalBackend)| Method/Property | Description |
|-----------------|-------------|
| register(name, backend) | Register a named backend |
| get(placement) | Look up backend by placement (falls back to default) |
| default | The default LocalBackend |