Examples
This page contains practical examples for common use cases with the Ashr Labs SDK.
Table of Contents
- Basic Setup
- Agents
- Working with Datasets
- Building Runs with RunBuilder
- VM Stream Logs
- Observability — Production Tracing
- Managing Test Runs
- Submitting Requests
- CI/CD Integration
- Batch Operations
- Monitoring and Reporting
Basic Setup
Quickest Start
from ashr_labs import AshrLabsClient
# Only need your API key — everything else is automatic
client = AshrLabsClient(api_key="tp_your_api_key_here")
datasets = client.list_datasets()
From Environment Variables
from ashr_labs import AshrLabsClient
# Reads ASHR_LABS_API_KEY (required) and ASHR_LABS_BASE_URL (optional)
client = AshrLabsClient.from_env()
datasets = client.list_datasets()
Explicit Session Info
from ashr_labs import AshrLabsClient
client = AshrLabsClient(api_key="tp_your_api_key_here")
# init() is called automatically on first API call, but you can call it explicitly
session = client.init()
print(f"User: {session['user']['email']}")
print(f"Tenant: {session['tenant']['tenant_name']}")
Agents
Create an Agent with Grading Config
Agents define how your datasets should be generated and graded. The config controls tool strictness, behavior rules, and grading thresholds.
from ashr_labs import AshrLabsClient
client = AshrLabsClient(api_key="tp_your_api_key_here")
# Create an agent for a voice-based scheduling bot
agent = client.create_agent(
name="Scheduling Bot",
description="Spanish-language healthcare appointment scheduler",
config={
# Define what tools the agent has and which are required
"tool_definitions": [
{"name": "fetch_kareo_data", "required": True, "description": "Fetch appointment availability from Kareo"},
{"name": "save_data", "required": True, "description": "Persist caller information"},
{"name": "end_session", "required": False, "description": "Formally close the conversation"},
{"name": "await_user_response", "required": False, "description": "Wait for caller input"},
],
# Rules the agent should follow
"behavior_rules": [
{"rule": "Always call fetch_kareo_data before quoting availability", "strictness": "required"},
{"rule": "Save caller name and DOB via save_data when provided", "strictness": "required"},
{"rule": "End session formally when conversation is complete", "strictness": "expected"},
],
# How the grader should handle NOT_CALLED tool calls
"grading_config": {
"tool_strictness": {
"fetch_kareo_data": "required", # must be called — failure if skipped
"save_data": "required", # must be called — failure if skipped
"end_session": "optional", # OK if agent ends conversation naturally
"await_user_response": "optional", # OK if agent handles flow without explicit wait
},
"text_similarity_threshold": 0.3, # lower threshold for multilingual agents
},
},
)
print(f"Created agent: {agent['name']} (id={agent['id']})")
Link Datasets to an Agent
# Link existing datasets to the agent
client.set_dataset_agent(dataset_id=42, agent_id=agent["id"])
client.set_dataset_agent(dataset_id=43, agent_id=agent["id"])
# View agent's datasets
resp = client.get_agent_datasets(agent["id"])
print(f"Agent '{resp['agent']['name']}' has {len(resp['datasets'])} datasets")
for ds in resp["datasets"]:
print(f" - {ds['name']} (id={ds['id']})")
Deploy Runs Under an Agent
When you deploy a run with agent_id, the dataset is automatically linked to the agent:
from ashr_labs import AshrLabsClient, EvalRunner
client = AshrLabsClient(api_key="tp_...")
agent_id = 1 # your agent's ID
# Run eval and auto-link dataset to agent
runner = EvalRunner.from_dataset(client, dataset_id=42)
run = runner.run(my_agent)
created = run.deploy(client, dataset_id=42, agent_id=agent_id)
# The dataset is now linked to the agent, and the grader will use
# the agent's grading_config for smarter NOT_CALLED recovery
graded = client.poll_run(created["id"])
metrics = graded["result"]["aggregate_metrics"]
print(f"Passed: {metrics['tests_passed']}/{metrics['total_tests']}")
Update Agent Config
# Update grading config — e.g. make a tool optional that was previously required
agent = client.update_agent(
agent_id=1,
config={
"tool_definitions": [
{"name": "fetch_data", "required": True},
{"name": "save_data", "required": True},
{"name": "end_session", "required": False},
{"name": "transfer_call", "required": False}, # newly added tool
],
"grading_config": {
"tool_strictness": {
"fetch_data": "required",
"save_data": "required",
"end_session": "optional",
"transfer_call": "expected", # should call but not a hard failure
},
},
},
)
print(f"Updated agent config: {list(agent['config'].keys())}")
List and Clean Up Agents
agents = client.list_agents()
for a in agents:
print(f"{a['name']}: {a['dataset_count']} datasets, active={a['is_active']}")
# Soft-delete an agent (datasets are unlinked, not deleted)
client.delete_agent(agent_id=3)
Working with Datasets
List All Datasets with Pagination
def get_all_datasets(client):
    """Collect every dataset by walking the cursor-paginated listing.

    Args:
        client: AshrLabsClient (or any object exposing a compatible
            ``list_datasets(limit, cursor)`` method).

    Returns:
        A list of all dataset dicts across every page.
    """
    collected = []
    next_cursor = None
    while True:
        page = client.list_datasets(limit=50, cursor=next_cursor)
        collected.extend(page["datasets"])
        next_cursor = page.get("next_cursor")
        if not next_cursor:
            return collected
