Skip to main content

Examples

This page contains practical examples for common use cases with the Ashr Labs SDK.

Table of Contents


Basic Setup

Quickest Start

from ashr_labs import AshrLabsClient

# Only need your API key — everything else is automatic
client = AshrLabsClient(api_key="tp_your_api_key_here")
datasets = client.list_datasets()

From Environment Variables

from ashr_labs import AshrLabsClient

# Reads ASHR_LABS_API_KEY (required) and ASHR_LABS_BASE_URL (optional)
client = AshrLabsClient.from_env()
datasets = client.list_datasets()

Explicit Session Info

from ashr_labs import AshrLabsClient

client = AshrLabsClient(api_key="tp_your_api_key_here")

# init() is called automatically on first API call, but you can call it explicitly
session = client.init()
print(f"User: {session['user']['email']}")
print(f"Tenant: {session['tenant']['tenant_name']}")

Agents

Create an Agent with Grading Config

Agents define how your datasets should be generated and graded. The config controls tool strictness, behavior rules, and grading thresholds.

from ashr_labs import AshrLabsClient

client = AshrLabsClient(api_key="tp_your_api_key_here")

# Create an agent for a voice-based scheduling bot
agent = client.create_agent(
name="Scheduling Bot",
description="Spanish-language healthcare appointment scheduler",
config={
# Define what tools the agent has and which are required
"tool_definitions": [
{"name": "fetch_kareo_data", "required": True, "description": "Fetch appointment availability from Kareo"},
{"name": "save_data", "required": True, "description": "Persist caller information"},
{"name": "end_session", "required": False, "description": "Formally close the conversation"},
{"name": "await_user_response", "required": False, "description": "Wait for caller input"},
],
# Rules the agent should follow
"behavior_rules": [
{"rule": "Always call fetch_kareo_data before quoting availability", "strictness": "required"},
{"rule": "Save caller name and DOB via save_data when provided", "strictness": "required"},
{"rule": "End session formally when conversation is complete", "strictness": "expected"},
],
# How the grader should handle NOT_CALLED tool calls
"grading_config": {
"tool_strictness": {
"fetch_kareo_data": "required", # must be called — failure if skipped
"save_data": "required", # must be called — failure if skipped
"end_session": "optional", # OK if agent ends conversation naturally
"await_user_response": "optional", # OK if agent handles flow without explicit wait
},
"text_similarity_threshold": 0.3, # lower threshold for multilingual agents
},
},
)

print(f"Created agent: {agent['name']} (id={agent['id']})")
# Link existing datasets to the agent
client.set_dataset_agent(dataset_id=42, agent_id=agent["id"])
client.set_dataset_agent(dataset_id=43, agent_id=agent["id"])

# View agent's datasets
resp = client.get_agent_datasets(agent["id"])
print(f"Agent '{resp['agent']['name']}' has {len(resp['datasets'])} datasets")
for ds in resp["datasets"]:
print(f" - {ds['name']} (id={ds['id']})")

Deploy Runs Under an Agent

When you deploy a run with agent_id, the dataset is automatically linked to the agent:

from ashr_labs import AshrLabsClient, EvalRunner

client = AshrLabsClient(api_key="tp_...")
agent_id = 1 # your agent's ID

# Run eval and auto-link dataset to agent
runner = EvalRunner.from_dataset(client, dataset_id=42)
run = runner.run(my_agent)
created = run.deploy(client, dataset_id=42, agent_id=agent_id)

# The dataset is now linked to the agent, and the grader will use
# the agent's grading_config for smarter NOT_CALLED recovery
graded = client.poll_run(created["id"])
metrics = graded["result"]["aggregate_metrics"]
print(f"Passed: {metrics['tests_passed']}/{metrics['total_tests']}")

Update Agent Config

