## FailurePolicy and RetryPolicy

**Import:** `from stbass import FailurePolicy`; also `from stbass.result import RetryPolicy`

### FailurePolicy

```python
class FailurePolicy(Enum):
    HALT = "halt"        # stop on first failure
    COLLECT = "collect"  # run all, collect results
```

`FailurePolicy.retry()` is a convenience factory that builds a `RetryPolicy`:

```python
policy = FailurePolicy.retry(
    max_attempts=5,
    backoff="exponential",  # or "linear" or "constant"
    base_delay=1.0,
)
```

### RetryPolicy

```python
policy = RetryPolicy(max_attempts=5, backoff="exponential", base_delay=1.0)
```

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| max_attempts | int | required | Maximum number of attempts |
| backoff | str | "exponential" | Backoff strategy |
| base_delay | float | 1.0 | Base delay in seconds |

Backoff strategies:

| Strategy | Formula | Example (base=1.0) |
|----------|---------|---------------------|
| "exponential" | base * 2^attempt | 1s, 2s, 4s, 8s, 16s |
| "linear" | base * (attempt + 1) | 1s, 2s, 3s, 4s, 5s |
| "constant" | base | 1s, 1s, 1s, 1s, 1s |

```python
policy = RetryPolicy(max_attempts=3, backoff="exponential", base_delay=0.5)
policy.delay_for(0)  # 0.5
policy.delay_for(1)  # 1.0
policy.delay_for(2)  # 2.0
```
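The schedule in the table can be reproduced with a small standalone function; this is a sketch of the semantics as documented above, not stbass's actual implementation:

```python
def delay_for(backoff: str, base_delay: float, attempt: int) -> float:
    """Seconds to wait after the given 0-indexed failed attempt."""
    if backoff == "exponential":
        return base_delay * (2 ** attempt)   # base * 2^attempt
    if backoff == "linear":
        return base_delay * (attempt + 1)    # base * (attempt + 1)
    if backoff == "constant":
        return base_delay                    # fixed delay every time
    raise ValueError(f"unknown backoff strategy: {backoff!r}")

print([delay_for("exponential", 0.5, a) for a in range(3)])  # [0.5, 1.0, 2.0]
print([delay_for("linear", 1.0, a) for a in range(5)])       # [1.0, 2.0, 3.0, 4.0, 5.0]
```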


### FailureReport

**Import:** `from stbass import FailureReport`

Aggregated analysis of process execution results with pattern detection.

#### Building a Report

```python
report = FailureReport()
for r in par_result.results:
    report.add(r)
```

#### Fields

| Field | Type | Description |
|-------|------|-------------|
| total_processes | int | Total processes tracked |
| succeeded | int | Count of successes |
| failed | int | Count of failures |
| failures | list[Failure] | All failure objects |

#### Properties

| Property | Type | Description |
|----------|------|-------------|
| failure_rate | float | failed / total_processes (0.0 if empty) |
| common_failure_types | dict[str, int] | Error type name to count |
| slowest_failure | Failure \| None | Failure with longest elapsed time |

#### Methods

| Method | Return | Description |
|--------|--------|-------------|
| add(result) | None | Add a ProcessResult to the report |
| pattern_analysis() | dict | Detect failure patterns (see below) |
| recommendations() | list[str] | Human-readable engineering recommendations |
| to_dict() | dict | Serializable representation |
| summary() | str | Multi-line text summary |
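To make the fields and derived properties concrete, here is a minimal stand-in that mimics the documented shape. `MiniReport` and the dict-based results it consumes are illustrative assumptions, not stbass's `FailureReport` internals:

```python
from collections import Counter
from dataclasses import dataclass, field

@dataclass
class MiniReport:
    total_processes: int = 0
    succeeded: int = 0
    failed: int = 0
    failures: list = field(default_factory=list)

    def add(self, result) -> None:
        # Results are modeled as plain dicts here for illustration.
        self.total_processes += 1
        if result.get("ok"):
            self.succeeded += 1
        else:
            self.failed += 1
            self.failures.append(result)

    @property
    def failure_rate(self) -> float:
        return self.failed / self.total_processes if self.total_processes else 0.0

    @property
    def common_failure_types(self) -> dict:
        return dict(Counter(f["error_type"] for f in self.failures))

report = MiniReport()
for r in [{"ok": True},
          {"ok": False, "error_type": "ValueError"},
          {"ok": False, "error_type": "ValueError"}]:
    report.add(r)
print(round(report.failure_rate, 2))  # 0.67
print(report.common_failure_types)    # {'ValueError': 2}
```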

#### Pattern Analysis

```python
patterns = report.pattern_analysis()
```

Returns a dict with:

| Key | Type | Description |
|-----|------|-------------|
| "repeat_timeouts" | list[str] | Process names with repeated TimerExpired failures |
| "repeat_errors" | list[tuple[str, str]] | (process_name, error_type) pairs recurring 2+ times |
| "cascade_failures" | bool | True if consecutive failures occurred < 100ms apart |
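The cascade heuristic can be sketched as a scan over failure timestamps. The `detect_cascade` helper and its timestamp input are assumptions for illustration; only the "< 100 ms apart" rule comes from the table above:

```python
def detect_cascade(failure_times: list[float], window: float = 0.1) -> bool:
    """True if any two consecutive failures are less than `window` seconds apart."""
    times = sorted(failure_times)
    return any(b - a < window for a, b in zip(times, times[1:]))

print(detect_cascade([0.00, 0.03, 0.05]))  # True  (failures 30 ms and 20 ms apart)
print(detect_cascade([0.0, 1.0, 2.5]))     # False (well separated)
```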

#### Recommendations

```python
recs = report.recommendations()
for r in recs:
    print(r)
```

Generates actionable messages like:

- "Process 'slow_agent' timeout 3/5 runs — consider increasing deadline or optimizing"
- "Process 'parser' consistently fails with ValueError — check input validation"
- "Cascade failure detected — consider adding FailurePolicy.COLLECT to isolate"

#### Serialization

```python
d = report.to_dict()
```
 
---
 
### FailureAggregator
 
**Import:** `from stbass.failure import FailureAggregator`
 
Collects `FailureReport` objects across multiple pipeline runs for longitudinal analysis.
 
```python
agg = FailureAggregator()
 
for run in range(10):
    result = await PAR_FOR(count=5, factory=worker, on_failure=FailurePolicy.COLLECT).run()
    report = FailureReport()
    for r in result.results:
        report.add(r)
    agg.add_report(report)
 
print(agg.overall_summary())
worst = agg.worst_processes(n=3)
print(f"Worst processes: {worst}")
```

| Method | Return | Description |
|--------|--------|-------------|
| add_report(report) | None | Add a FailureReport |
| overall_summary() | str | Summary across all reports |
| worst_processes(n=5) | list[str] | Process names with highest failure counts |
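The tallying behind `worst_processes` can be sketched with `collections.Counter`. `MiniAggregator` and the per-report dict it accepts are illustrative stand-ins, not the stbass classes:

```python
from collections import Counter

class MiniAggregator:
    """Accumulates failure counts per process name across runs."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def add_report(self, failures_by_process: dict[str, int]) -> None:
        self.counts.update(failures_by_process)

    def worst_processes(self, n: int = 5) -> list[str]:
        # Highest failure counts first.
        return [name for name, _ in self.counts.most_common(n)]

agg = MiniAggregator()
agg.add_report({"parser": 2, "slow_agent": 1})
agg.add_report({"parser": 1, "io_worker": 4})
print(agg.worst_processes(n=2))  # ['io_worker', 'parser']
```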



### retry_process

**Import:** `from stbass.failure import retry_process`

Retry a process according to a RetryPolicy:

```python
from stbass.failure import retry_process
from stbass.result import RetryPolicy

policy = RetryPolicy(max_attempts=5, backoff="exponential", base_delay=0.5)
ctx = ProcessContext(process_name="flaky_agent")
result = await retry_process(my_agent, ctx, policy)
```

| Parameter | Type | Description |
|-----------|------|-------------|
| process | Process | The process to retry |
| ctx | ProcessContext | Context to pass on each attempt |
| policy | RetryPolicy | Retry configuration |

Returns the first successful ProcessResult, or the last failure if all attempts are exhausted.

Behavior:

1. Execute the process.
2. If `is_ok`, return immediately.
3. If `is_fail` and attempts remain, wait `policy.delay_for(attempt)` seconds, then retry.
4. If all attempts are exhausted, return the final failure.
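The four steps above can be sketched as a plain asyncio loop; `retry_sketch` and the dict-shaped results are illustrative stand-ins for the real `Process`/`ProcessResult` types:

```python
import asyncio

async def retry_sketch(run, max_attempts: int, delay_for) -> dict:
    result = None
    for attempt in range(max_attempts):
        result = await run()                         # 1. execute the process
        if result["ok"]:                             # 2. success: return immediately
            return result
        if attempt < max_attempts - 1:               # 3. failure with attempts left:
            await asyncio.sleep(delay_for(attempt))  #    back off, then retry
    return result                                    # 4. exhausted: final failure

# Demo: fail twice, then succeed (zero delay so the demo runs instantly).
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    return {"ok": calls["n"] >= 3}

result = asyncio.run(retry_sketch(flaky, max_attempts=5, delay_for=lambda a: 0))
print(result, calls["n"])  # {'ok': True} 3
```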