# Usage
datasets = get_all_datasets(client)
print(f"Total datasets: {len(datasets)}")
Download Dataset Media Files
import urllib.request
from pathlib import Path
def download_dataset_files(client, dataset_id: int, output_dir: str):
"""Download all media files from a dataset."""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Get dataset with signed URLs
dataset = client.get_dataset(
dataset_id=dataset_id,
include_signed_urls=True,
url_expires_seconds=3600
)
source = dataset.get("dataset_source", {})
actions = source.get("actions", [])
downloaded = []
for action in actions:
if "signed_url" not in action:
continue
action_id = action["action_id"]
url = action["signed_url"]
# Determine file extension from URL or default to .bin
ext = ".bin"
if ".mp3" in url:
ext = ".mp3"
elif ".wav" in url:
ext = ".wav"
elif ".json" in url:
ext = ".json"
filename = output_path / f"{action_id}{ext}"
print(f"Downloading {action_id}...")
urllib.request.urlretrieve(url, filename)
downloaded.append(filename)
return downloaded
# Usage
files = download_dataset_files(client, dataset_id=42, output_dir="./downloads")
print(f"Downloaded {len(files)} files")
Search Datasets by Name
def find_datasets_by_name(client, search_term: str):
    """Return all datasets whose name contains *search_term* (case-insensitive)."""
    needle = search_term.lower()
    return [
        ds for ds in get_all_datasets(client)
        if needle in ds["name"].lower()
    ]