# Update grading config — e.g. make a tool optional that was previously required
agent = client.update_agent(
agent_id=1,
config={
"tool_definitions": [
{"name": "fetch_data", "required": True},
{"name": "save_data", "required": True},
{"name": "end_session", "required": False},
{"name": "transfer_call", "required": False}, # newly added tool
],
"grading_config": {
"tool_strictness": {
"fetch_data": "required",
"save_data": "required",
"end_session": "optional",
"transfer_call": "expected", # should call but not a hard failure
},
},
},
)
print(f"Updated agent config: {list(agent['config'].keys())}")

List and Clean Up Agents

agents = client.list_agents()
for a in agents:
print(f"{a['name']}: {a['dataset_count']} datasets, active={a['is_active']}")

# Soft-delete an agent (datasets are unlinked, not deleted)
client.delete_agent(agent_id=3)

Working with Datasets

List All Datasets with Pagination

def get_all_datasets(client):
    """Return every dataset, walking cursor-based pages until exhausted."""
    collected = []
    cursor = None

    while True:
        page = client.list_datasets(limit=50, cursor=cursor)
        collected.extend(page["datasets"])

        # A missing/empty next_cursor marks the final page.
        cursor = page.get("next_cursor")
        if not cursor:
            break

    return collected

# Usage
datasets = get_all_datasets(client)
print(f"Total datasets: {len(datasets)}")

Download Dataset Media Files

import urllib.request
from pathlib import Path

def download_dataset_files(client, dataset_id: int, output_dir: str):
    """Download all media files from a dataset.

    Args:
        client: An authenticated AshrLabsClient.
        dataset_id: Dataset whose media files should be fetched.
        output_dir: Directory to write files into (created if missing).

    Returns:
        List of Path objects for every downloaded file.
    """
    from urllib.parse import urlparse

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Get dataset with signed URLs
    dataset = client.get_dataset(
        dataset_id=dataset_id,
        include_signed_urls=True,
        url_expires_seconds=3600
    )

    source = dataset.get("dataset_source", {})
    actions = source.get("actions", [])

    downloaded = []
    for action in actions:
        if "signed_url" not in action:
            continue

        action_id = action["action_id"]
        url = action["signed_url"]

        # Derive the extension from the URL *path* only — signed URLs carry
        # query parameters (signatures, expiry) that can contain misleading
        # substrings like ".json". Unknown extensions fall back to .bin.
        ext = Path(urlparse(url).path).suffix.lower()
        if ext not in (".mp3", ".wav", ".json"):
            ext = ".bin"

        filename = output_path / f"{action_id}{ext}"

        print(f"Downloading {action_id}...")
        urllib.request.urlretrieve(url, filename)
        downloaded.append(filename)

    return downloaded

# Usage
files = download_dataset_files(client, dataset_id=42, output_dir="./downloads")
print(f"Downloaded {len(files)} files")

Search Datasets by Name

def find_datasets_by_name(client, search_term: str):
    """Return datasets whose name contains *search_term* (case-insensitive)."""
    needle = search_term.lower()
    return [
        dataset
        for dataset in get_all_datasets(client)
        if needle in dataset["name"].lower()
    ]

# Usage
datasets = find_datasets_by_name(client, search_term="audio")
for d in datasets:
print(f"- {d['name']} (ID: {d['id']})")

Building Runs with RunBuilder

Basic RunBuilder Usage

from ashr_labs import AshrLabsClient, RunBuilder

client = AshrLabsClient(api_key="tp_...")

run = RunBuilder()
run.start()

test = run.add_test("bank_analysis")
test.start()

# Record user actions
test.add_user_file(
file_path="datasets/tenant_1/dataset_9/bank_analysis/action_0.pdf",
description="User submits bank statement PDF"
)
test.add_user_text(
text="Please analyze this bank statement and summarize key transactions.",
description="User asks for analysis"
)

