Skip to content

Utilities

Helper functions for token aggregation, text processing, and data manipulation.

Overview

Utility functions for common operations like aggregating token usage across multiple evaluations, parsing thinking/output responses, and generating synthetic ground truth labels.

Token Aggregation

from autorubric import (
    aggregate_token_usage,
    aggregate_completion_cost,
    aggregate_evaluation_usage,
)

# After batch grading
results = await asyncio.gather(*[rubric.grade(r, grader) for r in responses])

# Aggregate usage and cost
total_usage, total_cost = aggregate_evaluation_usage(results)

# Or aggregate manually
usages = [r.token_usage for r in results]
costs = [r.completion_cost for r in results]

total_usage = aggregate_token_usage(usages)
total_cost = aggregate_completion_cost(costs)

if total_usage:
    print(f"Total tokens: {total_usage.total_tokens}")
if total_cost:
    print(f"Total cost: ${total_cost:.4f}")

Thinking Output Parsing

from autorubric import parse_thinking_output, normalize_to_grade_input

# Parse string with markers
text = "<thinking>Reasoning here</thinking><output>Final answer</output>"
parsed = parse_thinking_output(text)
# {'thinking': 'Reasoning here', 'output': 'Final answer'}

# Normalize any input format
input1 = "plain text"
input2 = {"thinking": "...", "output": "..."}
input3 = "<thinking>...</thinking><output>...</output>"

normalized = normalize_to_grade_input(input1)  # {'thinking': None, 'output': 'plain text'}
normalized = normalize_to_grade_input(input2)  # passes through
normalized = normalize_to_grade_input(input3)  # parses markers

Synthetic Ground Truth

from autorubric import RubricDataset, LLMConfig
from autorubric.graders import CriterionGrader
from autorubric import fill_ground_truth

async def generate_labels():
    dataset = RubricDataset.from_file("unlabeled.json")

    # Use strong model for ground truth
    grader = CriterionGrader(
        llm_config=LLMConfig(
            model="anthropic/claude-sonnet-4-5-20250929",
            max_parallel_requests=10,
        )
    )

    labeled = await fill_ground_truth(
        dataset,
        grader,
        force=False,        # Only label items without ground_truth
        show_progress=True,
    )

    labeled.to_file("labeled.json")

Verdict Helpers

from autorubric import (
    extract_verdicts_from_report,
    filter_cannot_assess,
    verdict_to_binary,
    verdict_to_string,
)

# Extract verdicts from evaluation report
verdicts = extract_verdicts_from_report(result.report)

# Filter out CANNOT_ASSESS
filtered = filter_cannot_assess(verdicts)

# Convert to binary (for metrics)
binary = verdict_to_binary(CriterionVerdict.MET)  # 1
binary = verdict_to_binary(CriterionVerdict.UNMET)  # 0

# Convert to string
string = verdict_to_string(CriterionVerdict.MET)  # "MET"

aggregate_token_usage

Aggregate token usage from multiple evaluations.

aggregate_token_usage

aggregate_token_usage(usages: list[TokenUsage | None]) -> TokenUsage | None

Aggregate multiple TokenUsage objects into a single total.

Useful for combining usage from multiple LLM calls or multiple grading operations.

PARAMETER DESCRIPTION
usages

List of TokenUsage objects (None values are filtered out).

TYPE: list[TokenUsage | None]

RETURNS DESCRIPTION
TokenUsage | None

A single TokenUsage with summed values, or None if all inputs are None.

Example

from autorubric import TokenUsage usage1 = TokenUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150) usage2 = TokenUsage(prompt_tokens=200, completion_tokens=100, total_tokens=300) total = aggregate_token_usage([usage1, usage2]) print(f"Total tokens: {total.total_tokens}") Total tokens: 450

