Common Workflows

Workflow 1: API Call Orchestration

Fan out API calls to multiple services in parallel and collect the results:

import httpx
from stbass import Process, ProcessResult, PAR, FailurePolicy
from stbass.process import ProcessContext
 
@Process
async def call_service_a(ctx: ProcessContext) -> ProcessResult:
    """Fetch service A's data endpoint and wrap the JSON body in a success."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.service-a.com/data")
        return ProcessResult.ok(response.json())
 
@Process
async def call_service_b(ctx: ProcessContext) -> ProcessResult:
    """Query service B and return its parsed JSON payload as a success."""
    url = "https://api.service-b.com/data"
    async with httpx.AsyncClient() as client:
        payload = (await client.get(url)).json()
        return ProcessResult.ok(payload)
 
async def orchestrate():
    """Run both service calls concurrently and print each outcome."""
    outcome = await PAR(
        call_service_a,
        call_service_b,
        on_failure=FailurePolicy.COLLECT,
        deadline=10.0,
    ).run()

    # Successes and failures are reported separately by the COLLECT policy.
    for success in outcome.successes:
        print(success.value)
    for failure in outcome.failures:
        print(f"Failed: {failure.summary}")

Workflow 2: ETL Pipeline

@Process
async def extract(ctx: ProcessContext) -> ProcessResult:
    """Pull raw rows for the requested source out of the database."""
    source_name = await ctx.recv("input")
    return ProcessResult.ok(await fetch_from_database(source_name))
 
@Process
async def transform(ctx: ProcessContext) -> ProcessResult:
    """Clean every raw record received from the previous stage."""
    raw_records = await ctx.recv("input")
    cleaned_records = list(map(clean_record, raw_records))
    return ProcessResult.ok(cleaned_records)
 
@Process
async def load(ctx: ProcessContext) -> ProcessResult:
    """Write the cleaned records to the warehouse; report how many landed."""
    batch = await ctx.recv("input")
    await write_to_warehouse(batch)
    return ProcessResult.ok(len(batch))
 
async def run_etl():
    """Chain extract -> transform -> load and summarize the outcome."""
    result = await SEQ(extract, transform, load).run(
        input_value="orders_2024"
    )
    # Guard clause: dump failure detail and bail out early on error.
    if not result.is_ok:
        for failure in result.failures:
            print(failure.detailed)
        return
    print(f"Loaded {result.value} records")

Workflow 3: Multi-Agent Research with Synthesis

async def run_research(query: str):
    """Plan subtopics for *query*, research them in parallel, then synthesize."""

    @Process
    async def plan(ctx: ProcessContext) -> ProcessResult:
        question = await ctx.recv("input")
        return ProcessResult.ok(generate_subtopics(question))

    @Process
    async def research_all(ctx: ProcessContext) -> ProcessResult:
        topics = await ctx.recv("input")

        # One worker per subtopic; the index selects which topic to cover.
        async def research_one(i, child_ctx):
            return ProcessResult.ok(await do_research(topics[i]))

        fan_out = await PAR_FOR(
            count=len(topics),
            factory=research_one,
            on_failure=FailurePolicy.COLLECT,
            deadline=60.0,
        ).run()
        findings = [success.value for success in fan_out.successes]
        return ProcessResult.ok(findings)

    @Process
    async def synthesize(ctx: ProcessContext) -> ProcessResult:
        collected = await ctx.recv("input")
        return ProcessResult.ok(create_report(collected))

    pipeline = SEQ(plan, research_all, synthesize)
    result = await pipeline.run(input_value=query)
    return result.value

Workflow 4: Batch Processing with Monitoring

async def process_batch(items: list):
    """Process *items* in chunks of 100, tracking failures across all batches."""
    aggregator = FailureAggregator()

    for batch in chunk(items, size=100):

        async def handle_one(i, ctx):
            return ProcessResult.ok(await handle(batch[i]))

        outcome = await PAR_FOR(
            count=len(batch),
            factory=handle_one,
            on_failure=FailurePolicy.COLLECT,
            deadline=120.0,
        ).run()

        # Roll the batch's individual results into one report.
        batch_report = FailureReport()
        for single in outcome.results:
            batch_report.add(single)
        aggregator.add_report(batch_report)

        # Flag unhealthy batches as soon as they finish.
        if batch_report.failure_rate > 0.5:
            print("WARNING: >50% failure rate")
            print(batch_report.recommendations())

    print(aggregator.overall_summary())

Workflow 5: Script with Retry and Fallback

from stbass.failure import retry_process
from stbass.result import RetryPolicy
 
async def resilient_fetch(url: str):
    """Fetch *url* with retries; serve cached data if every attempt fails."""

    @Process
    async def primary(ctx: ProcessContext) -> ProcessResult:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, timeout=5.0)
            return ProcessResult.ok(resp.json())

    @Process
    async def fallback(ctx: ProcessContext) -> ProcessResult:
        return ProcessResult.ok({"cached": True, "data": get_cached(url)})

    # Give the primary up to three exponentially backed-off attempts.
    retry = RetryPolicy(max_attempts=3, backoff="exponential", base_delay=1.0)
    primary_result = await retry_process(
        primary, ProcessContext(process_name="primary"), retry
    )
    if primary_result.is_ok:
        return primary_result.value

    # Primary exhausted its retries — serve the cached copy instead.
    cached = await fallback.execute(ProcessContext(process_name="fallback"))
    return cached.value

Workflow 6: MCP Tool Server Integration

from stbass.mcp import MCPServer
 
async def search_and_analyze(query: str):
    """Run a web search and a file read in parallel via MCP tool servers.

    Returns a dict with ``web_results`` and ``file_data`` (either may be
    ``None`` when that tool failed), or a dict with an ``error`` entry when
    a required server is unreachable.
    """
    # NOTE(review): *query* is never forwarded to the tools here — presumably
    # brave_web_search should receive it as an argument; confirm tool API.
    brave = MCPServer("http://localhost:3000/sse", name="brave")
    filesystem = MCPServer("http://localhost:3001/sse", name="fs")

    # Fail fast when either backend is down. The original probed only brave,
    # so a dead filesystem server was discovered only via read_file's timeout.
    if not await brave.health():
        return {"error": "brave search unavailable"}
    if not await filesystem.health():
        return {"error": "filesystem server unavailable"}

    # Wrap each remote tool as a process with its own timeout budget.
    web_search = brave.tool("brave_web_search", timeout=15.0)
    read_file = filesystem.tool("read_file", timeout=5.0)

    # Run both tools concurrently; COLLECT keeps partial results on failure.
    result = await PAR(
        web_search,
        read_file,
        on_failure=FailurePolicy.COLLECT,
    ).run()

    # Results come back positionally, matching the PAR argument order.
    return {
        "web_results": result.results[0].value if result.results[0].is_ok else None,
        "file_data": result.results[1].value if result.results[1].is_ok else None,
    }