# Record agent tool calls with expected vs actual
test.add_tool_call(
expected={"tool_name": "extract_pdf_content", "arguments": {"file_path": "bank_statement.pdf"}},
actual={"tool_name": "extract_pdf_content", "arguments": {"file_path": "bank_statement.pdf", "pages": "all"}},
match_status="partial",
divergence_notes="Extra 'pages' argument in actual call",
)
test.add_tool_call(
expected={"tool_name": "analyze_transactions", "arguments": {"account_holder": "Rohan", "period": "last_month"}},
actual={"tool_name": "analyze_transactions", "arguments": {"account_holder": "Rohan", "period": "last_month"}},
match_status="exact",
)

# Record agent text responses
test.add_agent_response(
expected_response={"summary": "Based on the bank statement analysis..."},
actual_response={"summary": "After analyzing the bank statement..."},
match_status="similar",
semantic_similarity=0.89,
divergence_notes="Slightly different wording but same key information",
)

test.complete()
run.complete()

# Deploy to the API
created_run = run.deploy(client, dataset_id=42)
print(f"Run #{created_run['id']} created")

Multiple Tests in a Single Run

from ashr_labs import RunBuilder

run = RunBuilder()
run.start()

# First test
test1 = run.add_test("pdf_extraction")
test1.start()
test1.add_user_file(file_path="data/invoice.pdf", description="Upload invoice")
test1.add_tool_call(
expected={"tool_name": "extract_pdf", "arguments": {"file": "invoice.pdf"}},
actual={"tool_name": "extract_pdf", "arguments": {"file": "invoice.pdf"}},
match_status="exact",
)
test1.complete()

# Second test
test2 = run.add_test("summary_generation")
test2.start()
test2.add_user_text(text="Summarize the invoice", description="User request")
test2.add_agent_response(
expected_response={"summary": "Invoice total: $500"},
actual_response={"summary": "The invoice totals $500"},
match_status="similar",
semantic_similarity=0.93,
)
test2.complete()

run.complete()

# Inspect the built result before deploying
result = run.build()
print(f"Total tests: {result['aggregate_metrics']['total_tests']}")
print(f"Tests completed: {result['aggregate_metrics']['tests_completed']}")
# Full pass/fail metrics available after deploy + poll_run()

Using build() for Inspection Before Deploy

run = RunBuilder()
run.start()

test = run.add_test("my_test")
test.start()
test.add_user_text(text="Hello", description="Greeting")
test.add_tool_call(
expected={"tool_name": "greet", "arguments": {}},
actual={"tool_name": "greet", "arguments": {"formal": True}},
match_status="partial",
divergence_notes="Extra 'formal' argument",
)
test.complete()
run.complete()

# Inspect the result dict
result = run.build()
print(result["aggregate_metrics"])
# {'total_tests': 1, 'tests_completed': 1, 'tests_errored': 0}

# Deploy and wait for server-side grading
created = run.deploy(client, dataset_id=42)
graded = client.poll_run(created["id"])
print(graded["result"]["aggregate_metrics"])
# {'total_tests': 1, 'tests_passed': 1, 'tests_failed': 0,
# 'total_tool_call_divergence': 1, 'total_response_divergence': 0, ...}

VM Stream Logs

Attach Browser Session Logs

For agents that operate in a browser (via Browserbase, Scrapybara, Steel, etc.), attach VM session logs to each test:

from ashr_labs import RunBuilder

run = RunBuilder()
run.start()

test = run.add_test("checkout_flow")
test.start()

# Your agent runs through the checkout flow...
test.add_user_text(text="Buy the blue shoes in size 10", description="User request")

from ashr_labs.comparators import compare_args_structural
expected_tc = {"tool_name": "navigate", "arguments": {"url": "https://shop.example.com/shoes"}}
actual_tc = {"tool_name": "navigate", "arguments": {"url": "https://shop.example.com/shoes"}}
status, arg_comp = compare_args_structural(expected_tc, actual_tc)
test.add_tool_call(
expected=expected_tc,
actual=actual_tc,
match_status=status,
argument_comparison=arg_comp,
)
test.add_agent_response(
expected_response={"text": "I found the blue shoes. Adding size 10 to cart."},
actual_response={"text": "Found them! Adding the blue shoes size 10 to your cart now."},
match_status="similar",
semantic_similarity=0.87,
)