# Usage
datasets = find_datasets_by_name(client, search_term="audio")
for d in datasets:
print(f"- {d['name']} (ID: {d['id']})")
Building Runs with RunBuilder
Basic RunBuilder Usage
from ashr_labs import AshrLabsClient, RunBuilder
client = AshrLabsClient(api_key="tp_...")
run = RunBuilder()
run.start()
test = run.add_test("bank_analysis")
test.start()
# Record user actions
test.add_user_file(
file_path="datasets/tenant_1/dataset_9/bank_analysis/action_0.pdf",
description="User submits bank statement PDF"
)
test.add_user_text(
text="Please analyze this bank statement and summarize key transactions.",
description="User asks for analysis"
)
# Record agent tool calls with expected vs actual
test.add_tool_call(
expected={"tool_name": "extract_pdf_content", "arguments": {"file_path": "bank_statement.pdf"}},
actual={"tool_name": "extract_pdf_content", "arguments": {"file_path": "bank_statement.pdf", "pages": "all"}},
match_status="partial",
divergence_notes="Extra 'pages' argument in actual call",
)
test.add_tool_call(
expected={"tool_name": "analyze_transactions", "arguments": {"account_holder": "Rohan", "period": "last_month"}},
actual={"tool_name": "analyze_transactions", "arguments": {"account_holder": "Rohan", "period": "last_month"}},
match_status="exact",
)
# Record agent text responses
test.add_agent_response(
expected_response={"summary": "Based on the bank statement analysis..."},
actual_response={"summary": "After analyzing the bank statement..."},
match_status="similar",
semantic_similarity=0.89,
divergence_notes="Slightly different wording but same key information",
)
test.complete()
run.complete()
# Deploy to the API
created_run = run.deploy(client, dataset_id=42)
print(f"Run #{created_run['id']} created")
Multiple Tests in a Single Run
from ashr_labs import RunBuilder
run = RunBuilder()
run.start()
# First test
test1 = run.add_test("pdf_extraction")
test1.start()
test1.add_user_file(file_path="data/invoice.pdf", description="Upload invoice")
test1.add_tool_call(
expected={"tool_name": "extract_pdf", "arguments": {"file": "invoice.pdf"}},
actual={"tool_name": "extract_pdf", "arguments": {"file": "invoice.pdf"}},
match_status="exact",
)
test1.complete()
# Second test
test2 = run.add_test("summary_generation")
test2.start()
test2.add_user_text(text="Summarize the invoice", description="User request")
test2.add_agent_response(
expected_response={"summary": "Invoice total: $500"},
actual_response={"summary": "The invoice totals $500"},
match_status="similar",
semantic_similarity=0.93,
)
test2.complete()
run.complete()
# Inspect the built result before deploying
result = run.build()
print(f"Total tests: {result['aggregate_metrics']['total_tests']}")
print(f"Tests completed: {result['aggregate_metrics']['tests_completed']}")
# Full pass/fail metrics available after deploy + poll_run()
Using build() for Inspection Before Deploy
run = RunBuilder()
run.start()
test = run.add_test("my_test")
test.start()
test.add_user_text(text="Hello", description="Greeting")
test.add_tool_call(
expected={"tool_name": "greet", "arguments": {}},
actual={"tool_name": "greet", "arguments": {"formal": True}},
match_status="partial",
divergence_notes="Extra 'formal' argument",
)
test.complete()
run.complete()
# Inspect the result dict
result = run.build()
print(result["aggregate_metrics"])
# {'total_tests': 1, 'tests_completed': 1, 'tests_errored': 0}
# Deploy and wait for server-side grading
created = run.deploy(client, dataset_id=42)
graded = client.poll_run(created["id"])
print(graded["result"]["aggregate_metrics"])
# {'total_tests': 1, 'tests_passed': 1, 'tests_failed': 0,
# 'total_tool_call_divergence': 1, 'total_response_divergence': 0, ...}
VM Stream Logs
Attach Browser Session Logs
For agents that operate in a browser (via Browserbase, Scrapybara, Steel, etc.), attach VM session logs to each test:
from ashr_labs import RunBuilder
run = RunBuilder()
run.start()
test = run.add_test("checkout_flow")
test.start()
# Your agent runs through the checkout flow...
test.add_user_text(text="Buy the blue shoes in size 10", description="User request")
from ashr_labs.comparators import compare_args_structural
expected_tc = {"tool_name": "navigate", "arguments": {"url": "https://shop.example.com/shoes"}}
actual_tc = {"tool_name": "navigate", "arguments": {"url": "https://shop.example.com/shoes"}}
status, arg_comp = compare_args_structural(expected_tc, actual_tc)
test.add_tool_call(
expected=expected_tc,
actual=actual_tc,
match_status=status,
argument_comparison=arg_comp,
)
test.add_agent_response(
expected_response={"text": "I found the blue shoes. Adding size 10 to cart."},
actual_response={"text": "Found them! Adding the blue shoes size 10 to your cart now."},
match_status="similar",
semantic_similarity=0.87,
)
# Attach the VM session logs from your browser provider
test.set_vm_stream(
provider="browserbase",
session_id="sess_abc123def456",
duration_ms=12000,
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://shop.example.com"}},
{"ts": 800, "type": "action", "data": {"action": "click", "selector": ".product-card.blue-shoes"}},
{"ts": 1500, "type": "navigation", "data": {"url": "https://shop.example.com/shoes/blue-runner"}},
{"ts": 2200, "type": "action", "data": {"action": "select", "selector": "#size-dropdown", "value": "10"}},
{"ts": 2800, "type": "action", "data": {"action": "click", "selector": "#add-to-cart"}},
{"ts": 3500, "type": "network", "data": {"method": "POST", "url": "/api/cart/add", "status": 200}},
{"ts": 4000, "type": "action", "data": {"action": "click", "selector": "#checkout-btn"}},
{"ts": 5000, "type": "navigation", "data": {"url": "https://shop.example.com/checkout"}},
{"ts": 8000, "type": "action", "data": {"action": "click", "selector": "#place-order"}},
{"ts": 9500, "type": "network", "data": {"method": "POST", "url": "/api/orders", "status": 201}},
{"ts": 10000, "type": "navigation", "data": {"url": "https://shop.example.com/order-confirmation"}},
],
metadata={
"os": "linux",
"browser": "chromium",
"viewport": {"width": 1280, "height": 720},
},
)
test.complete()
run.complete()
# Deploy — the VM logs are included in the run result
created = run.deploy(client, dataset_id=42)
Log Types
Common log entry types your VM provider might emit:
| Type | Description | Example data |
|---|---|---|
navigation | Page navigation | {"url": "https://..."} |
action | User interaction | {"action": "click", "selector": "#btn"} |
network | HTTP request | {"method": "POST", "url": "/api/...", "status": 200} |
console | Browser console | {"level": "warn", "message": "Deprecated API"} |
error | Error occurred | {"message": "Element not found: #submit"} |
screenshot | Screenshot taken | {"s3_key": "vm-streams/.../frame.png"} |
Kernel Browser Session
Use the set_kernel_vm() convenience method for Kernel browser sessions — it sets the provider and exposes Kernel-specific metadata as named parameters:
test.set_kernel_vm(
session_id="kern_sess_abc123",
duration_ms=15000,
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://app.example.com"}},
{"ts": 1200, "type": "action", "data": {"action": "click", "selector": "#login"}},
{"ts": 2500, "type": "action", "data": {"action": "type", "selector": "#email", "value": "user@example.com"}},
{"ts": 3800, "type": "action", "data": {"action": "click", "selector": "#submit"}},
{"ts": 5000, "type": "navigation", "data": {"url": "https://app.example.com/dashboard"}},
{"ts": 8000, "type": "screenshot", "data": {"s3_key": "vm-streams/.../dashboard.png"}},
],
replay_id="replay_abc123",
replay_view_url="https://www.kernel.sh/replays/replay_abc123",
stealth=True,
viewport={"width": 1920, "height": 1080},
)
Minimal VM Stream (Logs Only)
You don't need all fields — at minimum just pass the provider and logs:
test.set_vm_stream(
provider="steel",
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://app.example.com"}},
{"ts": 5000, "type": "error", "data": {"message": "Login failed: invalid credentials"}},
],
)
Observability — Production Tracing
Full Agent Trace with Context Managers
from ashr_labs import AshrLabsClient
client = AshrLabsClient(api_key="tp_...")
with client.trace("support-agent", user_id="user_42", session_id="conv_001",
metadata={"version": "1.0"}, tags=["prod"]) as trace:
# LLM call: classify intent
with trace.generation("classify-intent", model="claude-sonnet-4-6",
input=[{"role": "user", "content": "I can't log in"}]) as gen:
# ... call your LLM here ...
gen.end(
output={"intent": "account_lockout", "confidence": 0.95},
usage={"input_tokens": 45, "output_tokens": 18},
)
# Tool call: lookup account
with trace.span("tool:lookup_account", input={"user_id": "user_42"}) as tool:
# ... call your tool here ...
tool.end(output={"status": "locked", "reason": "too_many_attempts"})
# Tool call: unlock account
with trace.span("tool:unlock_account", input={"user_id": "user_42"}) as tool:
tool.end(output={"success": True})
# Nested: compose response with guardrail
with trace.span("compose-response") as compose:
with compose.generation("generate-reply", model="claude-sonnet-4-6") as gen:
gen.end(
output={"content": "I've unlocked your account."},
usage={"input_tokens": 80, "output_tokens": 25},
)
compose.event("guardrail:pii-check", input={"pii_detected": False})
compose.end(output={"response": "I've unlocked your account."})
# Point-in-time event
trace.event("guardrail:toxicity", input={"toxic": False}, level="DEFAULT")
# trace.end() is called automatically — never crashes your agent
print(f"Trace ID: {trace.trace_id}")
Error Tracking
Spans that raise exceptions auto-end with level="ERROR":
with client.trace("risky-agent") as trace:
with trace.span("tool:external_api") as tool:
response = call_external_api(...) # if this throws...
tool.end(output=response)
# ...the span auto-ends with level="ERROR" and the exception propagates
# The trace still flushes — you can see the error in analytics
Analytics Dashboard
# Overview: traces, tokens, errors, latency
analytics = client.get_observability_analytics(days=7)
overview = analytics["overview"]
print(f"Traces: {overview['total_traces']}")
print(f"Tokens: {overview['total_input_tokens']} in / {overview['total_output_tokens']} out")
print(f"Error rate: {overview['error_rate']}")
print(f"Avg latency: {overview['avg_latency_ms']}ms")
# Per-tool performance
for tool in analytics["tool_performance"]:
print(f" {tool['tool_name']}: {tool['total_calls']} calls, {tool['error_rate']} error rate")
# Per-model usage
for model in analytics["model_usage"]:
print(f" {model['model']}: {model['total_calls']} calls, {model['total_tokens']} tokens")
# Error log
errors = client.get_observability_errors(days=7, limit=10)
for t in errors["traces"]:
print(f" {t['trace_name']}: {t['error_count']} errors")
Filtering Traces
# By user
traces = client.list_observability_traces(user_id="user_42")
# By session
traces = client.list_observability_traces(session_id="conv_001")
# Pagination
page1 = client.list_observability_traces(limit=20, page=1)
page2 = client.list_observability_traces(limit=20, page=2)
Managing Test Runs
Create a Comprehensive Test Run
from datetime import datetime
def submit_test_results(
    client,
    tenant_id: int,
    dataset_id: int,
    test_results: dict,
    metadata: dict = None
):
    """Submit comprehensive test results as a new run.

    Args:
        client: AshrLabsClient used to create the run.
        tenant_id: Tenant the run belongs to.
        dataset_id: Dataset the run was executed against.
        test_results: Dict with optional ``status``, ``score``, ``metrics``
            and ``test_cases`` keys; missing keys get safe defaults.
        metadata: Optional extra metadata attached to the run.

    Returns:
        The created run dict from the API.
    """
    from datetime import timezone  # keeps this example self-contained

    result = {
        # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated
        # (Python 3.12+) and yields naive, ambiguous datetimes.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "status": test_results.get("status", "unknown"),
        "score": test_results.get("score"),
        "metrics": test_results.get("metrics", {}),
        "test_cases": test_results.get("test_cases", []),
        "metadata": metadata or {},
        "environment": {
            "python_version": "3.11",
            "platform": "linux"
        }
    }
    run = client.create_run(
        tenant_id=tenant_id,
        dataset_id=dataset_id,
        result=result
    )
    return run
