Eval Runner

High-throughput batch evaluation with checkpointing, resumption, and timing statistics.

Overview

EvalRunner and the evaluate() convenience function provide infrastructure for evaluating datasets at scale. Features include parallel execution with rate limiting, progress tracking, automatic checkpointing for long-running jobs, and comprehensive timing/cost statistics.

Research Background

Casabianca et al. (2025) recommend maintaining a "gold set" of human-graded examples and sampling 1-5% of production traffic for continuous validation. EvalRunner provides the infrastructure for systematic evaluation with checkpointing for long-running jobs and cost tracking for budget management.
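
As a rough sketch of that workflow (the production file name, record schema, and 2% rate are assumptions for illustration, not part of the library), a small fraction of production records can be written to a file in the dataset's JSON format and then loaded with RubricDataset.from_file:

import json
import random

from autorubric import RubricDataset, LLMConfig, evaluate
from autorubric.graders import CriterionGrader

# Hypothetical production dump; records are assumed to already match the JSON
# schema that RubricDataset.from_file expects.
with open("production_responses.json", encoding="utf-8") as f:
    records = json.load(f)

sampled = [r for r in records if random.random() < 0.02]  # roughly a 2% sample
with open("continuous_eval_sample.json", "w", encoding="utf-8") as f:
    json.dump(sampled, f)

dataset = RubricDataset.from_file("continuous_eval_sample.json")
grader = CriterionGrader(llm_config=LLMConfig(model="openai/gpt-4.1-mini"))
result = await evaluate(dataset, grader, experiment_name="continuous-validation")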

Quick Example

import asyncio

from autorubric import RubricDataset, LLMConfig, evaluate
from autorubric.graders import CriterionGrader

async def main():
    dataset = RubricDataset.from_file("essays.json")
    grader = CriterionGrader(
        llm_config=LLMConfig(
            model="openai/gpt-4.1-mini",
            max_parallel_requests=10,
        )
    )

    result = await evaluate(dataset, grader, show_progress=True)

    print(f"Evaluated {result.successful_items}/{result.total_items}")
    print(f"Throughput: {result.timing_stats.items_per_second:.2f} items/s")
    print(f"Total cost: ${result.total_completion_cost:.4f}")

asyncio.run(main())

Checkpointing and Resumption

from autorubric import EvalRunner, EvalConfig, EvalResult

# First run (may be interrupted)
config = EvalConfig(
    experiment_name="my-essay-eval",
    experiments_dir="./experiments",
    show_progress=True,
)
runner = EvalRunner(dataset=dataset, grader=grader, config=config)
result = await runner.run()
# Saves to: experiments/my-essay-eval/manifest.json + items.jsonl

# Resume after crash
runner = EvalRunner(dataset=dataset, grader=grader, config=config)
result = await runner.run()  # Skips already-completed items

# Load results later
result = EvalResult.from_experiment("experiments/my-essay-eval")

Rate Limiting

from autorubric import LLMConfig
from autorubric.graders import CriterionGrader, JudgeSpec

grader = CriterionGrader(
    judges=[
        JudgeSpec(LLMConfig(model="openai/gpt-4.1", max_parallel_requests=10), "gpt"),
        JudgeSpec(LLMConfig(model="anthropic/claude-sonnet-4-5-20250929", max_parallel_requests=5), "claude"),
    ],
    aggregation="majority",
)

Rate limiting uses a global per-provider semaphore, so all openai/* models share the same limit.
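
The snippet below is an illustrative sketch of that idea only, not the library's actual implementation: one asyncio.Semaphore per provider prefix, shared by every call routed to that provider.

import asyncio

# Illustrative only: a per-provider semaphore registry in the spirit of the
# behavior described above. Names and limits here are assumptions.
_provider_semaphores: dict[str, asyncio.Semaphore] = {}


def _get_semaphore(model: str, max_parallel_requests: int) -> asyncio.Semaphore:
    provider = model.split("/", 1)[0]  # "openai/gpt-4.1" -> "openai"
    if provider not in _provider_semaphores:
        _provider_semaphores[provider] = asyncio.Semaphore(max_parallel_requests)
    return _provider_semaphores[provider]  # all openai/* calls share this one


async def call_model(model: str, prompt: str, max_parallel_requests: int = 10) -> str:
    async with _get_semaphore(model, max_parallel_requests):
        # ... the actual LLM request would happen here ...
        await asyncio.sleep(0.1)
        return "response"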


evaluate

Convenience function for batch evaluation.

evaluate async

evaluate(dataset: RubricDataset, grader: Grader, *, fail_fast: bool = False, show_progress: bool = True, progress_style: Literal['simple', 'detailed'] = 'simple', max_concurrent_items: int | None = None, experiment_name: str | None = None, experiments_dir: Path | str = 'experiments', resume: bool = True) -> EvalResult

Evaluate a dataset with a grader.

Convenience wrapper around EvalRunner.

PARAMETERS

dataset (RubricDataset): The dataset to evaluate.
grader (Grader): The grader to use.
fail_fast (bool, default False): Stop on first error if True.
show_progress (bool, default True): Display progress bars if True.
progress_style (Literal['simple', 'detailed'], default 'simple'): "simple" or "detailed" progress display.
max_concurrent_items (int | None, default None): Limit concurrent items (None = unlimited).
experiment_name (str | None, default None): Name for this experiment run.
experiments_dir (Path | str, default 'experiments'): Root directory for experiment outputs.
resume (bool, default True): If True and experiment exists, resume from checkpoint.

RETURNS

EvalResult: All results and aggregated statistics.

Example

from autorubric.eval import evaluate

result = await evaluate(dataset, grader, show_progress=True)
print(f"Evaluated {result.successful_items}/{result.total_items}")

Source code in src/autorubric/eval.py
async def evaluate(
    dataset: RubricDataset,
    grader: Grader,
    *,
    fail_fast: bool = False,
    show_progress: bool = True,
    progress_style: Literal["simple", "detailed"] = "simple",
    max_concurrent_items: int | None = None,
    experiment_name: str | None = None,
    experiments_dir: Path | str = "experiments",
    resume: bool = True,
) -> EvalResult:
    """Evaluate a dataset with a grader.

    Convenience wrapper around EvalRunner.

    Args:
        dataset: The dataset to evaluate.
        grader: The grader to use.
        fail_fast: Stop on first error if True.
        show_progress: Display progress bars if True.
        progress_style: "simple" or "detailed" progress display.
        max_concurrent_items: Limit concurrent items (None = unlimited).
        experiment_name: Name for this experiment run.
        experiments_dir: Root directory for experiment outputs.
        resume: If True and experiment exists, resume from checkpoint.

    Returns:
        EvalResult with all results and aggregated statistics.

    Example:
        >>> from autorubric.eval import evaluate
        >>> result = await evaluate(dataset, grader, show_progress=True)
        >>> print(f"Evaluated {result.successful_items}/{result.total_items}")
    """
    config = EvalConfig(
        fail_fast=fail_fast,
        show_progress=show_progress,
        progress_style=progress_style,
        max_concurrent_items=max_concurrent_items,
        experiment_name=experiment_name,
        experiments_dir=experiments_dir,
        resume=resume,
    )
    runner = EvalRunner(dataset=dataset, grader=grader, config=config)
    return await runner.run()

EvalRunner

Runner class for batch evaluation with checkpointing.

EvalRunner

EvalRunner(dataset: RubricDataset, grader: Grader, config: EvalConfig | None = None)

Runs batch evaluations with rate limiting and progress tracking.

This class orchestrates the evaluation of a RubricDataset using a grader, handling:

- Concurrent execution with configurable parallelism
- Rate limiting via LLMConfig.max_parallel_requests
- Progress display with rich progress bars
- Checkpointing and resumption from failures
- Result aggregation with timing statistics

Example

from autorubric import LLMConfig, RubricDataset
from autorubric.graders import CriterionGrader
from autorubric.eval import EvalRunner, EvalConfig

dataset = RubricDataset.from_file("data.json")
grader = CriterionGrader(
    llm_config=LLMConfig(
        model="openai/gpt-4",
        max_parallel_requests=10,
    )
)

runner = EvalRunner(dataset=dataset, grader=grader)
result = await runner.run()
print(f"Evaluated {result.successful_items}/{result.total_items}")

Initialize the evaluation runner.

PARAMETERS

dataset (RubricDataset): The dataset to evaluate.
grader (Grader): The grader to use for evaluation.
config (EvalConfig | None, default None): Optional configuration. Uses defaults if not provided.

Source code in src/autorubric/eval.py
def __init__(
    self,
    dataset: RubricDataset,
    grader: Grader,
    config: EvalConfig | None = None,
):
    """Initialize the evaluation runner.

    Args:
        dataset: The dataset to evaluate.
        grader: The grader to use for evaluation.
        config: Optional configuration. Uses defaults if not provided.
    """
    self.dataset = dataset
    self.grader = grader
    self.config = config or EvalConfig()

    # Extract judge IDs if using ensemble grader
    self._judge_ids: list[str] = []
    if hasattr(grader, "_judges"):
        self._judge_ids = [j.judge_id for j in grader._judges]

    # Resolve experiment name
    self._experiment_name = self.config.experiment_name or _generate_experiment_name()
    self._exp_dir = Path(self.config.experiments_dir) / self._experiment_name

run async

run() -> EvalResult

Run the evaluation and return aggregated results.

RETURNS

EvalResult: All item results, aggregated usage/cost, and timing statistics.

RAISES

RuntimeError: If fail_fast=True and any item fails.
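
For jobs where any failure should abort the run, a sketch like the one below wraps run() and catches the RuntimeError raised under fail_fast; the configuration values are illustrative, and resuming after an aborted run is assumed to behave the same way as resuming after a crash.

from autorubric import EvalRunner, EvalConfig

config = EvalConfig(fail_fast=True, experiment_name="strict-run")
runner = EvalRunner(dataset=dataset, grader=grader, config=config)

try:
    result = await runner.run()
except RuntimeError as exc:
    # The manifest is marked "failed", but completed items were already
    # persisted to items.jsonl, so a later run can pick up from the checkpoint.
    print(f"Aborted: {exc}")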

Source code in src/autorubric/eval.py
async def run(self) -> EvalResult:
    """Run the evaluation and return aggregated results.

    Returns:
        EvalResult with all item results, aggregated usage/cost,
        and timing statistics.

    Raises:
        RuntimeError: If fail_fast=True and any item fails.
    """
    started_at = datetime.now()
    start_time = time.perf_counter()

    # Set up experiment directory and load checkpoint if resuming
    completed_indices, previous_results = self._setup_experiment(started_at)

    # Determine pending items
    pending_items = [
        (idx, item)
        for idx, item in enumerate(self.dataset)
        if idx not in completed_indices
    ]

    item_results: list[ItemResult] = list(previous_results)
    errors: list[tuple[int, str]] = []
    completed_count = len(completed_indices)

    # Create progress display
    progress: EvalProgressDisplay | None = None
    if self.config.show_progress:
        progress = EvalProgressDisplay(
            total_items=len(self.dataset),
            style=self.config.progress_style,
            judge_ids=self._judge_ids,
        )

    try:
        if progress:
            progress.__enter__()
            # Update progress to show already completed items
            for _ in range(completed_count):
                progress.advance()

        # Process remaining results as they complete
        async for result in self._run_with_streaming(pending_items):
            item_results.append(result)
            completed_count += 1

            # Persist result immediately
            self._append_item_result(result)
            self._update_manifest_indices(result.item_idx)

            if result.error:
                errors.append((result.item_idx, result.error))
                if self.config.fail_fast:
                    self._update_manifest_status("failed", error=result.error)
                    raise RuntimeError(
                        f"Evaluation failed at item {result.item_idx}: {result.error}"
                    )

            # Update progress
            if progress:
                elapsed = time.perf_counter() - start_time
                rate = completed_count / elapsed if elapsed > 0 else 0.0
                progress.advance(rate=rate)

    finally:
        if progress:
            progress.__exit__(None, None, None)

    # Sort results by item index
    item_results.sort(key=lambda r: r.item_idx)

    # Compute final metrics
    end_time = time.perf_counter()
    total_duration = end_time - start_time
    completed_at = datetime.now()

    # Aggregate usage and cost
    reports = [r.report for r in item_results if r.error is None]
    usages = [r.token_usage for r in reports if r.token_usage]
    costs = [r.completion_cost for r in reports if r.completion_cost is not None]

    total_usage = aggregate_token_usage(usages)
    total_cost = aggregate_completion_cost(costs)

    # Compute timing stats
    durations = [r.duration_seconds for r in item_results]
    timing_stats = EvalTimingStats.from_durations(durations, total_duration)

    # Update manifest to completed
    self._update_manifest_status(
        "completed",
        completed_at=completed_at,
        total_duration=total_duration,
    )

    return EvalResult(
        item_results=item_results,
        total_items=len(self.dataset),
        successful_items=len(item_results) - len(errors),
        failed_items=len(errors),
        total_token_usage=total_usage,
        total_completion_cost=total_cost,
        timing_stats=timing_stats,
        started_at=started_at,
        completed_at=completed_at,
        errors=errors,
        experiment_name=self._experiment_name,
        experiment_dir=self._exp_dir,
    )

EvalConfig

Configuration options for evaluation runs.

EvalConfig dataclass

EvalConfig(fail_fast: bool = False, show_progress: bool = True, progress_style: Literal['simple', 'detailed'] = 'simple', max_concurrent_items: int | None = None, experiment_name: str | None = None, experiments_dir: Path | str = 'experiments', resume: bool = True)

Configuration for evaluation runs.

ATTRIBUTES

fail_fast (bool): If True, stop on first error. Default False continues all items.
show_progress (bool): Whether to display progress bars. Default True.
progress_style (Literal['simple', 'detailed']): Style of progress display. "simple" shows a single overall progress bar; "detailed" shows per-judge progress for ensemble mode.
max_concurrent_items (int | None): Maximum items to grade concurrently. None = grade all items in parallel (default). Set this to limit memory usage for very large datasets.
experiment_name (str | None): Name for this experiment run. If None, auto-generates using coolname.
experiments_dir (Path | str): Root directory for experiment outputs. Default is "./experiments".
resume (bool): If True and experiment exists, resume from checkpoint. Default True.
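
A configuration tuned for a large overnight run might look like the sketch below; the specific values are illustrative, not recommendations.

from autorubric import EvalConfig

config = EvalConfig(
    experiment_name="overnight-essay-eval",
    experiments_dir="./experiments",
    max_concurrent_items=50,    # cap in-flight items to bound memory use
    progress_style="detailed",  # per-judge bars when using an ensemble grader
    fail_fast=False,            # keep going and collect errors
    resume=True,                # pick up from the checkpoint if interrupted
)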


EvalResult

Results from a completed evaluation run.

EvalResult dataclass

EvalResult(item_results: list[ItemResult], total_items: int, successful_items: int, failed_items: int, total_token_usage: TokenUsage | None, total_completion_cost: float | None, timing_stats: EvalTimingStats, started_at: datetime, completed_at: datetime, errors: list[tuple[int, str]] = list(), experiment_name: str | None = None, experiment_dir: Path | None = None)

Complete result from an evaluation run.

get_scores

get_scores() -> list[float]

Extract scores from all successful results.

Source code in src/autorubric/eval.py
def get_scores(self) -> list[float]:
    """Extract scores from all successful results."""
    return [r.report.score for r in self.item_results if r.error is None]

get_reports

get_reports() -> list[EvaluationReport | EnsembleEvaluationReport]

Extract reports from all successful results.

Source code in src/autorubric/eval.py
def get_reports(self) -> list[EvaluationReport | EnsembleEvaluationReport]:
    """Extract reports from all successful results."""
    return [r.report for r in self.item_results if r.error is None]

filter_successful

filter_successful() -> list[ItemResult]

Get only successful item results.

Source code in src/autorubric/eval.py
def filter_successful(self) -> list[ItemResult]:
    """Get only successful item results."""
    return [r for r in self.item_results if r.error is None]

filter_failed

filter_failed() -> list[ItemResult]

Get only failed item results.

Source code in src/autorubric/eval.py
def filter_failed(self) -> list[ItemResult]:
    """Get only failed item results."""
    return [r for r in self.item_results if r.error is not None]
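
Together these helpers cover most post-run triage; a short usage sketch (field names taken from ItemResult, documented below):

result = await evaluate(dataset, grader)

scores = result.get_scores()
if scores:
    print(f"Mean score: {sum(scores) / len(scores):.3f}")

for failed in result.filter_failed():
    print(f"Item {failed.item_idx} failed after {failed.duration_seconds:.1f}s: {failed.error}")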

compute_metrics

compute_metrics(dataset: RubricDataset, *, bootstrap: bool = False, n_bootstrap: int = 1000, per_judge: bool = False, cannot_assess: Literal['exclude', 'as_unmet'] = 'exclude', na_mode: Literal['exclude', 'as_worst'] = 'exclude', confidence_level: float = 0.95, seed: int | None = None) -> 'MetricsResult'

Compute comprehensive evaluation metrics against ground truth.

This method compares predicted verdicts and scores against ground truth from the dataset, computing criterion-level agreement metrics, score correlations, and bias analysis.

If this result does not contain all items from the dataset, metrics are computed only for the intersection, and a warning is included in the result.

PARAMETERS

dataset (RubricDataset): The dataset with ground truth labels.
bootstrap (bool, default False): If True, compute bootstrap confidence intervals (expensive).
n_bootstrap (int, default 1000): Number of bootstrap samples if bootstrap=True.
per_judge (bool, default False): If True and ensemble, compute per-judge metrics.
cannot_assess (Literal['exclude', 'as_unmet'], default 'exclude'): How to handle CANNOT_ASSESS verdicts. "exclude" skips pairs where either is CA; "as_unmet" treats CA as UNMET.
na_mode (Literal['exclude', 'as_worst'], default 'exclude'): How to handle NA options in multi-choice criteria. "exclude" skips pairs where either is NA; "as_worst" keeps NA in metrics computation.
confidence_level (float, default 0.95): Confidence level for bootstrap CIs.
seed (int | None, default None): Random seed for bootstrap reproducibility.

RETURNS

MetricsResult: Comprehensive metrics. Use .summary() for formatted output or .to_dataframe() for export.

Example

result = await evaluate(dataset, grader)
metrics = result.compute_metrics(dataset)
print(metrics.summary())
print(f"Accuracy: {metrics.criterion_accuracy:.1%}")
df = metrics.to_dataframe()

Source code in src/autorubric/eval.py
def compute_metrics(
    self,
    dataset: RubricDataset,
    *,
    bootstrap: bool = False,
    n_bootstrap: int = 1000,
    per_judge: bool = False,
    cannot_assess: Literal["exclude", "as_unmet"] = "exclude",
    na_mode: Literal["exclude", "as_worst"] = "exclude",
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> "MetricsResult":
    """Compute comprehensive evaluation metrics against ground truth.

    This method compares predicted verdicts and scores against ground truth
    from the dataset, computing criterion-level agreement metrics, score
    correlations, and bias analysis.

    If eval_result does not contain all items from the dataset, metrics
    are computed only for the intersection, and a warning is included
    in the result.

    Args:
        dataset: The dataset with ground truth labels.
        bootstrap: If True, compute bootstrap confidence intervals (expensive).
        n_bootstrap: Number of bootstrap samples if bootstrap=True.
        per_judge: If True and ensemble, compute per-judge metrics.
        cannot_assess: How to handle CANNOT_ASSESS verdicts:
            - "exclude": Skip pairs where either is CA (default)
            - "as_unmet": Treat CA as UNMET
        na_mode: How to handle NA options in multi-choice criteria:
            - "exclude": Skip pairs where either is NA (default)
            - "as_worst": Keep NA in metrics computation
        confidence_level: Confidence level for bootstrap CIs (default 0.95).
        seed: Random seed for bootstrap reproducibility.

    Returns:
        MetricsResult with comprehensive metrics. Use .summary() for
        formatted output or .to_dataframe() for export.

    Example:
        >>> result = await evaluate(dataset, grader)
        >>> metrics = result.compute_metrics(dataset)
        >>> print(metrics.summary())
        >>> print(f"Accuracy: {metrics.criterion_accuracy:.1%}")
        >>> df = metrics.to_dataframe()
    """
    from autorubric.metrics._compute import compute_metrics as _compute

    return _compute(
        self,
        dataset,
        bootstrap=bootstrap,
        n_bootstrap=n_bootstrap,
        per_judge=per_judge,
        cannot_assess=cannot_assess,
        na_mode=na_mode,
        confidence_level=confidence_level,
        seed=seed,
    )

from_experiment classmethod

from_experiment(experiment_path: Path | str) -> EvalResult

Load EvalResult from a completed experiment directory.

PARAMETERS

experiment_path (Path | str): Path to the experiment directory.

RETURNS

EvalResult: Loaded item results and statistics.

RAISES

FileNotFoundError: If experiment directory doesn't exist.
ValueError: If manifest is invalid or experiment is incomplete.

Source code in src/autorubric/eval.py
@classmethod
def from_experiment(cls, experiment_path: Path | str) -> EvalResult:
    """Load EvalResult from a completed experiment directory.

    Args:
        experiment_path: Path to the experiment directory.

    Returns:
        EvalResult with loaded item results and statistics.

    Raises:
        FileNotFoundError: If experiment directory doesn't exist.
        ValueError: If manifest is invalid or experiment is incomplete.
    """
    exp_dir = Path(experiment_path)
    if not exp_dir.exists():
        raise FileNotFoundError(f"Experiment directory not found: {exp_dir}")

    manifest_path = exp_dir / "manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(f"Manifest not found: {manifest_path}")

    items_path = exp_dir / "items.jsonl"

    # Load manifest
    with open(manifest_path, encoding="utf-8") as f:
        manifest = json.load(f)

    # Load items
    item_results: list[ItemResult] = []
    if items_path.exists():
        with open(items_path, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data = json.loads(line)
                    # Create a minimal DataItem for reconstruction
                    item = DataItem(
                        submission="",  # Submission not stored in items.jsonl
                        description=f"Item {data['item_idx']}",
                    )
                    item_results.append(ItemResult.from_dict(data, item))

    # Sort by item_idx
    item_results.sort(key=lambda r: r.item_idx)

    # Compute aggregated stats
    reports = [r.report for r in item_results if r.error is None]
    usages = [r.token_usage for r in reports if r.token_usage]
    costs = [r.completion_cost for r in reports if r.completion_cost is not None]

    total_usage = aggregate_token_usage(usages)
    total_cost = aggregate_completion_cost(costs)

    durations = [r.duration_seconds for r in item_results]
    timing_stats = EvalTimingStats.from_durations(
        durations, manifest.get("total_duration_seconds", 0.0)
    )

    errors = [(r.item_idx, r.error) for r in item_results if r.error]

    return cls(
        item_results=item_results,
        total_items=manifest["total_items"],
        successful_items=len(item_results) - len(errors),
        failed_items=len(errors),
        total_token_usage=total_usage,
        total_completion_cost=total_cost,
        timing_stats=timing_stats,
        started_at=datetime.fromisoformat(manifest["started_at"]),
        completed_at=datetime.fromisoformat(
            manifest.get("completed_at", manifest["started_at"])
        ),
        errors=errors,
        experiment_name=manifest["experiment_name"],
        experiment_dir=exp_dir,
    )
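
A quick sketch of reloading a finished run for offline inspection (the experiment path is from the checkpointing example above). Note that, per the loader, submissions are not stored in items.jsonl, so reloaded DataItems carry placeholder content:

result = EvalResult.from_experiment("experiments/my-essay-eval")

print(f"{result.successful_items}/{result.total_items} items succeeded")
print(f"Throughput: {result.timing_stats.items_per_second:.2f} items/s")
if result.total_completion_cost is not None:
    print(f"Total cost: ${result.total_completion_cost:.4f}")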

ItemResult

Result for a single evaluated item.

ItemResult dataclass

ItemResult(item_idx: int, item: DataItem, report: EvaluationReport | EnsembleEvaluationReport, duration_seconds: float, error: str | None = None)

Result for a single evaluated item.

to_dict

to_dict() -> dict[str, Any]

Serialize to dictionary for JSON storage.

Source code in src/autorubric/eval.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dictionary for JSON storage."""
    report_dict: dict[str, Any] = {
        "score": self.report.score,
        "raw_score": self.report.raw_score,
        "error": self.report.error,
    }
    if hasattr(self.report, "cannot_assess_count"):
        report_dict["cannot_assess_count"] = self.report.cannot_assess_count
    if hasattr(self.report, "mean_agreement"):
        report_dict["mean_agreement"] = self.report.mean_agreement
    if self.report.token_usage:
        report_dict["token_usage"] = {
            "prompt_tokens": self.report.token_usage.prompt_tokens,
            "completion_tokens": self.report.token_usage.completion_tokens,
            "total_tokens": self.report.token_usage.total_tokens,
        }
    if self.report.completion_cost is not None:
        report_dict["completion_cost"] = self.report.completion_cost

    return {
        "item_idx": self.item_idx,
        "duration_seconds": self.duration_seconds,
        "error": self.error,
        "report": report_dict,
    }

from_dict classmethod

from_dict(data: dict[str, Any], item: DataItem) -> ItemResult

Deserialize from dictionary.

Source code in src/autorubric/eval.py
@classmethod
def from_dict(cls, data: dict[str, Any], item: DataItem) -> ItemResult:
    """Deserialize from dictionary."""
    report_data = data["report"]

    # Reconstruct TokenUsage if present
    token_usage = None
    if "token_usage" in report_data:
        from autorubric.types import TokenUsage as TU

        token_usage = TU(
            prompt_tokens=report_data["token_usage"].get("prompt_tokens", 0),
            completion_tokens=report_data["token_usage"].get("completion_tokens", 0),
            total_tokens=report_data["token_usage"].get("total_tokens", 0),
        )

    report = EvaluationReport(
        score=report_data["score"],
        raw_score=report_data.get("raw_score"),
        llm_raw_score=report_data.get("raw_score"),
        token_usage=token_usage,
        completion_cost=report_data.get("completion_cost"),
        error=report_data.get("error"),
        cannot_assess_count=report_data.get("cannot_assess_count", 0),
    )

    return cls(
        item_idx=data["item_idx"],
        item=item,
        report=report,
        duration_seconds=data["duration_seconds"],
        error=data.get("error"),
    )
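
These two methods back the items.jsonl checkpoint format. A minimal round-trip sketch, given some ItemResult instance item_result; the placeholder DataItem mirrors the reconstruction in from_experiment above and is otherwise an assumption:

import json

line = json.dumps(item_result.to_dict())  # one checkpoint line per item

data = json.loads(line)
placeholder = DataItem(submission="", description=f"Item {data['item_idx']}")
restored = ItemResult.from_dict(data, placeholder)
assert restored.item_idx == item_result.item_idx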

EvalTimingStats

Timing statistics for an evaluation run.

EvalTimingStats dataclass

EvalTimingStats(total_duration_seconds: float, mean_item_duration_seconds: float, min_item_duration_seconds: float, max_item_duration_seconds: float, p50_item_duration_seconds: float, p95_item_duration_seconds: float, items_per_second: float)

Timing statistics for the evaluation run.

from_durations classmethod

from_durations(durations: list[float], total_duration: float) -> EvalTimingStats

Compute timing stats from a list of item durations.

Source code in src/autorubric/eval.py
@classmethod
def from_durations(
    cls,
    durations: list[float],
    total_duration: float,
) -> EvalTimingStats:
    """Compute timing stats from a list of item durations."""
    if not durations:
        return cls(
            total_duration_seconds=total_duration,
            mean_item_duration_seconds=0.0,
            min_item_duration_seconds=0.0,
            max_item_duration_seconds=0.0,
            p50_item_duration_seconds=0.0,
            p95_item_duration_seconds=0.0,
            items_per_second=0.0,
        )

    sorted_durations = sorted(durations)
    n = len(sorted_durations)

    return cls(
        total_duration_seconds=total_duration,
        mean_item_duration_seconds=sum(durations) / n,
        min_item_duration_seconds=sorted_durations[0],
        max_item_duration_seconds=sorted_durations[-1],
        p50_item_duration_seconds=sorted_durations[n // 2],
        p95_item_duration_seconds=sorted_durations[min(int(n * 0.95), n - 1)],
        items_per_second=n / total_duration if total_duration > 0 else 0.0,
    )
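
For instance, a handful of per-item durations produce stats like these (a plain usage sketch of the classmethod above):

durations = [1.2, 0.8, 2.5, 1.1, 0.9]  # seconds per item
# Wall-clock time is shorter than the sum because items run concurrently.
stats = EvalTimingStats.from_durations(durations, total_duration=3.0)

print(f"p50: {stats.p50_item_duration_seconds:.2f}s")
print(f"p95: {stats.p95_item_duration_seconds:.2f}s")
print(f"{stats.items_per_second:.2f} items/s")  # 5 items / 3.0s wall clock ≈ 1.67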

to_dict

to_dict() -> dict[str, float]

Serialize to dictionary.

Source code in src/autorubric/eval.py
def to_dict(self) -> dict[str, float]:
    """Serialize to dictionary."""
    return {
        "total_duration_seconds": self.total_duration_seconds,
        "mean_item_duration_seconds": self.mean_item_duration_seconds,
        "min_item_duration_seconds": self.min_item_duration_seconds,
        "max_item_duration_seconds": self.max_item_duration_seconds,
        "p50_item_duration_seconds": self.p50_item_duration_seconds,
        "p95_item_duration_seconds": self.p95_item_duration_seconds,
        "items_per_second": self.items_per_second,
    }

ExperimentManifest

Metadata for a saved experiment.

ExperimentManifest dataclass

ExperimentManifest(experiment_name: str, created_at: datetime, dataset_name: str | None, dataset_hash: str, total_items: int, status: Literal['running', 'completed', 'failed'], completed_indices: set[int], error: str | None = None, started_at: datetime | None = None, completed_at: datetime | None = None, total_duration_seconds: float | None = None, dataset_path: str | None = None, grader_config: dict[str, Any] | None = None, eval_config: dict[str, Any] | None = None)

Manifest for experiment checkpointing.

Contains metadata about an evaluation run for reproducibility and resumption.

to_dict

to_dict() -> dict[str, Any]

Serialize to dictionary for JSON storage.

Source code in src/autorubric/eval.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to dictionary for JSON storage."""
    return {
        "experiment_name": self.experiment_name,
        "created_at": self.created_at.isoformat(),
        "dataset_name": self.dataset_name,
        "dataset_hash": self.dataset_hash,
        "total_items": self.total_items,
        "status": self.status,
        "completed_indices": list(self.completed_indices),
        "error": self.error,
        "started_at": self.started_at.isoformat() if self.started_at else None,
        "completed_at": self.completed_at.isoformat() if self.completed_at else None,
        "total_duration_seconds": self.total_duration_seconds,
        "dataset_path": self.dataset_path,
        "grader_config": self.grader_config,
        "eval_config": self.eval_config,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> ExperimentManifest

Deserialize from dictionary.

Source code in src/autorubric/eval.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ExperimentManifest:
    """Deserialize from dictionary."""
    return cls(
        experiment_name=data["experiment_name"],
        created_at=datetime.fromisoformat(data["created_at"]),
        dataset_name=data.get("dataset_name"),
        dataset_hash=data["dataset_hash"],
        total_items=data["total_items"],
        status=data["status"],
        completed_indices=set(data.get("completed_indices", [])),
        error=data.get("error"),
        started_at=(
            datetime.fromisoformat(data["started_at"])
            if data.get("started_at") else None
        ),
        completed_at=(
            datetime.fromisoformat(data["completed_at"])
            if data.get("completed_at") else None
        ),
        total_duration_seconds=data.get("total_duration_seconds"),
        dataset_path=data.get("dataset_path"),
        grader_config=data.get("grader_config"),
        eval_config=data.get("eval_config"),
    )
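
A small sketch of inspecting the manifest on disk, for example to check how far an interrupted run got; the file layout follows the checkpointing example above:

import json
from pathlib import Path

raw = json.loads(Path("experiments/my-essay-eval/manifest.json").read_text(encoding="utf-8"))
manifest = ExperimentManifest.from_dict(raw)

print(f"status: {manifest.status}")
print(f"completed {len(manifest.completed_indices)}/{manifest.total_items} items")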

References

Casabianca, J., McCaffrey, D. F., Johnson, M. S., Alper, N., and Zubenko, V. (2025). Validity Arguments For Constructed Response Scoring Using Generative Artificial Intelligence Applications. arXiv:2501.02334.