Source code in src/autorubric/utils.py
def aggregate_token_usage(usages: list[TokenUsage | None]) -> TokenUsage | None:
    """Combine several TokenUsage records into one running total.

    Handy when a result was produced by more than one LLM call (or more than
    one grading operation) and a single usage figure is wanted.

    Args:
        usages: TokenUsage objects to combine. ``None`` entries are skipped.

    Returns:
        The sum of all non-None entries, or ``None`` when there were no
        non-None entries at all (including an empty list).

    Example:
        >>> from autorubric import TokenUsage
        >>> usage1 = TokenUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150)
        >>> usage2 = TokenUsage(prompt_tokens=200, completion_tokens=100, total_tokens=300)
        >>> total = aggregate_token_usage([usage1, usage2])
        >>> print(f"Total tokens: {total.total_tokens}")
        Total tokens: 450
    """
    running_total = None
    for usage in usages:
        if usage is None:
            continue
        if running_total is None:
            # Seed with a default-constructed TokenUsage so every real entry
            # is added onto it via TokenUsage's ``+`` operator.
            running_total = TokenUsage()
        running_total = running_total + usage
    return running_total

aggregate_completion_cost

Aggregate completion costs from multiple evaluations.

aggregate_completion_cost

aggregate_completion_cost(costs: list[float | None]) -> float | None

Aggregate multiple completion costs into a single total.

Useful for combining costs from multiple LLM calls or multiple grading operations.

PARAMETER DESCRIPTION
costs

List of cost values in USD (None values are filtered out).

TYPE: list[float | None]

RETURNS DESCRIPTION
float | None

Total cost in USD, or None if all inputs are None.

Example

costs = [0.001, 0.002, None, 0.003] total = aggregate_completion_cost(costs) print(f"Total cost: ${total:.4f}") Total cost: $0.0060

Source code in src/autorubric/utils.py
def aggregate_completion_cost(costs: list[float | None]) -> float | None:
    """Aggregate multiple completion costs into a single total.

    Useful for combining costs from multiple LLM calls or multiple grading operations.

    Args:
        costs: List of cost values in USD (None values are filtered out).

    Returns:
        Total cost in USD, or None if all inputs are None.

    Example:
        >>> costs = [0.001, 0.002, None, 0.003]
        >>> total = aggregate_completion_cost(costs)
        >>> print(f"Total cost: ${total:.4f}")
        Total cost: $0.0060
    """
    valid_costs = [c for c in costs if c is not None]
    if not valid_costs:
        return None
    return sum(valid_costs)

aggregate_evaluation_usage

Aggregate both usage and cost from evaluation reports.

aggregate_evaluation_usage

aggregate_evaluation_usage(reports: list['EvaluationReport']) -> tuple[TokenUsage | None, float | None]

Aggregate usage and cost from multiple EvaluationReports.

Useful for batch grading operations where you want to track total resource usage.

PARAMETER DESCRIPTION
reports

List of EvaluationReport objects from grading operations.

TYPE: list['EvaluationReport']

RETURNS DESCRIPTION
TokenUsage | None

Tuple of (total_token_usage, total_completion_cost).

float | None

Either value may be None if no usage data was available.

Example

After batch grading

results = await asyncio.gather(*[rubric.grade(...) for item in items]) total_usage, total_cost = aggregate_evaluation_usage(results) if total_usage: ... print(f"Total tokens used: {total_usage.total_tokens}") if total_cost: ... print(f"Total cost: ${total_cost:.4f}")

Source code in src/autorubric/utils.py
def aggregate_evaluation_usage(
    reports: list["EvaluationReport"],
) -> tuple[TokenUsage | None, float | None]:
    """Total the token usage and completion cost across evaluation reports.

    Reads ``token_usage`` and ``completion_cost`` from every report and hands
    them to ``aggregate_token_usage`` and ``aggregate_completion_cost``
    respectively. Intended for batch grading, where overall resource usage is
    of interest.

    Args:
        reports: EvaluationReport objects produced by grading operations.

    Returns:
        A ``(total_token_usage, total_completion_cost)`` pair. Each element is
        ``None`` when none of the reports carried that piece of data.

    Example:
        >>> # After batch grading
        >>> results = await asyncio.gather(*[rubric.grade(...) for item in items])
        >>> total_usage, total_cost = aggregate_evaluation_usage(results)
        >>> if total_usage:
        ...     print(f"Total tokens used: {total_usage.total_tokens}")
        >>> if total_cost:
        ...     print(f"Total cost: ${total_cost:.4f}")
    """
    per_report_usage = [report.token_usage for report in reports]
    per_report_cost = [report.completion_cost for report in reports]

    total_usage = aggregate_token_usage(per_report_usage)
    total_cost = aggregate_completion_cost(per_report_cost)
    return total_usage, total_cost