# Usage
test_results = {
"status": "passed",
"score": 0.95,
"metrics": {
"accuracy": 0.98,
"precision": 0.96,
"recall": 0.94,
"f1_score": 0.95
},
"test_cases": [
{"name": "test_audio_quality", "passed": True, "duration_ms": 150},
{"name": "test_voice_match", "passed": True, "duration_ms": 200},
{"name": "test_latency", "passed": True, "duration_ms": 50}
]
}
run = submit_test_results(
client,
tenant_id=1,
dataset_id=42,
test_results=test_results,
metadata={"version": "1.0.0", "branch": "main"}
)
Compare Test Runs
def compare_runs(client, run_id_1: int, run_id_2: int):
    """Compare metrics between two test runs.

    Only metrics present in *both* runs are compared. Each entry reports
    the raw values, absolute difference, and percent change (reported as 0
    when the baseline value is 0 to avoid division by zero).
    """
    baseline_metrics = client.get_run(run_id=run_id_1)["result"].get("metrics", {})
    candidate_metrics = client.get_run(run_id=run_id_2)["result"].get("metrics", {})
    comparison = {}
    for name in set(baseline_metrics) | set(candidate_metrics):
        baseline = baseline_metrics.get(name)
        candidate = candidate_metrics.get(name)
        if baseline is None or candidate is None:
            continue  # metric missing from one run — nothing to compare
        delta = candidate - baseline
        percent = (delta / baseline * 100) if baseline != 0 else 0
        comparison[name] = {
            "run_1": baseline,
            "run_2": candidate,
            "diff": delta,
            "pct_change": round(percent, 2)
        }
    return comparison
