API

Gantry’s public API.

class Recipe(args: Sequence[str], name: str | None = None, description: str | None = None, workspace: str | None = None, budget: str | None = None, group_names: Sequence[str] | None = None, allow_dirty: bool = False, yes: bool | None = None, save_spec: PathLike | str | None = None, callbacks: Sequence[Callback] | None = None, clusters: Sequence[str] | None = None, gpu_types: Sequence[str] | None = None, interconnect: Literal['ib', 'tcpxo'] | None = None, tags: Sequence[str] | None = None, hostnames: Sequence[str] | None = None, cpus: float | None = None, gpus: int | None = None, memory: str | None = None, shared_memory: str | None = None, beaker_image: str | None = None, docker_image: str | None = None, datasets: Sequence[str] | None = None, env_vars: Sequence[str | tuple[str, str]] | None = None, env_secrets: Sequence[str | tuple[str, str]] | None = None, dataset_secrets: Sequence[str | tuple[str, str]] | None = None, mounts: Sequence[str | tuple[str, str]] | None = None, weka: Sequence[str | tuple[str, str]] | None = None, uploads: Sequence[str | tuple[str, str]] | None = None, ref: str | None = None, branch: str | None = None, git_repo: GitRepoState | None = None, gh_token_secret: str = 'GITHUB_TOKEN', aws_config_secret: str | None = None, aws_credentials_secret: str | None = None, google_credentials_secret: str | None = None, results: str = '/results', task_name: str = 'main', priority: str | None = None, task_timeout: str | None = None, preemptible: bool | None = None, retries: int | None = None, replicas: int | None = None, leader_selection: bool | None = None, host_networking: bool | None = None, propagate_failure: bool | None = None, propagate_preemption: bool | None = None, synchronized_start_timeout: str | None = None, skip_tcpxo_setup: bool = False, skip_nccl_setup: bool = False, runtime_dir: str = '/gantry-runtime', exec_method: Literal['exec', 'bash'] = 'exec', torchrun: bool = False, pre_setup: str | None = None, post_setup: str | None = None, python_manager: Literal['uv', 'conda'] | None = None, default_python_version: str = '3.10', system_python: bool = False, install: str | None = None, no_python: bool = False, uv_venv: str | None = None, uv_extras: Sequence[str] | None = None, uv_all_extras: bool | None = None, uv_torch_backend: str | None = None, conda_file: PathLike | str | None = None, conda_env: str | None = None)[source]

A recipe defines how Gantry creates a Beaker workload and can be used to programmatically launch Gantry runs from Python rather than from the command line.

args: Sequence[str]
name: str | None = None
description: str | None = None
workspace: str | None = None
budget: str | None = None
group_names: Sequence[str] | None = None
allow_dirty: bool = False
yes: bool | None = None
save_spec: PathLike | str | None = None
callbacks: Sequence[Callback] | None = None
clusters: Sequence[str] | None = None
gpu_types: Sequence[str] | None = None
interconnect: Literal['ib', 'tcpxo'] | None = None
tags: Sequence[str] | None = None
hostnames: Sequence[str] | None = None
cpus: float | None = None
gpus: int | None = None
memory: str | None = None
shared_memory: str | None = None
beaker_image: str | None = None
docker_image: str | None = None
datasets: Sequence[str] | None = None
env_vars: Sequence[str | tuple[str, str]] | None = None
env_secrets: Sequence[str | tuple[str, str]] | None = None
dataset_secrets: Sequence[str | tuple[str, str]] | None = None
mounts: Sequence[str | tuple[str, str]] | None = None
weka: Sequence[str | tuple[str, str]] | None = None
uploads: Sequence[str | tuple[str, str]] | None = None
ref: str | None = None
branch: str | None = None
git_repo: GitRepoState | None = None
gh_token_secret: str = 'GITHUB_TOKEN'
aws_config_secret: str | None = None
aws_credentials_secret: str | None = None
google_credentials_secret: str | None = None
results: str = '/results'
task_name: str = 'main'
priority: str | None = None
task_timeout: str | None = None
preemptible: bool | None = None
retries: int | None = None
replicas: int | None = None
leader_selection: bool | None = None
host_networking: bool | None = None
propagate_failure: bool | None = None
propagate_preemption: bool | None = None
synchronized_start_timeout: str | None = None
skip_tcpxo_setup: bool = False
skip_nccl_setup: bool = False
runtime_dir: str = '/gantry-runtime'
exec_method: Literal['exec', 'bash'] = 'exec'
torchrun: bool = False
pre_setup: str | None = None
post_setup: str | None = None
python_manager: Literal['uv', 'conda'] | None = None
default_python_version: str = '3.10'
system_python: bool = False
install: str | None = None
no_python: bool = False
uv_venv: str | None = None
uv_extras: Sequence[str] | None = None
uv_all_extras: bool | None = None
uv_torch_backend: str | None = None
conda_file: PathLike | str | None = None
conda_env: str | None = None
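
For example, a recipe can be built, validated, and launched like this. This is a minimal sketch: the import path and the name, workspace, budget, and cluster values are illustrative assumptions, not prescribed by this API.

    from gantry.api import Recipe  # import path is an assumption

    recipe = Recipe(
        args=["python", "train.py"],   # the command to run in the container
        name="my-experiment",          # hypothetical experiment name
        workspace="ai2/my-workspace",  # hypothetical workspace
        budget="ai2/my-budget",        # hypothetical budget
        clusters=["ai2/my-cluster"],   # hypothetical cluster
        gpus=1,
    )

    recipe.dry_run()            # validate the options without launching
    workload = recipe.launch()  # create the Beaker workload
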
classmethod multi_node_torchrun(cmd: Sequence[str], gpus_per_node: int, num_nodes: int, shared_memory: str | None = '10GiB', **kwargs) → Recipe[source]

Create a multi-node recipe using torchrun.
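
A sketch of a two-node torchrun recipe. It assumes extra keyword arguments are forwarded to the Recipe constructor; the script and budget names are illustrative.

    recipe = Recipe.multi_node_torchrun(
        cmd=["train.py", "--config", "conf.yaml"],  # hypothetical training script
        gpus_per_node=8,
        num_nodes=2,
        budget="ai2/my-budget",  # assumed to be forwarded through **kwargs
    )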

dry_run(client: Beaker | None = None) → None[source]

Do a dry-run to validate options.

launch(show_logs: bool | None = None, timeout: int | None = None, start_timeout: int | None = None, inactive_timeout: int | None = None, inactive_soft_timeout: int | None = None, client: Beaker | None = None) → Workload[source]

Launch an experiment on Beaker. Same as the gantry run command.

Returns:

The Beaker workload.
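
Continuing the sketch above, launch() can also follow the run with timeouts (the values here are illustrative):

    workload = recipe.launch(
        show_logs=True,
        timeout=3600,       # fail if the workload doesn't complete within an hour
        start_timeout=600,  # fail if no job starts within 10 minutes
    )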

with_replicas(replicas: int, leader_selection: bool = True, host_networking: bool = True, propagate_failure: bool = True, propagate_preemption: bool = True, synchronized_start_timeout: str = '5m', skip_nccl_setup: bool = False) → Recipe[source]

Add replicas to the recipe.
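
A sketch of scaling the earlier recipe out to 4 replicas. Whether with_replicas() mutates the recipe or returns a modified copy isn't specified here, so the sketch uses the returned value:

    distributed = recipe.with_replicas(
        4,
        synchronized_start_timeout="10m",  # wait up to 10 minutes for all replicas
    )
    distributed.launch()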

class GitRepoState(repo: str, repo_url: str, ref: str, branch: str | None = None)[source]

Represents the state of a local git repository.

Tip

Use from_env() to instantiate this class.
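
For example (a minimal sketch; the import path is an assumption):

    from gantry.api import GitRepoState  # import path is an assumption

    repo = GitRepoState.from_env()  # must be called inside a git repository
    print(repo.repo, repo.short_ref)
    if repo.is_dirty:
        print("warning: repository has uncommitted changes")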

repo: str

The repository name, e.g. "allenai/beaker-gantry".

repo_url: str

The repository URL for cloning, e.g. "https://github.com/allenai/beaker-gantry".

ref: str

The current commit ref/SHA.

branch: str | None = None

The current active branch, if any.

property is_dirty: bool

If the local repository state is dirty (uncommitted changes).

property is_public: bool

If the repository is public.

property short_ref: str

Short, 7-character version of the current ref.

property ref_url: str

The URL to the current ref.

property branch_url: str | None

The URL to the current active branch.

property commit_message: str | None

Full commit message.

short_commit_message(max_length: int = 50) → str | None[source]

The commit message, truncated to max_length characters.

is_in_tree(path: PathLike | str) → bool[source]

Check if a file is in the tree.

classmethod from_env(ref: str | None = None, branch: str | None = None) → GitRepoState[source]

Instantiate this class from the root of a git repository.

Raises:

class Callback(*args, type: str | None = None, **kwargs)[source]

Base class for gantry callbacks. Callbacks provide a way to hook into gantry’s launch loop to customize behavior on certain events.
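
A minimal sketch of a custom callback. It assumes the base-class hooks are no-ops, so only the events of interest need overriding; job.id is an assumed attribute of the Beaker Job type.

    from gantry.api import Callback, Recipe  # import path is an assumption

    class PrintCallback(Callback):
        def on_start(self, job):
            print(f"job {job.id} started")  # job.id is an assumed attribute

        def on_success(self, job, *, metrics=None, results_ds=None):
            print("job succeeded; metrics:", metrics)

    recipe = Recipe(args=["python", "train.py"], callbacks=[PrintCallback()])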

property beaker: Beaker

A Beaker client that can be accessed after attach() is called.

property git_repo: GitRepoState

The git repo state that can be accessed after attach() is called.

property spec: BeakerExperimentSpec

The experiment spec that can be accessed after attach() is called.

property workload: Workload

The workload that can be accessed after attach() is called.

interrupt_workload()[source]

Cancels the active workload.

attach(*, beaker: Beaker, git_repo: GitRepoState, spec: BeakerExperimentSpec, workload: Workload)[source]

Runs when a callback is attached to the workload.

detach()[source]

Runs when a callback is detached from the workload.

on_start(job: Job)[source]

Runs when a job for the workload starts.

on_log(job: Job, log_line: str, log_time: float)[source]

Runs when a new log event is received from the workload.

on_no_new_logs(job: Job)[source]

Periodically runs when no new logs have been received from the workload recently.

on_start_timeout(job: Job)[source]

Runs when the active job for the workload hits the configured start timeout before starting.

on_timeout(job: Job)[source]

Runs when the active job for the workload hits the configured timeout before completing.

on_inactive_timeout(job: Job)[source]

Runs when the active job for the workload hits the configured inactive timeout.

on_inactive_soft_timeout(job: Job)[source]

Runs when the active job for the workload hits the configured inactive soft timeout.

on_preemption(job: Job)[source]

Runs when the active job for the workload is preempted.

on_cancellation(job: Job | None)[source]

Runs when the active job for the workload is canceled by the user, either directly or because a timeout was reached.

on_failure(job: Job, *, metrics: dict[str, Any] | None = None, results_ds: Dataset | None = None)[source]

Runs when the active job for the workload fails.

on_success(job: Job, *, metrics: dict[str, Any] | None = None, results_ds: Dataset | None = None)[source]

Runs when the active job for the workload succeeds.

class SlackCallback(*, type: dataclasses.InitVar[str | None] = 'slack', webhook_url: str)

registered_base

alias of Callback

registered_name: ClassVar[str] = 'slack'
type: dataclasses.InitVar[str | None] = 'slack'
webhook_url: str
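
A sketch of attaching the callback to a run (the import path and webhook URL are placeholders):

    from gantry.api import Recipe, SlackCallback  # import path is an assumption

    recipe = Recipe(
        args=["python", "train.py"],
        callbacks=[SlackCallback(webhook_url="https://hooks.slack.com/services/...")],
    )
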
launch_experiment(args: Sequence[str], name: str | None = None, description: str | None = None, task_name: str = 'main', workspace: str | None = None, group_names: Sequence[str] | None = None, clusters: Sequence[str] | None = None, gpu_types: Sequence[str] | None = None, interconnect: Literal['ib', 'tcpxo'] | None = None, tags: Sequence[str] | None = None, hostnames: Sequence[str] | None = None, beaker_image: str | None = None, docker_image: str | None = None, cpus: float | None = None, gpus: int | None = None, memory: str | None = None, shared_memory: str | None = None, datasets: Sequence[str] | None = None, gh_token_secret: str = 'GITHUB_TOKEN', ref: str | None = None, branch: str | None = None, conda_file: PathLike | str | None = None, conda_env: str | None = None, python_manager: Literal['uv', 'conda'] | None = None, system_python: bool = False, uv_venv: str | None = None, uv_extras: Sequence[str] | None = None, uv_all_extras: bool | None = None, uv_torch_backend: str | None = None, env_vars: Sequence[str | tuple[str, str]] | None = None, env_secrets: Sequence[str | tuple[str, str]] | None = None, dataset_secrets: Sequence[str | tuple[str, str]] | None = None, mounts: Sequence[str | tuple[str, str]] | None = None, weka: Sequence[str | tuple[str, str]] | None = None, uploads: Sequence[str | tuple[str, str]] | None = None, timeout: int | None = None, task_timeout: str | None = None, start_timeout: int | None = None, inactive_timeout: int | None = None, inactive_soft_timeout: int | None = None, show_logs: bool | None = None, allow_dirty: bool = False, dry_run: bool = False, yes: bool | None = None, save_spec: PathLike | str | None = None, priority: str | None = None, install: str | None = None, no_python: bool = False, replicas: int | None = None, leader_selection: bool | None = None, host_networking: bool | None = None, propagate_failure: bool | None = None, propagate_preemption: bool | None = None, synchronized_start_timeout: str | None = None, budget: str | None = None, preemptible: bool | None = None, retries: int | None = None, results: str = '/results', runtime_dir: str = '/gantry-runtime', exec_method: Literal['exec', 'bash'] = 'exec', torchrun: bool = False, skip_tcpxo_setup: bool = False, skip_nccl_setup: bool = False, default_python_version: str = '3.10', pre_setup: str | None = None, post_setup: str | None = None, aws_config_secret: str | None = None, aws_credentials_secret: str | None = None, google_credentials_secret: str | None = None, callbacks: Sequence[Callback] | None = None, git_repo: GitRepoState | None = None, client: Beaker | None = None) → Workload | None[source]

Launch an experiment on Beaker. Same as the gantry run command.

Parameters:

cli_mode – Set to True if this function is being called from a CLI command. This mostly affects how certain prompts and messages are displayed.
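
A sketch of the functional equivalent of gantry run; the import path and the name, budget, and cluster values are illustrative assumptions:

    from gantry.api import launch_experiment  # import path is an assumption

    workload = launch_experiment(
        args=["python", "train.py"],
        name="my-experiment",         # hypothetical name
        budget="ai2/my-budget",       # hypothetical budget
        clusters=["ai2/my-cluster"],  # hypothetical cluster
        gpus=1,
        yes=True,                     # presumably skips the confirmation prompt
    )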

follow_workload(beaker: Beaker, workload: Workload, *, job: Job | None = None, task: Task | None = None, timeout: int | None = None, start_timeout: int | None = None, inactive_timeout: int | None = None, inactive_soft_timeout: int | None = None, tail: bool = False, show_logs: bool = True, auto_cancel: bool = False, callbacks: Sequence[Callback] | None = None) → Job[source]

Follow a workload until completion while streaming logs to stdout.

Parameters:
  • task – A specific task in the workload to follow. Defaults to the first task.

  • timeout – The number of seconds to wait for the workload to complete. Raises a timeout error if it doesn’t complete in time.

  • start_timeout – The number of seconds to wait for the workload to start running. Raises a timeout error if it doesn’t start in time.

  • inactive_timeout – The number of seconds to wait for new logs before timing out. Raises a timeout error if no new logs are produced in time.

  • inactive_soft_timeout – The number of seconds to wait for new logs before issuing a warning notification instead of raising an error.

  • tail – Start tailing the logs if a job is already running. Otherwise shows all logs.

  • show_logs – Set to False to avoid streaming the logs.

  • auto_cancel – Set to True to automatically cancel the workload on timeout or SIGTERM.

Returns:

The finalized BeakerJob from the task being followed.

Raises:

BeakerJobTimeoutError – If timeout is set to a positive number and the workload doesn’t complete in time.
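
A sketch of following a workload with both soft and hard inactivity limits. The timeout values are illustrative, and Beaker.from_env() comes from the beaker client library:

    from beaker import Beaker
    from gantry.api import follow_workload  # import path is an assumption

    beaker = Beaker.from_env()
    # `workload` would come from launch_experiment() or Recipe.launch().
    job = follow_workload(
        beaker,
        workload,
        inactive_soft_timeout=600,  # warn after 10 minutes without new logs
        inactive_timeout=1800,      # fail after 30 minutes without new logs
        auto_cancel=True,           # cancel the workload on timeout or SIGTERM
    )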

update_workload_description(description: str, strategy: Literal['append', 'prepend', 'replace'] = 'replace', beaker_token: str | None = None, client: Beaker | None = None) → str[source]

Update the description of the Gantry workload that this process is running in.

Parameters:
  • description – The description to set or add, depending on the strategy.

  • strategy – One of “append”, “prepend”, or “replace” to indicate how the new description should be combined with the original description. Defaults to “replace”.

  • beaker_token – An optional Beaker API token to use. If not provided, the BEAKER_TOKEN environment variable will be used if set, falling back to the Beaker config file. Alternatively, you can provide an existing Beaker client via the client parameter.

  • client – An optional existing Beaker client to use. If not provided, a new client will be created using the provided beaker_token or environment/config.
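
For example, from inside a running Gantry job (a sketch; it assumes the function can locate the current workload from the job's environment):

    from gantry.api import update_workload_description  # import path is an assumption

    update_workload_description("epoch 3/10 complete", strategy="append")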

write_metrics(metrics: dict[str, Any])[source]

Write result metrics for the Gantry workload that this process is running in.

Parameters:

metrics – A JSON-serializable dictionary of metrics to write.
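
For example, at the end of a training script running under Gantry (a sketch with illustrative metric names):

    from gantry.api import write_metrics  # import path is an assumption

    write_metrics({"loss": 0.123, "accuracy": 0.95})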