# Attach the VM session logs from your browser provider
test.set_vm_stream(
provider="browserbase",
session_id="sess_abc123def456",
duration_ms=12000,
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://shop.example.com"}},
{"ts": 800, "type": "action", "data": {"action": "click", "selector": ".product-card.blue-shoes"}},
{"ts": 1500, "type": "navigation", "data": {"url": "https://shop.example.com/shoes/blue-runner"}},
{"ts": 2200, "type": "action", "data": {"action": "select", "selector": "#size-dropdown", "value": "10"}},
{"ts": 2800, "type": "action", "data": {"action": "click", "selector": "#add-to-cart"}},
{"ts": 3500, "type": "network", "data": {"method": "POST", "url": "/api/cart/add", "status": 200}},
{"ts": 4000, "type": "action", "data": {"action": "click", "selector": "#checkout-btn"}},
{"ts": 5000, "type": "navigation", "data": {"url": "https://shop.example.com/checkout"}},
{"ts": 8000, "type": "action", "data": {"action": "click", "selector": "#place-order"}},
{"ts": 9500, "type": "network", "data": {"method": "POST", "url": "/api/orders", "status": 201}},
{"ts": 10000, "type": "navigation", "data": {"url": "https://shop.example.com/order-confirmation"}},
],
metadata={
"os": "linux",
"browser": "chromium",
"viewport": {"width": 1280, "height": 720},
},
)

test.complete()
run.complete()

# Deploy — the VM logs are included in the run result
created = run.deploy(client, dataset_id=42)

Log Types

Common log entry types your VM provider might emit:

TypeDescriptionExample data
navigationPage navigation{"url": "https://..."}
actionUser interaction{"action": "click", "selector": "#btn"}
networkHTTP request{"method": "POST", "url": "/api/...", "status": 200}
consoleBrowser console{"level": "warn", "message": "Deprecated API"}
errorError occurred{"message": "Element not found: #submit"}
screenshotScreenshot taken{"s3_key": "vm-streams/.../frame.png"}

Kernel Browser Session

Use the set_kernel_vm() convenience method for Kernel browser sessions — it sets the provider and exposes Kernel-specific metadata as named parameters:

test.set_kernel_vm(
session_id="kern_sess_abc123",
duration_ms=15000,
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://app.example.com"}},
{"ts": 1200, "type": "action", "data": {"action": "click", "selector": "#login"}},
{"ts": 2500, "type": "action", "data": {"action": "type", "selector": "#email", "value": "user@example.com"}},
{"ts": 3800, "type": "action", "data": {"action": "click", "selector": "#submit"}},
{"ts": 5000, "type": "navigation", "data": {"url": "https://app.example.com/dashboard"}},
{"ts": 8000, "type": "screenshot", "data": {"s3_key": "vm-streams/.../dashboard.png"}},
],
replay_id="replay_abc123",
replay_view_url="https://www.kernel.sh/replays/replay_abc123",
stealth=True,
viewport={"width": 1920, "height": 1080},
)

Minimal VM Stream (Logs Only)

You don't need all fields — at minimum just pass the provider and logs:

test.set_vm_stream(
provider="steel",
logs=[
{"ts": 0, "type": "navigation", "data": {"url": "https://app.example.com"}},
{"ts": 5000, "type": "error", "data": {"message": "Login failed: invalid credentials"}},
],
)

Observability — Production Tracing

Full Agent Trace with Context Managers

from ashr_labs import AshrLabsClient

client = AshrLabsClient(api_key="tp_...")