# Usage
comparison = compare_runs(client, run_id_1=100, run_id_2=101)
for metric, values in comparison.items():
print(f"{metric}: {values['run_1']} -> {values['run_2']} ({values['pct_change']:+.2f}%)")
Get Latest Run for Dataset
def get_latest_run(client, dataset_id: int):
    """Return the most recent run for *dataset_id*, or None if there are none."""
    runs = client.list_runs(dataset_id=dataset_id, limit=1).get("runs", [])
    return runs[0] if runs else None
# Usage
latest = get_latest_run(client, dataset_id=42)
if latest:
print(f"Latest run: #{latest['id']} - {latest['result']['status']}")
Submitting Requests
Audio Generation Request
def request_audio_generation(client, text: str, voice: str = "alloy", format: str = "mp3"):
    """Submit an audio generation request.

    The request name embeds the first 30 characters of *text* for easy
    identification in listings.
    """
    payload = {
        "type": "audio_generation",
        "text": text,
        "voice": voice,
        "format": format,
        "speed": 1.0
    }
    return client.create_request(
        request_name=f"Audio: {text[:30]}...",
        request=payload,
    )
# Usage
req = request_audio_generation(client, text="Welcome to our testing platform!", voice="nova")
print(f"Request #{req['id']} submitted")
Poll for Request Completion
# For requests (dataset generation), use the built-in wait_for_request:
req = client.create_request(request_name="My Eval", request=config)
completed = client.wait_for_request(req["id"], timeout=300)
print(f"Request completed: {completed['request_status']}")
Poll for Run Grading
# For runs (after deploy), use poll_run:
created = run.deploy(client, dataset_id=42)
graded = client.poll_run(
created["id"],
timeout=300,
on_poll=lambda elapsed, r: print(f" Grading... ({elapsed}s)"),
)
metrics = graded["result"]["aggregate_metrics"]
print(f"Passed: {metrics['tests_passed']}/{metrics['total_tests']}")
CI/CD Integration
GitHub Actions Integration
#!/usr/bin/env python3
"""CI/CD script for running tests against the Ashr Labs platform."""
import os
import sys
from ashr_labs import AshrLabsClient, AshrLabsError
def main():
    """CI entry point: run the test suite and report results to Ashr Labs.

    Exits 0 when the suite passed, 1 when it failed or when submitting the
    results raised an AshrLabsError.
    """
    # Client configuration comes from the environment (ASHR_LABS_API_KEY,
    # optionally ASHR_LABS_BASE_URL).
    client = AshrLabsClient.from_env()
    dataset_id = int(os.environ["ASHR_LABS_DATASET_ID"])
    print("Running tests...")
    outcome = run_tests()  # Your test function
    try:
        created = client.create_run(
            dataset_id=dataset_id,
            result={
                "status": "passed" if outcome["success"] else "failed",
                "score": outcome["score"],
                "metrics": outcome["metrics"],
                # CI provenance — useful when auditing runs later.
                "commit": os.environ.get("GITHUB_SHA", "unknown"),
                "branch": os.environ.get("GITHUB_REF_NAME", "unknown"),
                "workflow": os.environ.get("GITHUB_WORKFLOW", "unknown")
            }
        )
        print(f"Results submitted: Run #{created['id']}")
        # Exit code mirrors the test outcome.
        sys.exit(0 if outcome["success"] else 1)
    except AshrLabsError as e:
        print(f"Failed to submit results: {e}")
        sys.exit(1)