fill_ground_truth

Generate synthetic ground truth labels for unlabeled datasets.

fill_ground_truth async

fill_ground_truth(dataset: 'RubricDataset', grader: 'Grader', *, force: bool = False, show_progress: bool = True, max_concurrent_items: int | None = None) -> 'RubricDataset'

Generate ground truth labels for dataset items using an LLM grader.

Uses the provided grader to evaluate each item and extracts the verdicts to populate ground_truth. This is useful for creating synthetic ground truth labels when manual annotation is impractical.

PARAMETER DESCRIPTION
dataset

The dataset to fill ground truth for.

TYPE: 'RubricDataset'

grader

The grader to use for generating verdicts.

TYPE: 'Grader'

force

If True, re-grade all items. If False (default), only grade items where ground_truth is None.

TYPE: bool DEFAULT: False

show_progress

Whether to display progress bars. Default True.

TYPE: bool DEFAULT: True

max_concurrent_items

Maximum items to grade concurrently. None = grade all items in parallel (default).

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
'RubricDataset'

A new RubricDataset with ground_truth filled in. Items that fail to

'RubricDataset'

grade are excluded from the returned dataset. Items with existing

'RubricDataset'

ground_truth (when force=False) are included unchanged.

RAISES DESCRIPTION
ValueError

If dataset has no items.

Example

from autorubric import RubricDataset, LLMConfig from autorubric.graders import CriterionGrader from autorubric.utils import fill_ground_truth

dataset = RubricDataset.from_file("unlabeled.json") grader = CriterionGrader(llm_config=LLMConfig(model="openai/gpt-4o")) labeled = await fill_ground_truth(dataset, grader) labeled.to_file("labeled.json")