with client.trace("support-agent", user_id="user_42", session_id="conv_001",
metadata={"version": "1.0"}, tags=["prod"]) as trace:

# LLM call: classify intent
with trace.generation("classify-intent", model="claude-sonnet-4-6",
input=[{"role": "user", "content": "I can't log in"}]) as gen:
# ... call your LLM here ...
gen.end(
output={"intent": "account_lockout", "confidence": 0.95},
usage={"input_tokens": 45, "output_tokens": 18},
)

# Tool call: lookup account
with trace.span("tool:lookup_account", input={"user_id": "user_42"}) as tool:
# ... call your tool here ...
tool.end(output={"status": "locked", "reason": "too_many_attempts"})

# Tool call: unlock account
with trace.span("tool:unlock_account", input={"user_id": "user_42"}) as tool:
tool.end(output={"success": True})

# Nested: compose response with guardrail
with trace.span("compose-response") as compose:
with compose.generation("generate-reply", model="claude-sonnet-4-6") as gen:
gen.end(
output={"content": "I've unlocked your account."},
usage={"input_tokens": 80, "output_tokens": 25},
)
compose.event("guardrail:pii-check", input={"pii_detected": False})
compose.end(output={"response": "I've unlocked your account."})

# Point-in-time event
trace.event("guardrail:toxicity", input={"toxic": False}, level="DEFAULT")

# trace.end() is called automatically — never crashes your agent
print(f"Trace ID: {trace.trace_id}")

Error Tracking

Spans that raise exceptions auto-end with level="ERROR":

with client.trace("risky-agent") as trace:
with trace.span("tool:external_api") as tool:
response = call_external_api(...) # if this throws...
tool.end(output=response)
# ...the span auto-ends with level="ERROR" and the exception propagates

# The trace still flushes — you can see the error in analytics

Analytics Dashboard

# Overview: traces, tokens, errors, latency
analytics = client.get_observability_analytics(days=7)
overview = analytics["overview"]
print(f"Traces: {overview['total_traces']}")
print(f"Tokens: {overview['total_input_tokens']} in / {overview['total_output_tokens']} out")
print(f"Error rate: {overview['error_rate']}")
print(f"Avg latency: {overview['avg_latency_ms']}ms")

# Per-tool performance
for tool in analytics["tool_performance"]:
print(f" {tool['tool_name']}: {tool['total_calls']} calls, {tool['error_rate']} error rate")

# Per-model usage
for model in analytics["model_usage"]:
print(f" {model['model']}: {model['total_calls']} calls, {model['total_tokens']} tokens")

# Error log
errors = client.get_observability_errors(days=7, limit=10)
for t in errors["traces"]:
print(f" {t['trace_name']}: {t['error_count']} errors")

Filtering Traces

# By user
traces = client.list_observability_traces(user_id="user_42")

# By session
traces = client.list_observability_traces(session_id="conv_001")

# Pagination
page1 = client.list_observability_traces(limit=20, page=1)
page2 = client.list_observability_traces(limit=20, page=2)

Managing Test Runs

Create a Comprehensive Test Run

from datetime import datetime

def submit_test_results(
client,
tenant_id: int,
dataset_id: int,
test_results: dict,
metadata: dict = None
):
"""Submit comprehensive test results."""
result = {
"timestamp": datetime.utcnow().isoformat(),
"status": test_results.get("status", "unknown"),
"score": test_results.get("score"),
"metrics": test_results.get("metrics", {}),
"test_cases": test_results.get("test_cases", []),
"metadata": metadata or {},
"environment": {
"python_version": "3.11",
"platform": "linux"
}
}

run = client.create_run(
tenant_id=tenant_id,
dataset_id=dataset_id,
result=result
)

return run