def run_tests():
    """Placeholder for your actual test logic."""
    return {"success": True, "score": 0.95, "metrics": {"accuracy": 0.98}}
if __name__ == "__main__":
main()
GitHub Actions Workflow
# .github/workflows/test.yml
name: Run Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install ashr-labs
pip install -r requirements.txt
- name: Run tests and submit results
env:
ASHR_LABS_API_KEY: ${{ secrets.ASHR_LABS_API_KEY }}
ASHR_LABS_DATASET_ID: ${{ vars.DATASET_ID }}
run: python scripts/run_tests.py
Batch Operations
Batch Create Runs
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_create_runs(client, run_configs: list, max_workers: int = 5):
    """Create multiple runs in parallel.

    Returns a dict with ``created`` (successful run dicts, in completion
    order) and ``errors`` (each failing config plus its error message).
    """
    created, errors = [], []

    def _submit_one(cfg):
        # One API call per config; exceptions surface via the future.
        return client.create_run(dataset_id=cfg["dataset_id"], result=cfg["result"])

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(_submit_one, cfg): cfg for cfg in run_configs}
        for done in as_completed(pending):
            try:
                created.append(done.result())
            except Exception as exc:  # record the failure, keep going
                errors.append({"config": pending[done], "error": str(exc)})
    return {"created": created, "errors": errors}