Source code in src/autorubric/utils.py
async def fill_ground_truth(
    dataset: "RubricDataset",
    grader: "Grader",
    *,
    force: bool = False,
    show_progress: bool = True,
    max_concurrent_items: int | None = None,
) -> "RubricDataset":
    """Generate ground truth labels for dataset items using an LLM grader.

    Uses the provided grader to evaluate each item and extracts the verdicts
    to populate ground_truth. This is useful for creating synthetic ground
    truth labels when manual annotation is impractical.

    Args:
        dataset: The dataset to fill ground truth for.
        grader: The grader to use for generating verdicts.
        force: If True, re-grade all items. If False (default), only grade items
            where ground_truth is None.
        show_progress: Whether to display a progress bar (requires ``rich``;
            silently skipped when ``rich`` is not installed). Default True.
        max_concurrent_items: Maximum items to grade concurrently.
            None (or any other falsy value such as 0) = grade all items in
            parallel (default).

    Returns:
        A new RubricDataset with ground_truth filled in. Items that fail to
        grade are excluded from the returned dataset (each failure is logged
        as a warning on this module's logger). Items with existing
        ground_truth (when force=False) are included unchanged.

    Raises:
        ValueError: If dataset has no items.

    Example:
        >>> from autorubric import RubricDataset, LLMConfig
        >>> from autorubric.graders import CriterionGrader
        >>> from autorubric.utils import fill_ground_truth
        >>>
        >>> dataset = RubricDataset.from_file("unlabeled.json")
        >>> grader = CriterionGrader(llm_config=LLMConfig(model="openai/gpt-4o"))
        >>> labeled = await fill_ground_truth(dataset, grader)
        >>> labeled.to_file("labeled.json")
    """
    import logging

    from autorubric.dataset import DataItem, RubricDataset

    logger = logging.getLogger(__name__)

    if len(dataset) == 0:
        raise ValueError("Dataset has no items")

    # Partition items into those that need grading and those kept as-is,
    # remembering each item's original index so order can be restored later.
    items_to_grade: list[tuple[int, DataItem]] = []
    preserved_items: dict[int, DataItem] = {}

    for idx, item in enumerate(dataset.items):
        if force or item.ground_truth is None:
            items_to_grade.append((idx, item))
        else:
            preserved_items[idx] = item

    graded_items: dict[int, DataItem] = {}

    def record_outcome(idx: int, new_item: "DataItem | None", error: str | None) -> None:
        """Store a successfully graded item, or report why it was dropped."""
        if new_item is not None:
            graded_items[idx] = new_item
        else:
            # Failed items are excluded by contract, but the reason must not
            # vanish silently.
            logger.warning(
                "fill_ground_truth: item %d could not be graded and is excluded: %s",
                idx,
                error,
            )

    if items_to_grade:

        async def grade_item(
            idx: int, item: DataItem
        ) -> tuple[int, DataItem | None, str | None]:
            try:
                # Use per-item rubric if available, otherwise fall back to global
                effective_rubric = dataset.get_item_rubric(idx)
                # Get effective reference submission (item-level takes precedence)
                reference = dataset.get_item_reference_submission(idx)
                report = await effective_rubric.grade(
                    to_grade=item.submission,
                    grader=grader,
                    query=dataset.prompt,
                    reference_submission=reference,
                )
                gt = _extract_ground_truth_from_report(report, effective_rubric.rubric)
                new_item = DataItem(
                    submission=item.submission,
                    description=item.description,
                    ground_truth=gt,
                    rubric=item.rubric,  # Preserve per-item rubric
                    reference_submission=item.reference_submission,  # Preserve reference
                )
                return (idx, new_item, None)
            except Exception as e:
                # Best-effort by design: one bad item must not abort the batch.
                # Keep the exception type, since str(e) alone can be empty.
                return (idx, None, f"{type(e).__name__}: {e}")

        # Create tasks with optional concurrency limit
        if max_concurrent_items:
            semaphore = asyncio.Semaphore(max_concurrent_items)

            async def limited_grade(
                idx: int, item: DataItem
            ) -> tuple[int, DataItem | None, str | None]:
                async with semaphore:
                    return await grade_item(idx, item)

            tasks = [limited_grade(idx, item) for idx, item in items_to_grade]
        else:
            tasks = [grade_item(idx, item) for idx, item in items_to_grade]

        # Build the progress display up front. Only the optional `rich`
        # import/construction is guarded, so the fallback path can never run
        # after some of the coroutines have already been awaited.
        progress = None
        if show_progress:
            try:
                from rich.console import Console
                from rich.progress import (
                    BarColumn,
                    MofNCompleteColumn,
                    Progress,
                    SpinnerColumn,
                    TextColumn,
                )

                progress = Progress(
                    SpinnerColumn(),
                    TextColumn("[bold blue]Filling ground truth"),
                    BarColumn(bar_width=40),
                    MofNCompleteColumn(),
                    console=Console(stderr=True),
                )
            except ImportError:
                # Fall back to no progress if rich is not available
                progress = None

        if progress is None:
            for idx, new_item, error in await asyncio.gather(*tasks):
                record_outcome(idx, new_item, error)
        else:
            with progress:
                task_id = progress.add_task("Grading", total=len(tasks))
                for coro in asyncio.as_completed(tasks):
                    idx, new_item, error = await coro
                    record_outcome(idx, new_item, error)
                    progress.update(task_id, advance=1)

    # Combine preserved and graded items, maintaining original order
    all_items: dict[int, DataItem] = {**preserved_items, **graded_items}
    ordered_items = [all_items[i] for i in sorted(all_items)]

    return RubricDataset(
        prompt=dataset.prompt,
        rubric=dataset.rubric,
        items=ordered_items,
        name=dataset.name,
        reference_submission=dataset.reference_submission,
    )

parse_thinking_output

Parse text with thinking/output markers.

parse_thinking_output

parse_thinking_output(text: str) -> ThinkingOutputDict

Parse thinking and output sections from text with XML-style markers.

