
emic.experiments

The experiments module provides a framework for reproducible algorithm benchmarking.

Command-Line Interface

emic-experiment --help

Options

Option Description
--all Run all experiments
--quick Quick mode (reduced params, skip slow algorithms)
--parallel N Run with N parallel workers
--shard M/N Run shard M of N (for distributed execution)
--combine DIR Combine sharded results from DIR
--list List available experiments
--algorithms Comma-separated list of algorithms (e.g., --algorithms cssr,spectral)
--timeout Per-run timeout in seconds (default: 120)
-o, --output-dir Output directory (default: experiments/runs)
-q, --quiet Suppress progress output
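
For example, a typical run and a sharded run might look like the following (illustrative invocations; the shard index follows the 0-based convention used by ExperimentRunner below, and flags combine subject to the CLI's own validation):

emic-experiment --all --quick --parallel 4
emic-experiment --all --shard 0/4 -o experiments/runs
emic-experiment --combine experiments/runs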

Core Classes

ExperimentRunner

ExperimentRunner(
    config: ExperimentsConfig | None = None,
    process_registry: ProcessRegistry | None = None,
    algorithm_registry: AlgorithmRegistry | None = None,
    output_dir: str | None = None,
    verbose: bool = True,
    shard: tuple[int, int] | None = None,
    algorithms_filter: list[str] | None = None,
)

Run benchmark experiments and collect results.

Example

runner = ExperimentRunner()
runner.run_all()  # Run all default experiments

# Run a specific experiment (run_experiment takes an ExperimentConfig)
runner.run_experiment(runner.config.get_experiment("accuracy"))

Sharding example

runner = ExperimentRunner(shard=(0, 4)) # Run shard 0 of 4

Initialize the benchmark runner.

Parameters:

config (ExperimentsConfig | None, default None): Benchmark configuration (uses defaults if None)
process_registry (ProcessRegistry | None, default None): Process registry (uses defaults if None)
algorithm_registry (AlgorithmRegistry | None, default None): Algorithm registry (uses defaults if None)
output_dir (str | None, default None): Override output directory
verbose (bool, default True): Print progress to stdout
shard (tuple[int, int] | None, default None): Optional (shard_index, total_shards) for parallel execution
algorithms_filter (list[str] | None, default None): Optional list of algorithm names to run (overrides config)

Source code in src/emic/experiments/runner.py
def __init__(
    self,
    config: ExperimentsConfig | None = None,
    process_registry: ProcessRegistry | None = None,
    algorithm_registry: AlgorithmRegistry | None = None,
    output_dir: str | None = None,
    verbose: bool = True,
    shard: tuple[int, int] | None = None,
    algorithms_filter: list[str] | None = None,
):
    """
    Initialize the benchmark runner.

    Args:
        config: Benchmark configuration (uses defaults if None)
        process_registry: Process registry (uses defaults if None)
        algorithm_registry: Algorithm registry (uses defaults if None)
        output_dir: Override output directory
        verbose: Print progress to stdout
        shard: Optional (shard_index, total_shards) for parallel execution
        algorithms_filter: Optional list of algorithm names to run (overrides config)
    """
    from emic.experiments.config import create_default_config

    self.config = config or create_default_config()
    self.process_registry = process_registry or get_process_registry()
    self.algorithm_registry = algorithm_registry or get_algorithm_registry()
    self.output_dir = output_dir or self.config.output_dir
    self.verbose = verbose
    self.shard = shard
    self.algorithms_filter = algorithms_filter

    self.writer = ResultsWriter(self.output_dir, shard=shard)
    self.progress: RunProgress | None = None

run_experiment

run_experiment(
    experiment: ExperimentConfig,
) -> list[BenchmarkResult]

Run a single experiment.

Parameters:

experiment (ExperimentConfig, required): Experiment configuration

Returns:

list[BenchmarkResult]: List of all results from the experiment

Source code in src/emic/experiments/runner.py
def run_experiment(self, experiment: ExperimentConfig) -> list[BenchmarkResult]:
    """
    Run a single experiment.

    Args:
        experiment: Experiment configuration

    Returns:
        List of all results from the experiment
    """
    results: list[BenchmarkResult] = []

    # Get effective sample sizes
    if self.config.quick_mode:
        sample_sizes = self.config.quick_sample_sizes
    else:
        sample_sizes = experiment.sample_sizes

    # Get algorithms (skip slow in quick mode, filter if specified)
    algorithm_names = (
        self.algorithms_filter if self.algorithms_filter else experiment.algorithms
    )
    algorithms = []
    for name in algorithm_names:
        try:
            algo_info = self.algorithm_registry.get(name)
            if self.config.quick_mode and algo_info.slow:
                self._log(f"  Skipping {name} (slow, quick mode)")
                continue
            algorithms.append(algo_info)
        except KeyError:
            self._log(f"  Warning: Unknown algorithm {name}")

    # Get processes
    processes = []
    for name in experiment.processes:
        try:
            processes.append(self.process_registry.get(name))
        except KeyError:
            self._log(f"  Warning: Unknown process {name}")

    # Build flat list of all runs
    all_runs: list[tuple[AlgorithmInfo, ProcessInfo, int, int]] = []
    for algo_info in algorithms:
        for proc_info in processes:
            for n in sample_sizes:
                reps = experiment.get_repetitions(n)
                for rep in range(reps):
                    all_runs.append((algo_info, proc_info, n, rep))

    # Filter by shard if specified
    if self.shard is not None:
        shard_index, total_shards = self.shard
        all_runs = [run for i, run in enumerate(all_runs) if i % total_shards == shard_index]

    total_runs = len(all_runs)
    self.progress = RunProgress(total=total_runs)
    self.progress.start()

    self._log(f"\n=== {experiment.name}: {experiment.description} ===")
    if self.shard is not None:
        shard_index, total_shards = self.shard
        self._log(f"Shard {shard_index}/{total_shards}: {total_runs} runs")
    else:
        self._log(f"Total runs: {total_runs}")

    for algo_info, proc_info, n, rep in all_runs:
        seed = experiment.seed_offset + rep

        self._log(
            f"  {algo_info.name} x {proc_info.name} x N={n} {self.progress.format_progress()}"
        )

        run_results = run_single_benchmark(
            algorithm_info=algo_info,
            process_info=proc_info,
            n_samples=n,
            experiment_name=experiment.name,
            seed=seed,
            timeout_seconds=experiment.timeout_seconds,
        )

        results.extend(run_results)
        self.writer.add_results(run_results)

        # Check for errors
        if any(r.error for r in run_results):
            self.progress.record_failed()
        else:
            self.progress.record_complete()

    return results

run_all

run_all() -> list[BenchmarkResult]

Run all experiments in the configuration.

Returns:

list[BenchmarkResult]: List of all results

Source code in src/emic/experiments/runner.py
def run_all(self) -> list[BenchmarkResult]:
    """
    Run all experiments in the configuration.

    Returns:
        List of all results
    """
    all_results: list[BenchmarkResult] = []
    start_time = time.perf_counter()

    git_commit, git_dirty = get_git_info()

    self._log("=" * 60)
    self._log("EMIC Benchmark Suite")
    self._log("=" * 60)
    self._log(f"Output: {self.output_dir}")
    self._log(f"Quick mode: {self.config.quick_mode}")
    if self.shard is not None:
        shard_index, total_shards = self.shard
        self._log(f"Shard: {shard_index}/{total_shards}")
    self._log(f"Git: {git_commit}{' (dirty)' if git_dirty else ''}")

    for experiment in self.config.experiments:
        results = self.run_experiment(experiment)
        all_results.extend(results)

        # Incremental save after each experiment
        self.writer.save_incremental()

    # Finalize
    duration = time.perf_counter() - start_time
    metadata = RunMetadata(
        timestamp=self.writer.timestamp,
        git_commit=git_commit,
        git_dirty=git_dirty,
        python_version=platform.python_version(),
        emic_version=get_emic_version(),
        cli_args=sys.argv[1:],
        duration_seconds=duration,
        completed=True,
    )

    result_path = self.writer.finalize(metadata)

    self._log("")
    self._log("=" * 60)
    self._log(f"Complete! Duration: {duration:.1f}s")
    self._log(f"Results: {result_path}")
    self._log("=" * 60)

    return all_results

run_single_benchmark

run_single_benchmark(
    algorithm_info: AlgorithmInfo,
    process_info: ProcessInfo,
    n_samples: int,
    experiment_name: str,
    seed: int = 42,
    timeout_seconds: int = 120,
) -> list[BenchmarkResult]

Run a single benchmark configuration.

Parameters:

algorithm_info (AlgorithmInfo, required): Algorithm to benchmark
process_info (ProcessInfo, required): Process to generate data from
n_samples (int, required): Number of samples to generate
experiment_name (str, required): Name of the parent experiment
seed (int, default 42): Random seed for reproducibility
timeout_seconds (int, default 120): Maximum time for this run

Returns:

list[BenchmarkResult]: List of BenchmarkResult for each metric

Source code in src/emic/experiments/runner.py
def run_single_benchmark(
    algorithm_info: AlgorithmInfo,
    process_info: ProcessInfo,
    n_samples: int,
    experiment_name: str,
    seed: int = 42,
    timeout_seconds: int = 120,
) -> list[BenchmarkResult]:
    """
    Run a single benchmark configuration.

    Args:
        algorithm_info: Algorithm to benchmark
        process_info: Process to generate data from
        n_samples: Number of samples to generate
        experiment_name: Name of the parent experiment
        seed: Random seed for reproducibility
        timeout_seconds: Maximum time for this run

    Returns:
        List of BenchmarkResult for each metric
    """
    results: list[BenchmarkResult] = []
    timestamp = datetime.now(UTC)
    error_message = None
    old_handler = None

    # Set up timeout (Unix only)
    has_alarm = hasattr(signal, "SIGALRM")
    if has_alarm:
        old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
        signal.alarm(timeout_seconds)

    start_time = time.perf_counter()

    try:
        # Create source and generate data
        source = process_info.create_source(seed=seed)
        data = list(islice(source, n_samples))

        # Create algorithm and run inference
        config_overrides = {}
        algo = algorithm_info.create_algorithm(**config_overrides)
        result = algo.infer(data)

        # Extract machine
        machine = result.machine

        # Compute metrics
        duration = time.perf_counter() - start_time

        # State count
        n_states = state_count(machine)
        results.append(
            BenchmarkResult(
                experiment=experiment_name,
                algorithm=algorithm_info.name,
                process=process_info.name,
                n_samples=n_samples,
                metric="state_count",
                value=float(n_states),
                ground_truth=process_info.ground_truth.get("state_count"),
                timestamp=timestamp,
            )
        )

        # Statistical complexity (Cμ)
        cmu = statistical_complexity(machine)
        results.append(
            BenchmarkResult(
                experiment=experiment_name,
                algorithm=algorithm_info.name,
                process=process_info.name,
                n_samples=n_samples,
                metric="cmu",
                value=cmu,
                ground_truth=process_info.ground_truth.get("cmu"),
                timestamp=timestamp,
            )
        )

        # Entropy rate (hμ)
        hmu = entropy_rate(machine)
        results.append(
            BenchmarkResult(
                experiment=experiment_name,
                algorithm=algorithm_info.name,
                process=process_info.name,
                n_samples=n_samples,
                metric="hmu",
                value=hmu,
                ground_truth=process_info.ground_truth.get("hmu"),
                timestamp=timestamp,
            )
        )

        # Duration
        results.append(
            BenchmarkResult(
                experiment=experiment_name,
                algorithm=algorithm_info.name,
                process=process_info.name,
                n_samples=n_samples,
                metric="duration_s",
                value=duration,
                timestamp=timestamp,
            )
        )

    except TimeoutError:
        error_message = f"Timeout after {timeout_seconds}s"
    except Exception as e:
        error_message = str(e)
    finally:
        # Clear timeout
        if has_alarm and old_handler is not None:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)

    # If error, record error results
    if error_message is not None:
        duration = time.perf_counter() - start_time
        for metric in ["state_count", "cmu", "hmu", "duration_s"]:
            results.append(
                BenchmarkResult(
                    experiment=experiment_name,
                    algorithm=algorithm_info.name,
                    process=process_info.name,
                    n_samples=n_samples,
                    metric=metric,
                    value=float("nan"),
                    ground_truth=process_info.ground_truth.get(metric),
                    error=error_message,
                    timestamp=timestamp,
                )
            )

    return results
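
A minimal usage sketch, pulling the algorithm and process from the default registries documented below ("cssr" and "even_process" are the default names; any registered entries work):

from emic.experiments.registry import get_algorithm_registry, get_process_registry
from emic.experiments.runner import run_single_benchmark

# Look up registered components by name
algo_info = get_algorithm_registry().get("cssr")
proc_info = get_process_registry().get("even_process")

# One configuration = one (algorithm, process, N) run, producing one result per metric
results = run_single_benchmark(
    algorithm_info=algo_info,
    process_info=proc_info,
    n_samples=10_000,
    experiment_name="accuracy",
    seed=0,
)
for r in results:
    print(r.metric, r.value, r.ground_truth, r.error)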

Configuration

ExperimentConfig dataclass

ExperimentConfig(
    name: str,
    description: str = "",
    algorithms: list[str] = (
        lambda: ["cssr", "spectral"]
    )(),
    processes: list[str] = (
        lambda: ["even_process", "golden_mean"]
    )(),
    sample_sizes: list[int] = (
        lambda: [1000, 10000, 100000]
    )(),
    metrics: list[str] = (
        lambda: ["state_count", "cmu", "hmu", "duration_s"]
    )(),
    repetitions: int = 1,
    repetitions_by_sample_size: dict[int, int] = dict(),
    seed_offset: int = 0,
    algorithm_configs: dict[str, dict[str, Any]] = dict(),
    timeout_seconds: int = 120,
)

Configuration for a single experiment.

Attributes:

name (str): Experiment identifier (e.g., "accuracy")
description (str): Human-readable description
algorithms (list[str]): List of algorithm names to benchmark
processes (list[str]): List of process names to test
sample_sizes (list[int]): List of N values for data generation
metrics (list[str]): List of metrics to compute
repetitions (int): Default number of times to repeat each configuration
repetitions_by_sample_size (dict[int, int]): Override repetitions per sample size (e.g., {1000: 5, 10000: 3})
seed_offset (int): Base seed for random number generation
algorithm_configs (dict[str, dict[str, Any]]): Per-algorithm config overrides
timeout_seconds (int): Per-run timeout in seconds

total_runs property

total_runs: int

Total number of individual benchmark runs.

get_repetitions

get_repetitions(sample_size: int) -> int

Get repetitions for a specific sample size.

Source code in src/emic/experiments/config.py
def get_repetitions(self, sample_size: int) -> int:
    """Get repetitions for a specific sample size."""
    return self.repetitions_by_sample_size.get(sample_size, self.repetitions)
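
A sketch of how the repetition override resolves (field values here are illustrative; the dataclass fields are documented above):

from emic.experiments.config import ExperimentConfig

exp = ExperimentConfig(
    name="convergence",
    description="Variance at small N",
    sample_sizes=[1000, 10000],
    repetitions=1,
    repetitions_by_sample_size={1000: 5},  # extra repetitions where estimates are noisiest
)

print(exp.get_repetitions(1000))   # 5 (override for this sample size)
print(exp.get_repetitions(10000))  # 1 (falls back to `repetitions`)
print(exp.total_runs)              # total individual benchmark runs for this experiment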

ExperimentsConfig dataclass

ExperimentsConfig(
    experiments: list[ExperimentConfig],
    output_dir: str = "experiments/runs",
    quick_mode: bool = False,
    quick_sample_sizes: list[int] = (lambda: [1000])(),
)

Top-level experiments configuration.

Attributes:

experiments (list[ExperimentConfig]): List of experiment configurations
output_dir (str): Directory for results output
quick_mode (bool): If True, use reduced sample sizes and skip slow algorithms
quick_sample_sizes (list[int]): Sample sizes to use in quick mode

from_yaml classmethod

from_yaml(path: str | Path) -> ExperimentsConfig

Load configuration from a YAML file.

Source code in src/emic/experiments/config.py
@classmethod
def from_yaml(cls, path: str | Path) -> ExperimentsConfig:
    """Load configuration from a YAML file."""
    with Path(path).open() as f:
        data = yaml.safe_load(f)
    return cls.from_dict(data)

from_dict classmethod

from_dict(data: dict[str, Any]) -> ExperimentsConfig

Create configuration from a dictionary.

Source code in src/emic/experiments/config.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ExperimentsConfig:
    """Create configuration from a dictionary."""
    experiments = []
    for exp_data in data.get("experiments", []):
        experiments.append(ExperimentConfig(**exp_data))

    return cls(
        experiments=experiments,
        output_dir=data.get("output_dir", "experiments/runs"),
        quick_mode=data.get("quick_mode", False),
        quick_sample_sizes=data.get("quick_sample_sizes", [1000]),
    )

get_experiment

get_experiment(name: str) -> ExperimentConfig

Get an experiment by name.

Source code in src/emic/experiments/config.py
def get_experiment(self, name: str) -> ExperimentConfig:
    """Get an experiment by name."""
    for exp in self.experiments:
        if exp.name == name:
            return exp
    raise KeyError(
        f"Unknown experiment: {name}. Available: {[e.name for e in self.experiments]}"
    )

list_experiments

list_experiments() -> list[str]

List all experiment names.

Source code in src/emic/experiments/config.py
def list_experiments(self) -> list[str]:
    """List all experiment names."""
    return [exp.name for exp in self.experiments]

load_config

load_config(
    path: str | Path | None = None, quick_mode: bool = False
) -> ExperimentsConfig

Load benchmark configuration.

Parameters:

path (str | Path | None, default None): Path to YAML config file. If None, uses defaults.
quick_mode (bool, default False): If True, use reduced parameter space.

Returns:

ExperimentsConfig: Loaded or default configuration

Source code in src/emic/experiments/config.py
def load_config(path: str | Path | None = None, quick_mode: bool = False) -> ExperimentsConfig:
    """
    Load benchmark configuration.

    Args:
        path: Path to YAML config file. If None, uses defaults.
        quick_mode: If True, use reduced parameter space.

    Returns:
        Loaded or default configuration
    """
    if path is not None:
        config = ExperimentsConfig.from_yaml(path)
        if quick_mode:
            # Override quick mode setting from CLI
            return ExperimentsConfig(
                experiments=config.experiments,
                output_dir=config.output_dir,
                quick_mode=True,
                quick_sample_sizes=config.quick_sample_sizes,
            )
        return config
    return create_default_config(quick_mode=quick_mode)
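
A sketch of the typical loading path (the YAML path is illustrative):

from emic.experiments.config import load_config
from emic.experiments.runner import ExperimentRunner

config = load_config("experiments/config.yaml", quick_mode=True)  # illustrative path
runner = ExperimentRunner(config=config)
runner.run_all()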

Registries

ProcessRegistry

ProcessRegistry()

Registry of benchmark processes.

Processes are data sources with known ground truth for validation.

Example

registry = ProcessRegistry()
registry.register(
    name="even_process",
    display_name="Even Process",
    factory=EvenProcessSource,
    parameters={"p": 0.5},
    ground_truth={"state_count": 2, "cmu": 1.0},
)
process = registry.get("even_process")
source = process.create_source(seed=42)

Source code in src/emic/experiments/registry.py
def __init__(self) -> None:
    self._processes: dict[str, ProcessInfo] = {}

register

register(
    name: str,
    display_name: str,
    factory: Callable[..., SequenceSource],
    ground_truth: dict[str, float] | None = None,
    description: str = "",
    parameters: dict[str, Any] | None = None,
) -> None

Register a process.

Source code in src/emic/experiments/registry.py
def register(
    self,
    name: str,
    display_name: str,
    factory: Callable[..., SequenceSource],
    ground_truth: dict[str, float] | None = None,
    description: str = "",
    parameters: dict[str, Any] | None = None,
) -> None:
    """Register a process."""
    self._processes[name] = ProcessInfo(
        name=name,
        display_name=display_name,
        factory=factory,
        ground_truth=ground_truth or {},
        description=description,
        parameters=parameters or {},
    )

get

get(name: str) -> ProcessInfo

Get a registered process by name.

Source code in src/emic/experiments/registry.py
def get(self, name: str) -> ProcessInfo:
    """Get a registered process by name."""
    if name not in self._processes:
        raise KeyError(f"Unknown process: {name}. Available: {list(self._processes)}")
    return self._processes[name]

list

list() -> list[str]

List all registered process names.

Source code in src/emic/experiments/registry.py
def list(self) -> list[str]:
    """List all registered process names."""
    return list(self._processes.keys())

ProcessInfo dataclass

ProcessInfo(
    name: str,
    display_name: str,
    factory: Callable[..., SequenceSource],
    ground_truth: dict[str, float] = dict(),
    description: str = "",
    parameters: dict[str, Any] = dict(),
)

Information about a benchmark process.

Attributes:

name (str): Unique identifier (e.g., "even_process")
display_name (str): Human-readable name (e.g., "Even Process")
factory (Callable[..., SequenceSource]): Callable that creates the source (takes seed as kwarg)
ground_truth (dict[str, float]): Dictionary of expected metric values
description (str): Optional description
parameters (dict[str, Any]): Parameters passed to the factory

create_source

create_source(seed: int = 42) -> SequenceSource

Create a new source instance with the given seed.

Source code in src/emic/experiments/registry.py
def create_source(self, seed: int = 42) -> SequenceSource:
    """Create a new source instance with the given seed."""
    return self.factory(_seed=seed, **self.parameters)
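
A sketch of drawing data from a registered process; islice mirrors how run_single_benchmark consumes a source:

from itertools import islice
from emic.experiments.registry import get_process_registry

proc = get_process_registry().get("golden_mean")
source = proc.create_source(seed=0)
data = list(islice(source, 1000))  # first 1000 symbols
print(proc.ground_truth)           # expected metric values, used for comparison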

AlgorithmRegistry

AlgorithmRegistry()

Registry of benchmark algorithms.

Algorithms are inference methods that reconstruct epsilon-machines.

Example

registry = AlgorithmRegistry()
registry.register(
    name="cssr",
    display_name="CSSR",
    factory=CSSR,
    config_class=CSSRConfig,
    default_config={"max_history": 5, "significance": 0.05},
)
algo_info = registry.get("cssr")
algo = algo_info.create_algorithm(max_history=8)

Source code in src/emic/experiments/registry.py
def __init__(self) -> None:
    self._algorithms: dict[str, AlgorithmInfo] = {}

register

register(
    name: str,
    display_name: str,
    factory: Callable[..., InferenceAlgorithm],
    config_class: type | None = None,
    default_config: dict[str, Any] | None = None,
    slow: bool = False,
    description: str = "",
) -> None

Register an algorithm.

Source code in src/emic/experiments/registry.py
def register(
    self,
    name: str,
    display_name: str,
    factory: Callable[..., InferenceAlgorithm],
    config_class: type | None = None,
    default_config: dict[str, Any] | None = None,
    slow: bool = False,
    description: str = "",
) -> None:
    """Register an algorithm."""
    self._algorithms[name] = AlgorithmInfo(
        name=name,
        display_name=display_name,
        factory=factory,
        config_class=config_class,
        default_config=default_config or {},
        slow=slow,
        description=description,
    )

get

get(name: str) -> AlgorithmInfo

Get a registered algorithm by name.

Source code in src/emic/experiments/registry.py
def get(self, name: str) -> AlgorithmInfo:
    """Get a registered algorithm by name."""
    if name not in self._algorithms:
        raise KeyError(f"Unknown algorithm: {name}. Available: {list(self._algorithms)}")
    return self._algorithms[name]

list

list(include_slow: bool = True) -> list[str]

List all registered algorithm names.

Source code in src/emic/experiments/registry.py
def list(self, include_slow: bool = True) -> list[str]:
    """List all registered algorithm names."""
    if include_slow:
        return list(self._algorithms.keys())
    return [name for name, info in self._algorithms.items() if not info.slow]

AlgorithmInfo dataclass

AlgorithmInfo(
    name: str,
    display_name: str,
    factory: Callable[..., InferenceAlgorithm],
    config_class: type | None = None,
    default_config: dict[str, Any] = dict(),
    slow: bool = False,
    description: str = "",
)

Information about a benchmark algorithm.

Attributes:

name (str): Unique identifier (e.g., "cssr")
display_name (str): Human-readable name (e.g., "CSSR")
factory (Callable[..., InferenceAlgorithm]): Callable that creates the algorithm (takes config kwargs)
config_class (type | None): Configuration class for the algorithm
default_config (dict[str, Any]): Default configuration parameters
slow (bool): Whether this algorithm is slow (skipped in --quick mode)
description (str): Optional description

create_algorithm

create_algorithm(
    **config_overrides: Any,
) -> InferenceAlgorithm

Create a new algorithm instance with merged config.

Source code in src/emic/experiments/registry.py
def create_algorithm(self, **config_overrides: Any) -> InferenceAlgorithm:
    """Create a new algorithm instance with merged config."""
    config = {**self.default_config, **config_overrides}
    if self.config_class is not None:
        cfg = self.config_class(**config)
        return self.factory(cfg)
    return self.factory(**config)
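
A sketch of instantiating a registered algorithm with a config override and running inference (max_history=8 mirrors the registry example above; the data line reuses a default process):

from itertools import islice
from emic.experiments.registry import get_algorithm_registry, get_process_registry

# Draw a short sample sequence from a registered process
data = list(islice(get_process_registry().get("even_process").create_source(seed=0), 5000))

algo = get_algorithm_registry().get("cssr").create_algorithm(max_history=8)
result = algo.infer(data)
machine = result.machine  # reconstructed epsilon-machine, as used by the metric functions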

Result Schema

BenchmarkResult dataclass

BenchmarkResult(
    experiment: str,
    algorithm: str,
    process: str,
    n_samples: int,
    metric: str,
    value: float,
    ground_truth: float | None = None,
    error: str | None = None,
    timestamp: datetime = (lambda: datetime.now(UTC))(),
)

A single benchmark measurement.

Represents one algorithm run on one process configuration, measuring one metric. Multiple BenchmarkResults form a complete benchmark run.

Attributes:

experiment (str): Experiment identifier (e.g., "accuracy", "convergence")
algorithm (str): Algorithm name (e.g., "cssr", "spectral", "bsi")
process (str): Process name (e.g., "even_process", "golden_mean")
n_samples (int): Number of samples used for inference
metric (str): Metric name (e.g., "cmu", "hmu", "state_count", "duration_s")
value (float): Measured value
ground_truth (float | None): Expected value if known, None otherwise
error (str | None): Exception message if run failed, None otherwise
timestamp (datetime): When this measurement was recorded

to_dict

to_dict() -> dict

Convert to dictionary for DataFrame construction.

Source code in src/emic/experiments/schema.py
def to_dict(self) -> dict:
    """Convert to dictionary for DataFrame construction."""
    d = asdict(self)
    # Convert datetime to ISO string for Parquet compatibility
    d["timestamp"] = self.timestamp.isoformat()
    return d
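
A sketch of DataFrame construction from results (field values here are illustrative):

from datetime import UTC, datetime

import pandas as pd

from emic.experiments.schema import BenchmarkResult

r = BenchmarkResult(
    experiment="accuracy",
    algorithm="cssr",
    process="even_process",
    n_samples=10_000,
    metric="cmu",
    value=1.02,          # illustrative measured value
    ground_truth=1.0,
    timestamp=datetime.now(UTC),
)
df = pd.DataFrame([r.to_dict()])  # timestamp is serialized as an ISO string
print(df[["algorithm", "process", "metric", "value", "ground_truth"]])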

RunMetadata dataclass

RunMetadata(
    timestamp: datetime,
    git_commit: str,
    git_dirty: bool,
    python_version: str,
    emic_version: str,
    cli_args: list[str],
    duration_seconds: float | None = None,
    completed: bool = False,
)

Metadata for a complete benchmark run.

Captures environment and configuration for reproducibility.

Attributes:

timestamp (datetime): When the run started
git_commit (str): Git commit hash (short form)
git_dirty (bool): Whether working directory had uncommitted changes
python_version (str): Python version string
emic_version (str): emic package version
cli_args (list[str]): Command-line arguments used
duration_seconds (float | None): Total run duration
completed (bool): Whether all experiments finished successfully

to_dict

to_dict() -> dict

Convert to dictionary for YAML serialization.

Source code in src/emic/experiments/schema.py
def to_dict(self) -> dict:
    """Convert to dictionary for YAML serialization."""
    return {
        "timestamp": self.timestamp.isoformat(),
        "git_commit": self.git_commit,
        "git_dirty": self.git_dirty,
        "python_version": self.python_version,
        "emic_version": self.emic_version,
        "cli_args": self.cli_args,
        "duration_seconds": self.duration_seconds,
        "completed": self.completed,
    }

ResultsWriter

ResultsWriter(
    base_dir: str | Path,
    shard: tuple[int, int] | None = None,
    run_dir: Path | None = None,
)

Write benchmark results to disk.

Creates timestamped directories with Parquet data and YAML metadata. Updates a 'latest' symlink for convenient access.

Example

writer = ResultsWriter(base_dir="experiments/results")
writer.add_result(result1)
writer.add_result(result2)
writer.finalize(metadata)

Creates: experiments/results/2026-01-26T14-32-05/
├── metadata.yaml
└── results.parquet

For sharded runs

writer = ResultsWriter(base_dir="experiments/results", shard=(0, 4))

Creates: results_shard0.parquet instead of results.parquet

Initialize writer with output directory.

Parameters:

base_dir (str | Path, required): Base directory for results (e.g., "experiments/results")
shard (tuple[int, int] | None, default None): Optional (shard_index, total_shards) for sharded output
run_dir (Path | None, default None): Optional explicit run directory (for sharded runs sharing a dir)

Source code in src/emic/experiments/schema.py
def __init__(
    self,
    base_dir: str | Path,
    shard: tuple[int, int] | None = None,
    run_dir: Path | None = None,
) -> None:
    """
    Initialize writer with output directory.

    Args:
        base_dir: Base directory for results (e.g., "experiments/results")
        shard: Optional (shard_index, total_shards) for sharded output
        run_dir: Optional explicit run directory (for sharded runs sharing a dir)
    """
    self.base_dir = Path(base_dir)
    self.timestamp = datetime.now(UTC)
    self.results: list[BenchmarkResult] = []
    self.shard = shard
    self._run_dir: Path | None = run_dir

run_dir property

run_dir: Path

Get the timestamped directory for this run.

results_filename property

results_filename: str

Get the results filename, accounting for sharding.

add_result

add_result(result: BenchmarkResult) -> None

Add a single result to the collection.

Source code in src/emic/experiments/schema.py
def add_result(self, result: BenchmarkResult) -> None:
    """Add a single result to the collection."""
    self.results.append(result)

add_results

add_results(results: list[BenchmarkResult]) -> None

Add multiple results to the collection.

Source code in src/emic/experiments/schema.py
def add_results(self, results: list[BenchmarkResult]) -> None:
    """Add multiple results to the collection."""
    self.results.extend(results)

save_incremental

save_incremental() -> None

Save current results incrementally.

Useful for long-running benchmarks to preserve partial results.

Source code in src/emic/experiments/schema.py
def save_incremental(self) -> None:
    """
    Save current results incrementally.

    Useful for long-running benchmarks to preserve partial results.
    """
    if not self.results:
        return
    self._ensure_run_dir()
    self._write_parquet(self.run_dir / self.results_filename)

finalize

finalize(metadata: RunMetadata) -> Path

Write final results and metadata, update 'latest' symlink.

Parameters:

metadata (RunMetadata, required): Run metadata to save

Returns:

Path: Path to the results directory

Source code in src/emic/experiments/schema.py
def finalize(self, metadata: RunMetadata) -> Path:
    """
    Write final results and metadata, update 'latest' symlink.

    Args:
        metadata: Run metadata to save

    Returns:
        Path to the results directory
    """
    self._ensure_run_dir()

    # Write results
    if self.results:
        self._write_parquet(self.run_dir / self.results_filename)

    # Write metadata (only for non-sharded runs, or include shard info)
    if self.shard is not None:
        shard_index, total_shards = self.shard
        self._write_shard_metadata(metadata, shard_index, total_shards)
    else:
        self._write_metadata(metadata)
        # Update latest symlink (only for non-sharded runs)
        self._update_latest_symlink()

    return self.run_dir

read_results

read_results(path: str | Path) -> DataFrame

Read benchmark results from Parquet or JSON.

Parameters:

path (str | Path, required): Path to results.parquet or results.json

Returns:

DataFrame: DataFrame with benchmark results

Source code in src/emic/experiments/schema.py
def read_results(path: str | Path) -> pd.DataFrame:
    """
    Read benchmark results from Parquet or JSON.

    Args:
        path: Path to results.parquet or results.json

    Returns:
        DataFrame with benchmark results
    """
    import pandas as pd

    path = Path(path)
    if path.suffix == ".parquet":
        return pd.read_parquet(path)
    elif path.suffix == ".json":
        import json

        with path.open() as f:
            data = json.load(f)
        return pd.DataFrame(data)
    else:
        raise ValueError(f"Unsupported file format: {path.suffix}")

read_latest_results

read_latest_results(base_dir: str | Path) -> DataFrame

Read results from the 'latest' run.

Parameters:

base_dir (str | Path, required): Base results directory (e.g., "experiments/results")

Returns:

DataFrame: DataFrame with benchmark results

Source code in src/emic/experiments/schema.py
def read_latest_results(base_dir: str | Path) -> pd.DataFrame:
    """
    Read results from the 'latest' run.

    Args:
        base_dir: Base results directory (e.g., "experiments/results")

    Returns:
        DataFrame with benchmark results
    """
    base = Path(base_dir)
    latest = base / "latest"

    if not latest.exists():
        raise FileNotFoundError(f"No 'latest' results found in {base}")

    # Try parquet first, then JSON
    parquet_path = latest / "results.parquet"
    if parquet_path.exists():
        return read_results(parquet_path)

    json_path = latest / "results.json"
    if json_path.exists():
        return read_results(json_path)

    raise FileNotFoundError(f"No results file found in {latest}")
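
A sketch of post-run analysis: load the latest run and compare mean estimates to ground truth per algorithm, process, and metric (column names follow BenchmarkResult):

from emic.experiments.schema import read_latest_results

df = read_latest_results("experiments/runs")  # default output_dir
ok = df[df["error"].isna()]                   # drop failed runs
summary = (
    ok.groupby(["algorithm", "process", "metric"])[["value", "ground_truth"]]
    .mean()
    .reset_index()
)
print(summary)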

Functions

get_process_registry

get_process_registry() -> ProcessRegistry

Get the default process registry (lazy-initialized).

Source code in src/emic/experiments/registry.py
def get_process_registry() -> ProcessRegistry:
    """Get the default process registry (lazy-initialized)."""
    global _default_process_registry
    if _default_process_registry is None:
        _default_process_registry = create_default_process_registry()
    return _default_process_registry

get_algorithm_registry

get_algorithm_registry() -> AlgorithmRegistry

Get the default algorithm registry (lazy-initialized).

Source code in src/emic/experiments/registry.py
def get_algorithm_registry() -> AlgorithmRegistry:
    """Get the default algorithm registry (lazy-initialized)."""
    global _default_algorithm_registry
    if _default_algorithm_registry is None:
        _default_algorithm_registry = create_default_algorithm_registry()
    return _default_algorithm_registry
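
A sketch of inspecting the defaults; list(include_slow=False) mirrors what --quick mode runs:

from emic.experiments.registry import get_algorithm_registry, get_process_registry

print(get_process_registry().list())                      # registered process names
print(get_algorithm_registry().list(include_slow=False))  # algorithms run in --quick mode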