# Usage
configs = [
{"dataset_id": 1, "result": {"score": 0.95}},
{"dataset_id": 2, "result": {"score": 0.87}},
{"dataset_id": 3, "result": {"score": 0.92}},
]
result = batch_create_runs(client, run_configs=configs)
print(f"Created {len(result['created'])} runs, {len(result['errors'])} errors")
Export All Runs to CSV
import csv
from datetime import datetime
def export_runs_to_csv(client, output_file: str):
    """Export all runs to a CSV file.

    Follows cursor pagination when the API reports a ``next_cursor``, so
    more than the first page of 100 runs is exported (previously only the
    first page was fetched despite the "all runs" contract).

    Args:
        client: AshrLabsClient used to list runs.
        output_file: Path of the CSV file to write.

    Returns:
        The number of runs written.
    """
    all_runs = []
    cursor = None
    while True:
        # Only pass the cursor once the API has handed one back, so the
        # first call stays identical to a plain list_runs(limit=100).
        kwargs = {"limit": 100}
        if cursor:
            kwargs["cursor"] = cursor
        response = client.list_runs(**kwargs)
        all_runs.extend(response.get("runs", []))
        cursor = response.get("next_cursor")
        if not cursor:
            break
    # Write to CSV
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["ID", "Dataset", "Created At", "Status", "Score"])
        for run in all_runs:
            result = run.get("result", {})
            writer.writerow([
                run["id"],
                run["dataset"],
                run["created_at"],
                result.get("status", "unknown"),
                result.get("score", "N/A")
            ])
    return len(all_runs)