Looks for ... and ... markers. If markers are not found, treats the entire text as output.

PARAMETER DESCRIPTION
text

Text potentially containing thinking/output markers.

TYPE: str

RETURNS DESCRIPTION
ThinkingOutputDict

Dict with 'thinking' and 'output' keys. Empty strings if sections not found.

Examples:

>>> parse_thinking_output("<thinking>ABC</thinking><output>DEF</output>")
{'thinking': 'ABC', 'output': 'DEF'}
>>> parse_thinking_output("Just output text")
{'thinking': '', 'output': 'Just output text'}
>>> parse_thinking_output("<thinking>Think</thinking>Rest")
{'thinking': 'Think', 'output': 'Rest'}
Source code in src/autorubric/utils.py
def parse_thinking_output(text: str) -> ThinkingOutputDict:
    """Split text into its thinking and output parts using XML-style markers.

    Recognises <thinking>...</thinking> and <output>...</output> markers,
    matched case-insensitively and across newlines. Only the first occurrence
    of each marker pair is extracted.

    Args:
        text: Text that may contain thinking/output markers.

    Returns:
        Dict with 'thinking' and 'output' keys.

        * 'thinking' is the stripped content of the thinking section, or an
          empty string when there is none.
        * 'output' is the stripped content of the output section when present;
          otherwise the text with all thinking sections removed (stripped)
          when a thinking section exists; otherwise the text unchanged.

    Examples:
        >>> parse_thinking_output("<thinking>ABC</thinking><output>DEF</output>")
        {'thinking': 'ABC', 'output': 'DEF'}

        >>> parse_thinking_output("Just output text")
        {'thinking': '', 'output': 'Just output text'}

        >>> parse_thinking_output("<thinking>Think</thinking>Rest")
        {'thinking': 'Think', 'output': 'Rest'}
    """
    marker_flags = re.DOTALL | re.IGNORECASE

    thinking_hit = re.search(r"<thinking>(.*?)</thinking>", text, marker_flags)
    output_hit = re.search(r"<output>(.*?)</output>", text, marker_flags)

    reasoning = "" if thinking_hit is None else thinking_hit.group(1).strip()

    if output_hit is not None:
        # Explicit output markers win over everything else.
        answer = output_hit.group(1).strip()
    elif thinking_hit is not None:
        # Thinking but no output markers: whatever remains once the thinking
        # sections are cut out is the output.
        answer = re.sub(
            r"<thinking>.*?</thinking>", "", text, flags=marker_flags
        ).strip()
    else:
        # No markers at all: the text is the output, left untouched.
        answer = text

    return ThinkingOutputDict(thinking=reasoning, output=answer)

normalize_to_grade_input

Normalize any input format to ThinkingOutputDict.

normalize_to_grade_input

normalize_to_grade_input(to_grade: ToGradeInput) -> ThinkingOutputDict

Normalize to_grade input to dict format.

PARAMETER DESCRIPTION
to_grade

Either a string (with optional markers) or a dict.

TYPE: ToGradeInput

RETURNS DESCRIPTION
ThinkingOutputDict

Dict with 'thinking' and 'output' keys.

RAISES DESCRIPTION
ValueError

If dict format is invalid (missing keys, wrong types).

Source code in src/autorubric/utils.py
def normalize_to_grade_input(to_grade: ToGradeInput) -> ThinkingOutputDict:
    """Coerce a to_grade value into the thinking/output dict form.

    Strings are delegated to ``parse_thinking_output``. Dicts are read for
    their 'thinking' and 'output' entries; a missing entry defaults to an
    empty string. Any other keys are ignored, with a ``UserWarning``.

    Args:
        to_grade: Either a string (with optional markers) or a dict.

    Returns:
        Dict with 'thinking' and 'output' keys.

    Raises:
        ValueError: If to_grade is neither a string nor a dict, or if the
            'thinking' or 'output' value is not a string.
    """
    if isinstance(to_grade, str):
        return parse_thinking_output(to_grade)

    if isinstance(to_grade, dict):
        # Absent keys fall back to "" rather than raising.
        thinking_text = to_grade.get("thinking", "")
        output_text = to_grade.get("output", "")

        if not isinstance(thinking_text, str):
            raise ValueError(
                f"'thinking' must be a string, got {type(thinking_text).__name__}"
            )
        if not isinstance(output_text, str):
            raise ValueError(
                f"'output' must be a string, got {type(output_text).__name__}"
            )

        # Unknown keys are tolerated but flagged, since they are dropped.
        unexpected = set(to_grade.keys()) - {"thinking", "output"}
        if unexpected:
            warnings.warn(
                f"Unexpected keys in to_grade dict: {unexpected}. "
                f"Only 'thinking' and 'output' are used.",
                UserWarning,
            )

        return ThinkingOutputDict(thinking=thinking_text, output=output_text)

    raise ValueError(f"to_grade must be a string or dict, got {type(to_grade).__name__}")

