# Common Workflows

# Workflow 1: API Call Orchestration
# Fan out API calls to multiple services and collect the results:
import httpx
from stbass import Process, ProcessResult, PAR, FailurePolicy
from stbass.process import ProcessContext
@Process
async def call_service_a(ctx: ProcessContext) -> ProcessResult:
    """Fetch the JSON payload from service A.

    Sends a GET request to ``https://api.service-a.com/data`` through a
    short-lived ``httpx.AsyncClient`` and wraps the decoded JSON body in
    ``ProcessResult.ok``. ``ctx`` is accepted but not used.

    Raises:
        httpx.HTTPStatusError: if the service answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.service-a.com/data")
        # An error response must not be decoded and reported as a success.
        # NOTE(review): assumes the Process wrapper records a raised exception
        # as a failure, as it must already do for connection errors - confirm.
        resp.raise_for_status()
        return ProcessResult.ok(resp.json())
@Process
async def call_service_b(ctx: ProcessContext) -> ProcessResult:
    """Fetch the JSON payload from service B.

    Sends a GET request to ``https://api.service-b.com/data`` through a
    short-lived ``httpx.AsyncClient`` and wraps the decoded JSON body in
    ``ProcessResult.ok``. ``ctx`` is accepted but not used.

    Raises:
        httpx.HTTPStatusError: if the service answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.service-b.com/data")
        # An error response must not be decoded and reported as a success.
        # NOTE(review): assumes the Process wrapper records a raised exception
        # as a failure, as it must already do for connection errors - confirm.
        resp.raise_for_status()
        return ProcessResult.ok(resp.json())
async def orchestrate():
    """Run both service calls through ``PAR`` and print every outcome.

    The group is built with ``FailurePolicy.COLLECT`` and a ``deadline`` of
    10.0. Each entry in ``successes`` has its ``value`` printed; each entry
    in ``failures`` has its ``summary`` printed with a ``Failed:`` prefix.
    """
    fan_out = PAR(
        call_service_a,
        call_service_b,
        on_failure=FailurePolicy.COLLECT,
        deadline=10.0,
    )
    outcome = await fan_out.run()

    for success in outcome.successes:
        print(success.value)
    for failure in outcome.failures:
        print(f"Failed: {failure.summary}")


# Workflow 2: ETL Pipeline
@Process
async def extract(ctx: ProcessContext) -> ProcessResult:
    """Receive a source from ``ctx.recv("input")`` and fetch its raw data.

    The received value is passed to ``fetch_from_database`` (defined outside
    this snippet) and whatever it returns is wrapped in ``ProcessResult.ok``.
    """
    source_name = await ctx.recv("input")
    fetched = await fetch_from_database(source_name)
    return ProcessResult.ok(fetched)
@Process
async def transform(ctx: ProcessContext) -> ProcessResult:
    """Apply ``clean_record`` to every record received on ``"input"``.

    Returns ``ProcessResult.ok`` holding the list of cleaned records, in the
    same order as they were received. ``clean_record`` is defined outside
    this snippet.
    """
    incoming = await ctx.recv("input")
    cleaned_records = list(map(clean_record, incoming))
    return ProcessResult.ok(cleaned_records)
@Process
async def load(ctx: ProcessContext) -> ProcessResult:
    """Write the records received on ``"input"`` and report how many there were.

    The records are handed to ``write_to_warehouse`` (defined outside this
    snippet); the result value is ``len(records)``.
    """
    batch = await ctx.recv("input")
    await write_to_warehouse(batch)
    # Count is taken after the write, matching the original statement order.
    written_count = len(batch)
    return ProcessResult.ok(written_count)
async def run_etl():
    """Run extract -> transform -> load in sequence and print the outcome.

    The ``SEQ`` pipeline is started with ``input_value="orders_2024"``. On
    success the loaded record count is printed; otherwise the ``detailed``
    text of every failure is printed.
    """
    # NOTE(review): SEQ is not imported in the visible snippet - confirm where
    # it comes from.
    pipeline = SEQ(extract, transform, load)
    outcome = await pipeline.run(input_value="orders_2024")

    if not outcome.is_ok:
        for failure in outcome.failures:
            print(failure.detailed)
        return

    print(f"Loaded {outcome.value} records")


# Workflow 3: Multi-Agent Research with Synthesis
async def run_research(query: str):
    """Plan subtopics for *query*, research them in parallel, then synthesize.

    Three processes are chained with ``SEQ``: ``plan`` -> ``research_all`` ->
    ``synthesize``. The return value is the ``value`` of the final result.
    ``generate_subtopics``, ``do_research`` and ``create_report`` are defined
    outside this snippet.
    """

    @Process
    async def plan(ctx: ProcessContext) -> ProcessResult:
        # Turn the incoming query into the list of subtopics to research.
        question = await ctx.recv("input")
        return ProcessResult.ok(generate_subtopics(question))

    @Process
    async def research_all(ctx: ProcessContext) -> ProcessResult:
        subtopics = await ctx.recv("input")

        async def research_one(idx, c):
            # `c` is accepted but not used; only the index selects the work.
            topic = subtopics[idx]
            return ProcessResult.ok(await do_research(topic))

        fan_out = PAR_FOR(
            count=len(subtopics),
            factory=research_one,
            on_failure=FailurePolicy.COLLECT,
            deadline=60.0,
        )
        outcome = await fan_out.run()

        # Only the values of successful runs are forwarded.
        findings = [success.value for success in outcome.successes]
        return ProcessResult.ok(findings)

    @Process
    async def synthesize(ctx: ProcessContext) -> ProcessResult:
        # Fold the collected findings into a single report.
        findings = await ctx.recv("input")
        return ProcessResult.ok(create_report(findings))

    pipeline = SEQ(plan, research_all, synthesize)
    final = await pipeline.run(input_value=query)
    return final.value


