Getting Started

Installation

pip install stbass

From source (development)

git clone <repo-url>
cd stbass
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Requirements

  • Python >= 3.11
  • Runtime dependencies: pydantic>=2.0, httpx>=0.27
  • Dev dependencies: pytest>=8.0, pytest-asyncio>=0.23, rich>=13.0
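
To confirm an environment meets these requirements before installing, a small standard-library check can help. This is a minimal sketch, not part of stbass: the helper names `python_ok` and `installed_versions` are hypothetical, and it only reports the installed versions rather than comparing them against the minimums listed above.

```python
import sys
from importlib import metadata

# Minimums copied from the requirements list above.
MIN_PYTHON = (3, 11)
RUNTIME_PACKAGES = ("pydantic", "httpx")


def python_ok(version_info=sys.version_info):
    """Return True if the interpreter meets the minimum Python version."""
    return tuple(version_info[:2]) >= MIN_PYTHON


def installed_versions(packages=RUNTIME_PACKAGES):
    """Map each package name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    print("Python OK:", python_ok())
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```

A value of None for a package means it is not installed in the active environment; check the reported versions against the list above by hand.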

Verify your install

from stbass import PAR, SEQ, ALT, Chan, Process
print("stbass ready")

Core Mental Model

stbass models concurrent computation as a graph of Processes that communicate through Channels and compose via algebraic operators.

  1. Everything is a Process. An agent, an LLM call, a database write: if it's async work, it's a process.
  2. Channels are Typed Pipes. A channel provides zero-buffered, typed communication between exactly two processes, so a send completes only when the receiver is ready to take the value.
  3. Composition is Algebraic. No opaque workflow engine orchestrates your processes; you compose them with explicit operators: PAR for parallel, SEQ for sequential, and ALT for alternation.
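
To make "zero-buffered" concrete, here is a rough sketch of rendezvous semantics in plain asyncio. It does not use stbass and is not how stbass implements Chan; `RendezvousChan` is a hypothetical stand-in written only to show that a send does not return until a receiver has taken the value, and that values of the wrong type are rejected.

```python
import asyncio


class RendezvousChan:
    """Hypothetical stand-in for a typed, zero-buffered channel (not stbass's Chan)."""

    def __init__(self, item_type):
        self._item_type = item_type
        self._queue = asyncio.Queue()

    async def send(self, value):
        if not isinstance(value, self._item_type):
            raise TypeError(f"expected {self._item_type.__name__}")
        # Pair the value with a future the receiver resolves on pickup.
        ack = asyncio.get_running_loop().create_future()
        await self._queue.put((value, ack))
        await ack  # the sender stays blocked until a receiver takes the value

    async def recv(self):
        value, ack = await self._queue.get()
        ack.set_result(None)  # release the blocked sender
        return value


async def demo():
    chan = RendezvousChan(dict)
    events = []

    async def producer():
        events.append("producer: sending")
        await chan.send({"query": "hello world"})
        events.append("producer: send returned")

    async def consumer():
        await asyncio.sleep(0.01)  # the consumer is deliberately late
        events.append("consumer: ready")
        data = await chan.recv()
        events.append(f"consumer: got {data}")

    await asyncio.gather(producer(), consumer())
    return events


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

Even though the producer starts first, its send returns only after the consumer becomes ready, which is the synchronization property a zero-buffered channel provides.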

Your first stbass program

Let's build a simple program that runs two processes in parallel: a producer that does some work, and a consumer that waits for the result on a typed channel.

import asyncio
from stbass import PAR, Process, Chan, ProcessContext, ProcessResult
 
@Process
async def producer(ctx: ProcessContext) -> ProcessResult:
    # Do some work
    data = {"query": "hello world"}
    
    # Send it across the channel
    # This blocks until the consumer is ready to receive it!
    await ctx.send("data_chan", data)
    
    return ProcessResult.ok("sent successfully")
 
@Process
async def consumer(ctx: ProcessContext) -> ProcessResult:
    # Wait for the producer to send data
    data = await ctx.recv("data_chan")
    
    return ProcessResult.ok(f"processed: {data}")
 
async def main():
    # 1. Create a typed channel
    ch = Chan(dict, name="data_chan")
    
    # 2. Compose the processes in parallel and bind them to the channel
    result = await PAR(
        producer.with_channels(data_chan=ch),
        consumer.with_channels(data_chan=ch)
    ).run()
    
    # 3. Handle the result
    if result.is_ok:
        print("Pipeline succeeded!")
        print("Returns:", result.value)
    else:
        print("Pipeline failed:", result.failures[0].summary)
 
if __name__ == "__main__":
    asyncio.run(main())

Architecture Overview

                    +------------------+
                    |   Composition    |
                    |  SEQ  PAR  ALT   |
                    +--------+---------+
                             |
              +--------------+--------------+
              |              |              |
        +-----------+  +-----------+  +-----------+
        |  Process  |  |  Process  |  |  Process  |
        |  (agent)  |  |  (agent)  |  |  (agent)  |
        +-----+-----+  +-----+-----+  +-----+-----+
              |              |              |
         +----+----+    +----+----+    +----+----+
         |  Chan   |    |  Chan   |    |  Chan   |
         | (typed) |    | (typed) |    | (typed) |
         +---------+    +---------+    +---------+
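
As a loose analogy for the composition layer at the top of the diagram, the three operators can be compared with familiar asyncio patterns. This sketch uses only the standard library and makes no claim about how stbass implements them. The function names `seq_like`, `par_like`, and `alt_like` are hypothetical, and the ALT analogy assumes the CSP/occam sense of proceeding with whichever alternative becomes ready first.

```python
import asyncio


async def step(name, delay, log):
    """A stand-in unit of async work that records when it finishes."""
    await asyncio.sleep(delay)
    log.append(name)
    return name


async def seq_like(log):
    # Sequential: the second step starts only after the first finishes.
    first = await step("a", 0.05, log)
    second = await step("b", 0.01, log)
    return [first, second]


async def par_like(log):
    # Parallel: both steps run concurrently; results keep argument order.
    return await asyncio.gather(step("a", 0.05, log), step("b", 0.01, log))


async def alt_like(log):
    # Alternation (assumed semantics): take whichever branch is ready first
    # and cancel the others.
    tasks = [
        asyncio.create_task(step("slow", 0.2, log)),
        asyncio.create_task(step("fast", 0.01, log)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


async def main():
    for runner in (seq_like, par_like, alt_like):
        log = []
        result = await runner(log)
        print(runner.__name__, "result:", result, "finish order:", log)


if __name__ == "__main__":
    asyncio.run(main())
```

The finish order differs between the sequential and parallel runs even though the returned results line up the same way, which is the kind of distinction the composition operators make explicit.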

Where to go next

  • Learn how to define asynchronous units of work with Process.
  • Learn how to synchronize and communicate safely through Channels.
  • Explore algebraic composition via PAR, SEQ, and ALT.