Source code for gantry.callbacks.callback

import dataclasses
from dataclasses import dataclass
from typing import Any

from beaker import (
    Beaker,
    BeakerDataset,
    BeakerExperimentSpec,
    BeakerJob,
    BeakerWorkload,
)
from dataclass_extensions import Registrable

from ..exceptions import GantryInterruptWorkload
from ..git_utils import GitRepoState


[docs] @dataclass(kw_only=True) class Callback(Registrable): """ Base class for gantry callbacks. Callbacks provide a way to hook into gantry's launch loop to customize behavior on certain events. """ _beaker: Beaker | None = dataclasses.field(repr=False, init=False, default=None) _git_repo: GitRepoState | None = dataclasses.field(repr=False, init=False, default=None) _spec: BeakerExperimentSpec | None = dataclasses.field(repr=False, init=False, default=None) _workload: BeakerWorkload | None = dataclasses.field(repr=False, init=False, default=None) @property def beaker(self) -> Beaker: """ A beaker client that can be accessed after :meth:`attach()` is called. """ if self._beaker is None: raise RuntimeError("Callback has not been attached to a gantry workload yet") return self._beaker @property def git_repo(self) -> GitRepoState: """ The git repo state that can be accessed after :meth:`attach()` is called. """ if self._git_repo is None: raise RuntimeError("Callback has not been attached to a gantry workload yet") return self._git_repo @property def spec(self) -> BeakerExperimentSpec: """ The experiment spec that can be accessed after :meth:`attach()` is called. """ if self._spec is None: raise RuntimeError("Callback has not been attached to a gantry workload yet") return self._spec @property def workload(self) -> BeakerWorkload: """ The workload that can be accessed after :meth:`attach()` is called. """ if self._workload is None: raise RuntimeError("Callback has not been attached to a gantry workload yet") return self._workload
[docs] def interrupt_workload(self): """Cancels the active workload.""" raise GantryInterruptWorkload(f"workload interrupted by callback {self.__class__.__name__}")
[docs] def attach( self, *, beaker: Beaker, git_repo: GitRepoState, spec: BeakerExperimentSpec, workload: BeakerWorkload, ): """ Runs when a callback is attached to the workload. """ self._beaker = beaker self._git_repo = git_repo self._spec = spec self._workload = workload
[docs] def detach(self): """ Runs when a callback is detached from the workload. """ self._beaker = None self._git_repo = None self._spec = None self._workload = None
[docs] def on_start(self, job: BeakerJob): """ Runs when a job for the workload starts. """ del job
[docs] def on_log(self, job: BeakerJob, log_line: str, log_time: float): """ Runs when a new log event is received from the workload. """ del job, log_line, log_time
[docs] def on_no_new_logs(self, job: BeakerJob): """ Periodically runs when no new logs have been received from the workload recently. """ del job
[docs] def on_start_timeout(self, job: BeakerJob): """ Runs when the active job for the workload hits the configured start timeout before starting. """ del job
[docs] def on_timeout(self, job: BeakerJob): """ Runs when the active job for the workload hits the configured timeout before completing. """ del job
[docs] def on_inactive_timeout(self, job: BeakerJob): """ Runs when the active job for the workload hits the configured inactive timeout. """ del job
[docs] def on_inactive_soft_timeout(self, job: BeakerJob): """ Runs when the active job for the workload hits the configured inactive hard timeout. """ del job
[docs] def on_preemption(self, job: BeakerJob): """ Runs when the active job for the workload is preempted. """ del job
[docs] def on_cancellation(self, job: BeakerJob | None): """ Runs when the active job for the workload is canceled by the user, either directly or because. a timeout was reached. """ del job
[docs] def on_failure( self, job: BeakerJob, *, metrics: dict[str, Any] | None = None, results_ds: BeakerDataset | None = None, ): """ Runs when the active job for the workload fails. """ del job, metrics, results_ds
[docs] def on_success( self, job: BeakerJob, *, metrics: dict[str, Any] | None = None, results_ds: BeakerDataset | None = None, ): """ Runs when the active job for the workload succeeds. """ del job, metrics, results_ds