Placement and PLACED_PAR

Import: from stbass import Placement, PLACED_PAR Also: from stbass.placement import Placement, PLACED_PAR, BackendRegistry, ExecutionBackend, LocalBackend

Placement

Describes where a process should execute:

opus = Placement(model="claude-opus-4-5-20250414", endpoint="anthropic")
haiku = Placement(model="claude-haiku-4-5-20250414", endpoint="anthropic")
local = Placement(model="llama-3", endpoint="local:8080", config={"temperature": 0.7})

| Field | Type | Description | |-------|------|-------------| | model | str \| None | Model name | | endpoint | str \| None | Endpoint identifier | | config | dict \| None | Arbitrary backend configuration |

| Property | Type | Description | |----------|------|-------------| | summary | str | Human-readable summary |

PLACED_PAR

Like PAR, but each process is paired with a Placement:

result = await PLACED_PAR(
    (reasoning_agent, Placement(model="opus", endpoint="anthropic")),
    (classifier_agent, Placement(model="haiku", endpoint="anthropic")),
    (embed_agent, Placement(model="nomic", endpoint="local:8080")),
    on_failure=FailurePolicy.COLLECT,
    deadline=30.0,
).run()

| Parameter | Type | Description | |-----------|------|-------------| | *pairs | tuple[Process, Placement] | Process-placement pairs | | on_failure | FailurePolicy | Failure handling | | deadline | float \| None | Timeout in seconds | | registry | BackendRegistry \| None | Custom registry (uses global default if omitted) |

Returns a PARResult. Placement info is captured in Failure.placement on failure.



ExecutionBackend, LocalBackend, BackendRegistry

Import: from stbass.placement import ExecutionBackend, LocalBackend, BackendRegistry

ExecutionBackend (Abstract Base Class)

Implement this to create custom execution backends:

class MyCloudBackend(ExecutionBackend):
    async def execute(self, process, ctx):
        # route to cloud
        return await process.execute(ctx)
 
    async def health_check(self):
        return True
 
    @property
    def concurrency_limit(self):
        return 100
 
    @property
    def name(self):
        return "my-cloud"

| Method/Property | Description | |-----------------|-------------| | execute(process, ctx) | Execute a process (async) | | health_check() | Check if backend is available (async) | | concurrency_limit | Max concurrent processes (property) | | name | Backend identifier (property) |

LocalBackend

The default backend. Runs processes directly via asyncio:

backend = LocalBackend()
result = await backend.execute(my_process, ctx)

BackendRegistry

Maps placement specs to backends:

registry = BackendRegistry()
registry.register("anthropic", my_anthropic_backend)
registry.register("local", my_local_backend)
 
backend = registry.get(Placement(endpoint="anthropic"))  # returns my_anthropic_backend
backend = registry.get(Placement(endpoint="unknown"))     # returns default (LocalBackend)

| Method/Property | Description | |-----------------|-------------| | register(name, backend) | Register a named backend | | get(placement) | Look up backend by placement (falls back to default) | | default | The default LocalBackend |