Source code for beaker._experiment

import grpc
import yaml

from . import beaker_pb2 as pb2
from ._service_client import RpcMethod, ServiceClient
from .exceptions import *
from .types import *


[docs] class ExperimentClient(ServiceClient): """ Methods for interacting with Beaker `Experiments <https://beaker-docs.apps.allenai.org/concept/experiments.html>`_. Accessed via the :data:`Beaker.experiment <beaker.Beaker.experiment>` property. .. warning:: Do not instantiate this class directly! The :class:`~beaker.Beaker` client will create one automatically which you can access through the corresponding property. """
[docs] def create( self, *, spec: BeakerExperimentSpec | PathOrStr, name: str | None = None, workspace: pb2.Workspace | None = None, ) -> pb2.Workload: """ Create/launch a new experiment workload. .. seealso:: Use :data:`Beaker.workload <beaker._workload.WorkloadClient>` methods for working with workload objects. :returns: A :class:`~beaker.types.BeakerWorkload` object. :raises ~beaker.exceptions.BeakerExperimentConflict: If an experiment with the given name already exists. """ if name is not None: self._validate_beaker_name(name) if not isinstance(spec, BeakerExperimentSpec): spec = BeakerExperimentSpec.from_file(spec) json_spec = spec.to_json() workspace_id = self.resolve_workspace_id(workspace) data = self.http_request( f"workspaces/{workspace_id}/experiments", method="POST", query=None if name is None else {"name": name}, data=json_spec, exceptions_for_status=None if name is None else {409: BeakerExperimentConflict(name)}, ).json() return self.beaker.workload.get(data["id"])
[docs] def get_spec(self, experiment: pb2.Experiment | pb2.Workload) -> BeakerExperimentSpec: """ Get the spec that defined an experiment. :param experiment: The :class:`~beaker.types.BeakerExperiment` or :class:`~beaker.types.BeakerWorkload`. :returns: The corresponding :class:`~beaker.types.BeakerExperimentSpec`. """ yaml_str = self.rpc_request( RpcMethod[pb2.GetExperimentYamlSpecResponse](self.service.GetExperimentYamlSpec), pb2.GetExperimentYamlSpecRequest(experiment_id=self.resolve_experiment_id(experiment)), exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerExperimentNotFound(experiment)}, ).experiment_spec json_dict = yaml.safe_load(yaml_str) return BeakerExperimentSpec.from_json(json_dict)
[docs] def restart_tasks(self, experiment: pb2.Experiment | pb2.Workload) -> pb2.Workload: """ Restart all failed or canceled tasks of an experiment. :param experiment: The :class:`~beaker.types.BeakerExperiment` or :class:`~beaker.types.BeakerWorkload`. :returns: The updated :class:`~beaker.types.BeakerWorkload`. """ return self.rpc_request( RpcMethod[pb2.RestartExperimentTasksResponse](self.service.RestartExperimentTasks), pb2.RestartExperimentTasksRequest(experiment_id=self.resolve_experiment_id(experiment)), exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerExperimentNotFound(experiment)}, ).workload
[docs] def url(self, experiment: pb2.Experiment | pb2.Workload) -> str: return f"{self.config.agent_address}/ex/{self._url_quote(self.resolve_experiment_id(experiment))}"