word_count

Count words in text (default length penalty function).

word_count

word_count(text: str) -> int

Count the number of whitespace-separated words in text.

This is the default counting function used by LengthPenalty. For more accurate token counting with a specific model, provide a custom count_fn that uses a tokenizer.

Source code in src/autorubric/utils.py
def word_count(text: str) -> int:
    """Return how many whitespace-separated words ``text`` contains.

    Runs of whitespace count as a single separator, and leading/trailing
    whitespace is ignored, so empty or all-whitespace text yields 0.

    This is the default counting function used by LengthPenalty. For more
    accurate token counting with a specific model, provide a custom count_fn
    that uses a tokenizer.
    """
    words = text.split()
    return len(words)

extract_verdicts_from_report

Extract verdicts from criterion reports.

extract_verdicts_from_report

extract_verdicts_from_report(report: EvaluationReport | EnsembleEvaluationReport, num_criteria: int) -> list[CriterionVerdict]

Extract verdicts from an EvaluationReport.

PARAMETER DESCRIPTION
report

The evaluation report.

TYPE: EvaluationReport | EnsembleEvaluationReport

num_criteria

Expected number of criteria.

TYPE: int

RETURNS DESCRIPTION
list[CriterionVerdict]

List of CriterionVerdict values.

Source code in src/autorubric/metrics/_helpers.py
def extract_verdicts_from_report(
    report: EvaluationReport | EnsembleEvaluationReport,
    num_criteria: int,
) -> list[CriterionVerdict]:
    """Collect one verdict per criterion entry of an evaluation report.

    Args:
        report: The evaluation report; its ``report`` attribute holds the
            per-criterion entries (or None).
        num_criteria: Expected number of criteria. Only consulted when
            ``report.report`` is None, to size the all-UNMET fallback list.

    Returns:
        List of CriterionVerdict values, in the order of the report entries.
        When ``report.report`` is None, a list of ``num_criteria`` UNMET
        verdicts.
    """
    entries = report.report
    if entries is None:
        return [CriterionVerdict.UNMET] * num_criteria

    def verdict_of(entry):
        # Dict entries are not expected, but are handled defensively.
        if isinstance(entry, dict):
            return entry.get("verdict", CriterionVerdict.UNMET)
        # Ensemble entries expose final_verdict; it takes precedence over verdict.
        if hasattr(entry, "final_verdict"):
            return entry.final_verdict
        # Plain criterion entries expose verdict.
        if hasattr(entry, "verdict"):
            return entry.verdict
        # Anything unrecognised counts as UNMET.
        return CriterionVerdict.UNMET

    return [verdict_of(entry) for entry in entries]

filter_cannot_assess

Filter out CANNOT_ASSESS verdicts.

filter_cannot_assess

filter_cannot_assess(pred_verdicts: list[CriterionVerdict], true_verdicts: list[CriterionVerdict], mode: CannotAssessMode = 'exclude') -> tuple[list[CriterionVerdict], list[CriterionVerdict]]

Filter or transform CANNOT_ASSESS verdicts based on mode.

PARAMETER DESCRIPTION
pred_verdicts

Predicted verdicts.

TYPE: list[CriterionVerdict]

true_verdicts

Ground truth verdicts.