# Usage
test_results = {
"status": "passed",
"score": 0.95,
"metrics": {
"accuracy": 0.98,
"precision": 0.96,
"recall": 0.94,
"f1_score": 0.95
},
"test_cases": [
{"name": "test_audio_quality", "passed": True, "duration_ms": 150},
{"name": "test_voice_match", "passed": True, "duration_ms": 200},
{"name": "test_latency", "passed": True, "duration_ms": 50}
]
}

run = submit_test_results(
client,
tenant_id=1,
dataset_id=42,
test_results=test_results,
metadata={"version": "1.0.0", "branch": "main"}
)

Compare Test Runs

def compare_runs(client, run_id_1: int, run_id_2: int):
    """Compare metrics between two test runs.

    Only metrics present in BOTH runs are compared. Each entry records both
    values, the absolute difference, and the percent change relative to the
    first run (0 when the first value is zero).
    """
    first = client.get_run(run_id=run_id_1)["result"].get("metrics", {})
    second = client.get_run(run_id=run_id_2)["result"].get("metrics", {})

    report = {}
    for name in set(first) | set(second):
        a = first.get(name)
        b = second.get(name)
        if a is None or b is None:
            continue  # metric missing from one run — nothing to compare

        delta = b - a
        report[name] = {
            "run_1": a,
            "run_2": b,
            "diff": delta,
            "pct_change": round(delta / a * 100, 2) if a != 0 else 0
        }

    return report

# Usage
comparison = compare_runs(client, run_id_1=100, run_id_2=101)
for metric, values in comparison.items():
print(f"{metric}: {values['run_1']} -> {values['run_2']} ({values['pct_change']:+.2f}%)")

Get Latest Run for Dataset

def get_latest_run(client, dataset_id: int):
    """Return the most recent run for *dataset_id*, or None when there are none."""
    runs = client.list_runs(dataset_id=dataset_id, limit=1).get("runs", [])
    return runs[0] if runs else None

# Usage
latest = get_latest_run(client, dataset_id=42)
if latest:
print(f"Latest run: #{latest['id']} - {latest['result']['status']}")

Submitting Requests

Audio Generation Request

def request_audio_generation(client, text: str, voice: str = "alloy", format: str = "mp3"):
    """Submit an audio generation request.

    NOTE: the ``format`` parameter shadows the builtin of the same name; it
    is kept as-is for backward compatibility with existing callers.
    """
    payload = {
        "type": "audio_generation",
        "text": text,
        "voice": voice,
        "format": format,
        "speed": 1.0
    }
    return client.create_request(
        request_name=f"Audio: {text[:30]}...",
        request=payload
    )

# Usage
req = request_audio_generation(client, text="Welcome to our testing platform!", voice="nova")
print(f"Request #{req['id']} submitted")

Poll for Request Completion

# For requests (dataset generation), use the built-in wait_for_request:
req = client.create_request(request_name="My Eval", request=config)
completed = client.wait_for_request(req["id"], timeout=300)
print(f"Request completed: {completed['request_status']}")

Poll for Run Grading

# For runs (after deploy), use poll_run:
created = run.deploy(client, dataset_id=42)
graded = client.poll_run(
created["id"],
timeout=300,
on_poll=lambda elapsed, r: print(f" Grading... ({elapsed}s)"),
)
metrics = graded["result"]["aggregate_metrics"]
print(f"Passed: {metrics['tests_passed']}/{metrics['total_tests']}")

CI/CD Integration

GitHub Actions Integration

#!/usr/bin/env python3
"""CI/CD script for running tests against the Ashr Labs."""

import os
import sys
from ashr_labs import AshrLabsClient, AshrLabsError

