Getting Started
Installation
pip install stbass

From source (development)
git clone <repo-url>
cd stbass
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Requirements
- Python >= 3.11
- Runtime dependencies: pydantic>=2.0, httpx>=0.27
- Dev dependencies: pytest>=8.0, pytest-asyncio>=0.23, rich>=13.0
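If you want to confirm the environment before installing, the requirements above can be checked with the standard library alone. This is a minimal sketch, not part of stbass: `python_ok` and `installed_versions` are hypothetical helper names, and the script only reports which versions are installed rather than comparing them against the minimums listed above.

```python
from __future__ import annotations

import sys
from importlib import metadata

# Minimum Python version taken from the requirements list above.
MIN_PYTHON = (3, 11)
# Distribution names taken from the runtime dependency list above.
RUNTIME_DEPS = ["pydantic", "httpx"]


def python_ok(minimum: tuple[int, int] = MIN_PYTHON) -> bool:
    """Return True if the running interpreter meets the minimum version."""
    return sys.version_info[:2] >= minimum


def installed_versions(names: list[str]) -> dict[str, str | None]:
    """Map each distribution name to its installed version, or None if absent."""
    report: dict[str, str | None] = {}
    for name in names:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = None
    return report


if __name__ == "__main__":
    print("Python OK:", python_ok())
    for dist, version in installed_versions(RUNTIME_DEPS).items():
        print(f"{dist}: {version or 'not installed'}")
```

A `None` entry means the distribution is missing from the active environment, which usually indicates the virtual environment was not activated before installing.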
Verify your install
from stbass import PAR, SEQ, ALT, Chan, Process
print("stbass ready")

Core Mental Model
stbass models concurrent computation as a graph of Processes that communicate through Channels and compose via algebraic operators.
- Everything is a Process. An agent, an LLM call, a database write — if it's async work, it's a process.
- Channels are Typed Pipes. Channels provide zero-buffered, typed communication between exactly two processes.
- Composition is Algebraic. Processes are never orchestrated by an obscure workflow engine; you compose them using explicit operators: PAR for parallel, SEQ for sequential, and ALT for alternation.
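To build intuition for the three operators, it can help to see rough equivalents in plain asyncio. The sketch below does not use stbass at all: `work`, `seq_demo`, `par_demo`, and `alt_demo` are hypothetical names, and ALT is approximated as "the first alternative to finish wins", which may differ from how stbass actually selects between alternatives.

```python
from __future__ import annotations

import asyncio


async def work(name: str, delay: float) -> str:
    """Stand-in for a process: wait for `delay` seconds, then return its name."""
    await asyncio.sleep(delay)
    return name


async def seq_demo() -> list[str]:
    # Sequential: the second step starts only after the first has finished.
    first = await work("a", 0.01)
    second = await work("b", 0.01)
    return [first, second]


async def par_demo() -> list[str]:
    # Parallel: both run concurrently; gather returns results in argument order.
    return list(await asyncio.gather(work("a", 0.02), work("b", 0.01)))


async def alt_demo() -> str:
    # Alternation (approximation): keep the first task to finish, cancel the rest.
    tasks = [
        asyncio.create_task(work("slow", 0.2)),
        asyncio.create_task(work("fast", 0.01)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to finish so no task is left dangling.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


if __name__ == "__main__":
    print(asyncio.run(seq_demo()))
    print(asyncio.run(par_demo()))
    print(asyncio.run(alt_demo()))
```

The point of the comparison is that each operator names a composition pattern you would otherwise hand-write with `await`, `asyncio.gather`, or `asyncio.wait`.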
Your first stbass program
Let's build a simple program that fires up two agents in parallel: a producer that does some work, and a consumer that waits for the result via a typed channel.
import asyncio

from stbass import PAR, Process, Chan, ProcessContext, ProcessResult


@Process
async def producer(ctx: ProcessContext) -> ProcessResult:
    # Do some work
    data = {"query": "hello world"}
    # Send it across the channel.
    # This blocks until the consumer is ready to receive it!
    await ctx.send("data_chan", data)
    return ProcessResult.ok("sent successfully")


@Process
async def consumer(ctx: ProcessContext) -> ProcessResult:
    # Wait for the producer to send data
    data = await ctx.recv("data_chan")
    return ProcessResult.ok(f"processed: {data}")


async def main():
    # 1. Create a typed channel
    ch = Chan(dict, name="data_chan")

    # 2. Compose the processes in parallel and bind them to the channel
    result = await PAR(
        producer.with_channels(data_chan=ch),
        consumer.with_channels(data_chan=ch),
    ).run()

    # 3. Handle the result
    if result.is_ok:
        print("Pipeline succeeded!")
        print("Returns:", result.value)
    else:
        print("Pipeline failed:", result.failures[0].summary)


if __name__ == "__main__":
    asyncio.run(main())

Architecture Overview
            +------------------+
            |   Composition    |
            |  SEQ  PAR  ALT   |
            +--------+---------+
                     |
      +--------------+--------------+
      |              |              |
+-----------+  +-----------+  +-----------+
|  Process  |  |  Process  |  |  Process  |
|  (agent)  |  |  (agent)  |  |  (agent)  |
+-----+-----+  +-----+-----+  +-----+-----+
      |              |              |
 +----+----+    +----+----+    +----+----+
 |  Chan   |    |  Chan   |    |  Chan   |
 | (typed) |    | (typed) |    | (typed) |
 +---------+    +---------+    +---------+
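
The Chan layer at the bottom of the diagram is described earlier as zero-buffered and typed, meaning a send completes only once the receiver has taken the value. The sketch below shows one way to get that rendezvous behavior in plain asyncio for a single sender and a single receiver. `RendezvousChan` is a hypothetical class written for illustration; it is not stbass's `Chan`, and the real implementation may work differently.

```python
from __future__ import annotations

import asyncio
from typing import Any


class RendezvousChan:
    """Hypothetical typed, zero-buffered channel for one sender and one receiver."""

    def __init__(self, item_type: type) -> None:
        self._item_type = item_type
        self._items = asyncio.Queue(maxsize=1)  # carries the value
        self._acks = asyncio.Queue(maxsize=1)   # carries the receiver's acknowledgement

    async def send(self, item: Any) -> None:
        # Reject values of the wrong type before anything is handed over.
        if not isinstance(item, self._item_type):
            raise TypeError(f"expected {self._item_type.__name__}, got {type(item).__name__}")
        await self._items.put(item)
        # Block until the receiver confirms it has taken the value.
        await self._acks.get()

    async def recv(self) -> Any:
        item = await self._items.get()
        await self._acks.put(None)  # release the blocked sender
        return item


async def demo() -> tuple[list[str], dict]:
    chan = RendezvousChan(dict)
    events: list[str] = []

    async def producer() -> None:
        events.append("send start")
        await chan.send({"query": "hello world"})
        events.append("send done")

    async def consumer() -> dict:
        await asyncio.sleep(0.05)  # the producer is already waiting inside send()
        events.append("recv start")
        return await chan.recv()

    _, received = await asyncio.gather(producer(), consumer())
    return events, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the recorded events, "send done" can only appear after "recv start", which is the property that distinguishes a rendezvous channel from a buffered queue.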