# Workflow 4: Batch Processing with Monitoring
async def process_batch(items: list):
    """Process *items* in chunks of 100, reporting failures per batch.

    Each batch is run through ``PAR_FOR`` with ``FailurePolicy.COLLECT`` and
    a ``deadline`` of 120.0. Every result of the batch is added to a fresh
    ``FailureReport``, which is then added to one shared
    ``FailureAggregator``. A warning and the report's recommendations are
    printed when a batch's ``failure_rate`` exceeds 0.5. After all batches
    the aggregator's overall summary is printed.

    ``chunk`` and ``handle`` are defined outside this snippet.
    """
    aggregator = FailureAggregator()

    for batch in chunk(items, size=100):

        async def process_item(idx, ctx):
            # `batch` is the current chunk; the run below is awaited before
            # the loop advances, so it is not rebound while items are running.
            handled = await handle(batch[idx])
            return ProcessResult.ok(handled)

        batch_run = PAR_FOR(
            count=len(batch),
            factory=process_item,
            on_failure=FailurePolicy.COLLECT,
            deadline=120.0,
        )
        outcome = await batch_run.run()

        report = FailureReport()
        for item_result in outcome.results:
            report.add(item_result)
        aggregator.add_report(report)

        # Check health after each batch
        if report.failure_rate > 0.5:
            print("WARNING: >50% failure rate")
            print(report.recommendations())

    print(aggregator.overall_summary())


# Workflow 5: Script with Retry and Fallback
from stbass.failure import retry_process
from stbass.result import RetryPolicy
async def resilient_fetch(url: str):
    """Fetch JSON from *url* with retries, falling back to a cached copy.

    ``primary`` performs the GET with a 5.0 timeout and is driven by
    ``retry_process`` under a ``RetryPolicy`` of 3 attempts with exponential
    backoff and a base delay of 1.0. If the retried result is OK its value is
    returned. Otherwise ``fallback`` is executed and its value, a dict
    ``{"cached": True, "data": get_cached(url)}``, is returned.

    ``get_cached`` is defined outside this snippet.
    """

    @Process
    async def primary(ctx: ProcessContext) -> ProcessResult:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, timeout=5.0)
            # A 4xx/5xx answer must count as a failed attempt. Without this
            # check the error body is returned as a success and neither the
            # retry policy nor the cache fallback is ever used.
            # NOTE(review): assumes retry_process treats a raised exception
            # as a failed attempt - confirm.
            resp.raise_for_status()
            return ProcessResult.ok(resp.json())

    @Process
    async def fallback(ctx: ProcessContext) -> ProcessResult:
        return ProcessResult.ok({"cached": True, "data": get_cached(url)})

    # Try primary with retries
    policy = RetryPolicy(max_attempts=3, backoff="exponential", base_delay=1.0)
    ctx = ProcessContext(process_name="primary")
    result = await retry_process(primary, ctx, policy)
    if result.is_ok:
        return result.value

    # Fall back to cache
    ctx = ProcessContext(process_name="fallback")
    fallback_result = await fallback.execute(ctx)
    return fallback_result.value


# Workflow 6: MCP Tool Server Integration
from stbass.mcp import MCPServer
async def search_and_analyze(query: str):
    """Run a web-search tool and a file-read tool in parallel via MCP servers.

    Returns ``{"error": "brave search unavailable"}`` when the brave server's
    health check is falsy. Otherwise returns a dict with ``"web_results"``
    and ``"file_data"``, each holding the matching tool's value, or ``None``
    when that tool's result is not OK.

    NOTE(review): ``query`` is never referenced in the body - confirm how it
    is meant to reach the tools.
    """
    # Connect to MCP servers
    brave = MCPServer("http://localhost:3000/sse", name="brave")
    filesystem = MCPServer("http://localhost:3001/sse", name="fs")

    # Check health (only the brave server is probed)
    brave_is_up = await brave.health()
    if not brave_is_up:
        return {"error": "brave search unavailable"}

    # Create tool processes
    web_search = brave.tool("brave_web_search", timeout=15.0)
    read_file = filesystem.tool("read_file", timeout=5.0)

    # Run tools in parallel
    tool_group = PAR(
        web_search,
        read_file,
        on_failure=FailurePolicy.COLLECT,
    )
    outcome = await tool_group.run()

    # Results are read by position: web search first, file read second.
    search_result = outcome.results[0]
    file_result = outcome.results[1]
    return {
        "web_results": search_result.value if search_result.is_ok else None,
        "file_data": file_result.value if file_result.is_ok else None,
    }