def main():
    """CI entry point: run the test suite, submit results, exit 0 on success."""
    # Configuration comes from the environment:
    # ASHR_LABS_API_KEY (read by from_env) and ASHR_LABS_DATASET_ID.
    client = AshrLabsClient.from_env()
    dataset_id = int(os.environ["ASHR_LABS_DATASET_ID"])

    print("Running tests...")
    outcome = run_tests()  # your real test function goes here

    try:
        run = client.create_run(
            dataset_id=dataset_id,
            result={
                "status": "passed" if outcome["success"] else "failed",
                "score": outcome["score"],
                "metrics": outcome["metrics"],
                # CI provenance from GitHub Actions' default environment.
                "commit": os.environ.get("GITHUB_SHA", "unknown"),
                "branch": os.environ.get("GITHUB_REF_NAME", "unknown"),
                "workflow": os.environ.get("GITHUB_WORKFLOW", "unknown")
            }
        )
        print(f"Results submitted: Run #{run['id']}")

        # Exit code mirrors the test outcome so CI fails on failed tests.
        # (sys.exit raises SystemExit, which the except below does not catch.)
        sys.exit(0 if outcome["success"] else 1)

    except AshrLabsError as e:
        print(f"Failed to submit results: {e}")
        sys.exit(1)

def run_tests():
    """Placeholder for your actual test logic."""
    outcome = {"success": True, "score": 0.95}
    outcome["metrics"] = {"accuracy": 0.98}
    return outcome

if __name__ == "__main__":
main()

GitHub Actions Workflow

# .github/workflows/test.yml
name: Run Tests

on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'

- name: Install dependencies
run: |
pip install ashr-labs
pip install -r requirements.txt

- name: Run tests and submit results
env:
ASHR_LABS_API_KEY: ${{ secrets.ASHR_LABS_API_KEY }}
ASHR_LABS_DATASET_ID: ${{ vars.DATASET_ID }}
run: python scripts/run_tests.py

Batch Operations

Batch Create Runs

from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_create_runs(client, run_configs: list, max_workers: int = 5):
    """Create multiple runs in parallel.

    Returns ``{"created": [run, ...], "errors": [{"config", "error"}, ...]}``.
    A failure on one config never aborts the batch; it is recorded per-config.
    """
    created = []
    failed = []

    def _submit(cfg):
        # One API call per config; executed on a worker thread.
        return client.create_run(
            dataset_id=cfg["dataset_id"],
            result=cfg["result"]
        )

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(_submit, cfg): cfg for cfg in run_configs}

        for fut in as_completed(pending):
            cfg = pending[fut]
            try:
                created.append(fut.result())
            except Exception as exc:
                failed.append({"config": cfg, "error": str(exc)})

    return {"created": created, "errors": failed}

# Usage
configs = [
{"dataset_id": 1, "result": {"score": 0.95}},
{"dataset_id": 2, "result": {"score": 0.87}},
{"dataset_id": 3, "result": {"score": 0.92}},
]

result = batch_create_runs(client, run_configs=configs)
print(f"Created {len(result['created'])} runs, {len(result['errors'])} errors")

Export All Runs to CSV

import csv
from datetime import datetime

def export_runs_to_csv(client, output_file: str):
    """Export runs to a CSV file and return the number of rows written.

    NOTE(review): this fetches a single page of up to 100 runs; extend with
    pagination if the tenant has more.
    """
    runs = client.list_runs(limit=100).get("runs", [])

    with open(output_file, "w", newline="") as fh:
        out = csv.writer(fh)
        out.writerow(["ID", "Dataset", "Created At", "Status", "Score"])

        for item in runs:
            res = item.get("result", {})
            out.writerow([
                item["id"],
                item["dataset"],
                item["created_at"],
                res.get("status", "unknown"),
                res.get("score", "N/A")
            ])

    return len(runs)

# Usage
count = export_runs_to_csv(client, output_file="runs_export.csv")
print(f"Exported {count} runs to runs_export.csv")

Monitoring and Reporting

Generate Test Report

from collections import defaultdict
from datetime import datetime, timedelta

