## FailurePolicy and RetryPolicy

**Import:** `from stbass import FailurePolicy`

**Also:** `from stbass.result import RetryPolicy`
### FailurePolicy

```python
class FailurePolicy(Enum):
    HALT = "halt"        # stop on first failure
    COLLECT = "collect"  # run all, collect results
```

Factory for a RetryPolicy:

```python
policy = FailurePolicy.retry(
    max_attempts=5,
    backoff="exponential",  # or "linear" or "constant"
    base_delay=1.0,
)
```

### RetryPolicy
```python
policy = RetryPolicy(max_attempts=5, backoff="exponential", base_delay=1.0)
```

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| max_attempts | int | required | Maximum number of attempts |
| backoff | str | "exponential" | Backoff strategy |
| base_delay | float | 1.0 | Base delay in seconds |
Backoff strategies:
| Strategy | Formula | Example (base=1.0) |
|----------|---------|---------------------|
| "exponential" | base * 2^attempt | 1s, 2s, 4s, 8s, 16s |
| "linear" | base * (attempt + 1) | 1s, 2s, 3s, 4s, 5s |
| "constant" | base | 1s, 1s, 1s, 1s, 1s |
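For illustration, the three formulas above can be sketched in plain Python. `backoff_delay` is a hypothetical helper written for this sketch, not part of stbass:

```python
# Hypothetical stand-in for the documented backoff formulas;
# not the library's implementation.
def backoff_delay(strategy: str, base: float, attempt: int) -> float:
    if strategy == "exponential":
        return base * 2 ** attempt      # base * 2^attempt
    if strategy == "linear":
        return base * (attempt + 1)
    if strategy == "constant":
        return base
    raise ValueError(f"unknown backoff strategy: {strategy!r}")

print([backoff_delay("exponential", 1.0, a) for a in range(5)])
# [1.0, 2.0, 4.0, 8.0, 16.0]
```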
```python
policy = RetryPolicy(max_attempts=3, backoff="exponential", base_delay=0.5)
policy.delay_for(0)  # 0.5
policy.delay_for(1)  # 1.0
policy.delay_for(2)  # 2.0
```

### FailureReport

**Import:** `from stbass import FailureReport`
Aggregated analysis of process execution results with pattern detection.
#### Building a Report

```python
report = FailureReport()
for r in par_result.results:
    report.add(r)
```

#### Fields
| Field | Type | Description |
|-------|------|-------------|
| total_processes | int | Total processes tracked |
| succeeded | int | Count of successes |
| failed | int | Count of failures |
| failures | list[Failure] | All failure objects |
#### Properties
| Property | Type | Description |
|----------|------|-------------|
| failure_rate | float | failed / total_processes (0.0 if empty) |
| common_failure_types | dict[str, int] | Error type name to count |
| slowest_failure | Failure \| None | Failure with longest elapsed time |
#### Methods
| Method | Return | Description |
|--------|--------|-------------|
| add(result) | None | Add a ProcessResult to the report |
| pattern_analysis() | dict | Detect failure patterns (see below) |
| recommendations() | list[str] | Human-readable engineering recommendations |
| to_dict() | dict | Serializable representation |
| summary() | str | Multi-line text summary |
#### Pattern Analysis

```python
patterns = report.pattern_analysis()
```

Returns a dict with:
| Key | Type | Description |
|-----|------|-------------|
| "repeat_timeouts" | list[str] | Process names with repeated TimerExpired failures |
| "repeat_errors" | list[tuple[str, str]] | (process_name, error_type) pairs recurring 2+ times |
| "cascade_failures" | bool | True if consecutive failures occurred < 100ms apart |
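The cascade check can be illustrated with a small stand-alone sketch: two consecutive failures ending less than 100 ms apart count as a cascade. `has_cascade` is a hypothetical helper operating on plain float timestamps, not on the report's `Failure` objects:

```python
# Hypothetical sketch of the "cascade_failures" check: flag a cascade when
# two consecutive failure timestamps are less than 100 ms apart.
def has_cascade(failure_times: list[float], window: float = 0.1) -> bool:
    times = sorted(failure_times)
    return any(later - earlier < window for earlier, later in zip(times, times[1:]))

print(has_cascade([10.00, 10.05, 12.00]))  # True: 10.05 - 10.00 < 0.1
print(has_cascade([10.0, 11.0, 12.0]))     # False
```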
#### Recommendations

```python
recs = report.recommendations()
for r in recs:
    print(r)
```

Generates actionable messages like:

- "Process 'slow_agent' timeout 3/5 runs — consider increasing deadline or optimizing"
- "Process 'parser' consistently fails with ValueError — check input validation"
- "Cascade failure detected — consider adding FailurePolicy.COLLECT to isolate"
#### Serialization

```python
d = report.to_dict()
```
---
### FailureAggregator
**Import:** `from stbass.failure import FailureAggregator`
Collects `FailureReport` objects across multiple pipeline runs for longitudinal analysis.
```python
agg = FailureAggregator()
for run in range(10):
result = await PAR_FOR(count=5, factory=worker, on_failure=FailurePolicy.COLLECT).run()
report = FailureReport()
for r in result.results:
report.add(r)
agg.add_report(report)
print(agg.overall_summary())
worst = agg.worst_processes(n=3)
print(f"Worst processes: {worst}")
```

| Method | Return | Description |
|--------|--------|-------------|
| add_report(report) | None | Add a FailureReport |
| overall_summary() | str | Summary across all reports |
| worst_processes(n=5) | list[str] | Process names with highest failure counts |
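The idea behind `worst_processes` (rank process names by how often they fail across runs) can be sketched with a plain-Python stand-in. `worst_by_failures` and its input shape are hypothetical, not the aggregator's real internals:

```python
from collections import Counter

# Hypothetical sketch: count failing process names across runs and
# return the n most frequent offenders.
def worst_by_failures(failure_names_per_run: list[list[str]], n: int = 5) -> list[str]:
    counts = Counter(name for run in failure_names_per_run for name in run)
    return [name for name, _ in counts.most_common(n)]

runs = [["parser", "slow_agent"], ["parser", "slow_agent"], ["parser"]]
print(worst_by_failures(runs, n=2))  # ['parser', 'slow_agent']
```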
### retry_process

**Import:** `from stbass.failure import retry_process`
Retry a process according to a RetryPolicy:
```python
from stbass.failure import retry_process
from stbass.result import RetryPolicy

policy = RetryPolicy(max_attempts=5, backoff="exponential", base_delay=0.5)
ctx = ProcessContext(process_name="flaky_agent")
result = await retry_process(my_agent, ctx, policy)
```

| Parameter | Type | Description |
|-----------|------|-------------|
| process | Process | The process to retry |
| ctx | ProcessContext | Context to pass on each attempt |
| policy | RetryPolicy | Retry configuration |
Returns the first successful ProcessResult, or the last failure if all attempts are exhausted.
Behavior:

- Execute the process.
- If the result `is_ok`, return immediately.
- If the result `is_fail` and attempts remain, wait `policy.delay_for(attempt)` seconds, then retry.
- If all attempts are exhausted, return the final failure.
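The retry behavior can be sketched with plain-Python stand-ins. `Outcome`, `retry_sketch`, and `flaky` below are hypothetical; the real `retry_process` operates on stbass `Process`/`ProcessResult` types and uses `policy.delay_for`:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Outcome:                  # hypothetical stand-in for ProcessResult
    is_ok: bool
    value: object = None

async def retry_sketch(process, max_attempts: int, base_delay: float) -> Outcome:
    result = None
    for attempt in range(max_attempts):
        result = await process()
        if result.is_ok:
            return result       # success: return immediately
        if attempt < max_attempts - 1:
            # exponential backoff: base * 2^attempt
            await asyncio.sleep(base_delay * 2 ** attempt)
    return result               # all attempts exhausted: return the last failure

# Demo: a process that fails twice, then succeeds on the third attempt.
calls = {"n": 0}
async def flaky() -> Outcome:
    calls["n"] += 1
    return Outcome(is_ok=calls["n"] >= 3, value=calls["n"])

outcome = asyncio.run(retry_sketch(flaky, max_attempts=5, base_delay=0.01))
print(outcome.is_ok, calls["n"])  # True 3
```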