# Usage
count = export_runs_to_csv(client, output_file="runs_export.csv")
print(f"Exported {count} runs to runs_export.csv")
Monitoring and Reporting
Generate Test Report
from collections import defaultdict
from datetime import datetime, timedelta
def generate_weekly_report(client):
    """Generate a weekly test report.

    Aggregates pass/fail counts and scores for runs created in the last
    7 days, both overall and per dataset.

    Returns:
        Dict with ``total_runs``, ``passed``, ``failed``, ``by_dataset``,
        ``scores``, ``avg_score`` (None when no scores) and ``pass_rate``
        (percentage, 0 when there were no runs).
    """
    from datetime import timezone  # keeps this example self-contained

    # Get runs from the last 7 days
    response = client.list_runs(limit=1000)
    all_runs = response.get("runs", [])
    # Compare timezone-aware datetimes directly: datetime.utcnow() is
    # deprecated (Python 3.12+) and stripping tzinfo invites naive/aware bugs.
    cutoff = datetime.now(timezone.utc) - timedelta(days=7)

    def _created_at(run):
        # Accept "Z"-suffixed ISO-8601 (fromisoformat rejects "Z" before 3.11).
        ts = datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)  # assume UTC if naive — TODO confirm
        return ts

    recent_runs = [r for r in all_runs if _created_at(r) > cutoff]
    # Aggregate stats
    stats = {
        "total_runs": len(recent_runs),
        "passed": 0,
        "failed": 0,
        "by_dataset": defaultdict(lambda: {"passed": 0, "failed": 0, "scores": []}),
        "scores": []
    }
    for run in recent_runs:
        result = run.get("result", {})
        status = result.get("status", "unknown")
        score = result.get("score")
        dataset_id = run["dataset"]
        if status == "passed":
            stats["passed"] += 1
            stats["by_dataset"][dataset_id]["passed"] += 1
        elif status == "failed":
            stats["failed"] += 1
            stats["by_dataset"][dataset_id]["failed"] += 1
        if score is not None:
            stats["scores"].append(score)
            stats["by_dataset"][dataset_id]["scores"].append(score)
    # Calculate averages
    stats["avg_score"] = (
        sum(stats["scores"]) / len(stats["scores"]) if stats["scores"] else None
    )
    stats["pass_rate"] = (
        stats["passed"] / stats["total_runs"] * 100
        if stats["total_runs"] > 0 else 0
    )
    return stats
# Usage
report = generate_weekly_report(client)
print(f"Weekly Report")
print(f"=============")
print(f"Total Runs: {report['total_runs']}")
print(f"Pass Rate: {report['pass_rate']:.1f}%")
print(f"Average Score: {report['avg_score']:.3f}" if report['avg_score'] else "N/A")
Monitor API Key Usage
def check_api_key_health(client):
    """Check the health and status of API keys.

    Returns:
        Dict with ``total``/``active``/``inactive`` counts plus lists of
        keys that are ``expiring_soon`` (within 7 days) and ``never_used``.
    """
    # Local import so this example is self-contained; also provides
    # timezone for aware comparisons (datetime.utcnow() is deprecated).
    from datetime import datetime, timedelta, timezone

    keys = client.list_api_keys(include_inactive=True)
    report = {
        "total": len(keys),
        "active": 0,
        "inactive": 0,
        "expiring_soon": [],
        "never_used": []
    }
    soon = datetime.now(timezone.utc) + timedelta(days=7)
    for key in keys:
        if not key["is_active"]:
            report["inactive"] += 1
            continue
        report["active"] += 1
        # Check if expiring within 7 days
        expires_at = key.get("expires_at")
        if expires_at:
            expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
            if expires.tzinfo is None:
                expires = expires.replace(tzinfo=timezone.utc)  # assume UTC — TODO confirm
            if expires < soon:
                report["expiring_soon"].append(key)
        # Check if never used
        if not key.get("last_used_at"):
            report["never_used"].append(key)
    return report
# Usage
health = check_api_key_health(client)
print(f"API Key Health Report")
print(f"Active: {health['active']}, Inactive: {health['inactive']}")
if health["expiring_soon"]:
print(f"\nKeys expiring soon:")
for key in health["expiring_soon"]:
print(f" - {key['name']} ({key['key_prefix']}...)")
if health["never_used"]:
print(f"\nKeys never used:")
for key in health["never_used"]:
print(f" - {key['name']} ({key['key_prefix']}...)")