def generate_weekly_report(client):
    """Generate aggregate pass/fail/score stats for the last 7 days of runs.

    Returns a dict with ``total_runs``, ``passed``, ``failed``, a per-dataset
    ``by_dataset`` breakdown, ``avg_score`` (None when no scores), and
    ``pass_rate`` as a percentage (0 when there are no runs).
    """
    # Local import so this snippet stays self-contained; datetime.utcnow()
    # is naive and deprecated since Python 3.12, so compare aware datetimes.
    from datetime import timezone

    # Get runs from the last 7 days
    response = client.list_runs(limit=1000)
    all_runs = response.get("runs", [])

    # Filter to last 7 days — fromisoformat yields an aware datetime after
    # normalizing the trailing "Z", so compare against an aware cutoff
    # instead of stripping tzinfo.
    cutoff = datetime.now(timezone.utc) - timedelta(days=7)
    recent_runs = [
        r for r in all_runs
        if datetime.fromisoformat(r["created_at"].replace("Z", "+00:00")) > cutoff
    ]

    # Aggregate stats
    stats = {
        "total_runs": len(recent_runs),
        "passed": 0,
        "failed": 0,
        "by_dataset": defaultdict(lambda: {"passed": 0, "failed": 0, "scores": []}),
        "scores": []
    }

    for run in recent_runs:
        result = run.get("result", {})
        status = result.get("status", "unknown")
        score = result.get("score")
        dataset_id = run["dataset"]

        if status == "passed":
            stats["passed"] += 1
            stats["by_dataset"][dataset_id]["passed"] += 1
        elif status == "failed":
            stats["failed"] += 1
            stats["by_dataset"][dataset_id]["failed"] += 1

        if score is not None:
            stats["scores"].append(score)
            stats["by_dataset"][dataset_id]["scores"].append(score)

    # Calculate averages, guarding the empty cases explicitly.
    stats["avg_score"] = (
        sum(stats["scores"]) / len(stats["scores"]) if stats["scores"] else None
    )
    stats["pass_rate"] = (
        stats["passed"] / stats["total_runs"] * 100
        if stats["total_runs"] > 0 else 0
    )

    return stats

# Usage
report = generate_weekly_report(client)
print(f"Weekly Report")
print(f"=============")
print(f"Total Runs: {report['total_runs']}")
print(f"Pass Rate: {report['pass_rate']:.1f}%")
print(f"Average Score: {report['avg_score']:.3f}" if report['avg_score'] else "N/A")

Monitor API Key Usage

def check_api_key_health(client):
    """Check the health and status of API keys.

    Returns counts of active/inactive keys plus two watch lists: active keys
    expiring within 7 days and active keys that were never used.
    """
    # Local import so this snippet is runnable on its own (the original relied
    # on datetime/timedelta imported elsewhere); datetime.utcnow() is naive
    # and deprecated since Python 3.12, so use aware datetimes throughout.
    from datetime import datetime, timedelta, timezone

    keys = client.list_api_keys(include_inactive=True)

    report = {
        "total": len(keys),
        "active": 0,
        "inactive": 0,
        "expiring_soon": [],
        "never_used": []
    }

    soon = datetime.now(timezone.utc) + timedelta(days=7)

    for key in keys:
        if not key["is_active"]:
            report["inactive"] += 1
            continue

        report["active"] += 1

        # Flag keys whose expiry falls within the next 7 days.
        expires_at = key.get("expires_at")
        if expires_at:
            expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
            if expires < soon:
                report["expiring_soon"].append(key)

        # Flag active keys that have never made a request.
        if not key.get("last_used_at"):
            report["never_used"].append(key)

    return report

# Usage
health = check_api_key_health(client)
print(f"API Key Health Report")
print(f"Active: {health['active']}, Inactive: {health['inactive']}")

if health["expiring_soon"]:
print(f"\nKeys expiring soon:")
for key in health["expiring_soon"]:
print(f" - {key['name']} ({key['key_prefix']}...)")

if health["never_used"]:
print(f"\nKeys never used:")
for key in health["never_used"]:
print(f" - {key['name']} ({key['key_prefix']}...)")