TYPE: list[CriterionVerdict]

mode

How to handle CANNOT_ASSESS: - "exclude": Remove pairs where either is CA - "as_unmet": Convert CA to UNMET - "as_category": Keep CA as-is (3-class)

TYPE: CannotAssessMode DEFAULT: 'exclude'

RETURNS DESCRIPTION
tuple[list[CriterionVerdict], list[CriterionVerdict]]

Tuple of (filtered_pred, filtered_true).

Source code in src/autorubric/metrics/_helpers.py
def filter_cannot_assess(
    pred_verdicts: list[CriterionVerdict],
    true_verdicts: list[CriterionVerdict],
    mode: CannotAssessMode = "exclude",
) -> tuple[list[CriterionVerdict], list[CriterionVerdict]]:
    """Drop, rewrite, or keep CANNOT_ASSESS verdicts according to ``mode``.

    Args:
        pred_verdicts: Predicted verdicts.
        true_verdicts: Ground truth verdicts.
        mode: How to handle CANNOT_ASSESS (CA):
            - "exclude": Drop every (pred, true) pair in which either side is
              CA. Pairs are formed positionally with ``zip``.
            - "as_unmet": Replace CA with UNMET in both lists.
            - any other value (documented as "as_category"): Keep CA as-is
              (3-class) and return shallow copies of both lists.

    Returns:
        Tuple of (filtered_pred, filtered_true).
    """
    cannot_assess = CriterionVerdict.CANNOT_ASSESS

    if mode == "exclude":
        surviving_pairs = [
            (pred, true)
            for pred, true in zip(pred_verdicts, true_verdicts)
            if pred != cannot_assess and true != cannot_assess
        ]
        kept_pred = [pred for pred, _ in surviving_pairs]
        kept_true = [true for _, true in surviving_pairs]
        return kept_pred, kept_true

    if mode == "as_unmet":

        def demote(verdicts):
            # CA is folded into UNMET; every other verdict passes through.
            return [
                CriterionVerdict.UNMET if verdict == cannot_assess else verdict
                for verdict in verdicts
            ]

        return demote(pred_verdicts), demote(true_verdicts)

    # as_category: leave verdicts untouched, but hand back fresh lists.
    return list(pred_verdicts), list(true_verdicts)

verdict_to_binary

Convert verdict to binary value.

verdict_to_binary

verdict_to_binary(verdicts: Sequence[CriterionVerdict]) -> list[int]

Convert verdicts to binary (MET=1, UNMET/CA=0).

PARAMETER DESCRIPTION
verdicts

List of CriterionVerdict values.

TYPE: Sequence[CriterionVerdict]

RETURNS DESCRIPTION
list[int]

List of 0/1 values.

Source code in src/autorubric/metrics/_helpers.py
def verdict_to_binary(verdicts: Sequence[CriterionVerdict]) -> list[int]:
    """Map each verdict to 1 (MET) or 0 (anything else, i.e. UNMET/CA).

    Args:
        verdicts: Sequence of CriterionVerdict values.

    Returns:
        List of 0/1 values, one per input verdict, in the same order.
    """
    bits: list[int] = []
    for verdict in verdicts:
        if verdict == CriterionVerdict.MET:
            bits.append(1)
        else:
            bits.append(0)
    return bits

verdict_to_string

Convert verdict to string representation.

verdict_to_string

verdict_to_string(verdicts: Sequence[CriterionVerdict]) -> list[str]

Convert verdicts to string values.

PARAMETER DESCRIPTION
verdicts

List of CriterionVerdict values.

TYPE: Sequence[CriterionVerdict]

RETURNS DESCRIPTION
list[str]

List of string values.

Source code in src/autorubric/metrics/_helpers.py
def verdict_to_string(verdicts: Sequence[CriterionVerdict]) -> list[str]:
    """Return the ``.value`` of every verdict.

    Args:
        verdicts: Sequence of CriterionVerdict values.

    Returns:
        List of string values, one per input verdict, in the same order.
    """
    labels: list[str] = []
    for verdict in verdicts:
        labels.append(verdict.value)
    return labels