Source code for beaker._workload

from datetime import datetime
from typing import Iterable, Literal

import grpc

from . import beaker_pb2 as pb2
from ._service_client import RpcMethod, ServiceClient
from .exceptions import *
from .types import *


class WorkloadClient(ServiceClient):
    """
    Methods for interacting with Beaker Workloads.
    Accessed via the :data:`Beaker.workload <beaker.Beaker.workload>` property.

    .. warning::
        Do not instantiate this class directly! The :class:`~beaker.Beaker` client will create
        one automatically which you can access through the corresponding property.
    """

    def get(self, workload: str) -> pb2.Workload:
        """
        Fetch a single workload.

        :examples:

        >>> with Beaker.from_env() as beaker:
        ...     workload = beaker.workload.get(workload_id)

        :param workload: The workload identifier; it is passed through
            ``resolve_workload_id()`` before being sent to the server.

        :returns: A :class:`~beaker.types.BeakerWorkload` protobuf object.

        :raises ~beaker.exceptions.BeakerWorkloadNotFound: If the workload doesn't exist.
        """
        return self.rpc_request(
            RpcMethod[pb2.GetWorkloadResponse](self.service.GetWorkload),
            pb2.GetWorkloadRequest(workload_id=self.resolve_workload_id(workload)),
            exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerWorkloadNotFound(workload)},
        ).workload

    def is_experiment(self, workload: pb2.Workload) -> bool:
        """
        Returns ``True`` if a workload is an experiment-type workload,
        i.e. its ``experiment`` field is set.
        """
        return workload.HasField("experiment")

    def is_environment(self, workload: pb2.Workload) -> bool:
        """
        Returns ``True`` if a workload is an environment-type (session) workload,
        i.e. its ``environment`` field is set.
        """
        return workload.HasField("environment")

    def update(
        self, workload: pb2.Workload, *, name: str | None = None, description: str | None = None
    ) -> pb2.Workload:
        """
        Update fields of a workload.

        :param workload: The workload to update.
        :param name: Value for the request's ``name`` field, if given.
        :param description: Value for the request's ``description`` field, if given.

        :returns: The updated :class:`~beaker.types.BeakerWorkload` object.

        :raises ~beaker.exceptions.BeakerWorkloadNotFound: If the workload doesn't exist.
        """
        # Resolve once so the request and the "not found" error both refer to the
        # same identifier (rather than embedding the whole protobuf in the error).
        workload_id = self.resolve_workload_id(workload)
        return self.rpc_request(
            RpcMethod[pb2.UpdateWorkloadResponse](self.service.UpdateWorkload),
            pb2.UpdateWorkloadRequest(
                workload_id=workload_id, name=name, description=description
            ),
            exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerWorkloadNotFound(workload_id)},
        ).workload

    def cancel(self, *workloads: pb2.Workload) -> Iterable[str]:
        """
        Cancel one or more workloads.

        :param workloads: The workloads to cancel.

        :returns: The ``workload_ids`` field of the server's response
            (presumably the IDs of the workloads that were canceled).
        """
        return self.rpc_request(
            RpcMethod[pb2.CancelWorkloadsResponse](self.service.CancelWorkloads),
            pb2.CancelWorkloadsRequest(
                workload_ids=[self.resolve_workload_id(workload) for workload in workloads]
            ),
        ).workload_ids

    def delete(self, *workloads: pb2.Workload) -> None:
        """
        Delete one or more workloads.

        :param workloads: The workloads to delete.
        """
        self.rpc_request(
            RpcMethod[pb2.DeleteWorkloadsResponse](self.service.DeleteWorkloads),
            pb2.DeleteWorkloadsRequest(
                workload_ids=[self.resolve_workload_id(workload) for workload in workloads]
            ),
        )

    def _resolve_task(
        self, workload: pb2.Workload, task: pb2.Task | None = None
    ) -> pb2.Task | None:
        """
        Pick the task of an experiment workload to operate on.

        When ``task`` is ``None`` the workload's first task is returned, or ``None``
        if the experiment has no tasks. Otherwise ``task`` is returned unchanged after
        checking that its ID belongs to the workload.

        :raises ValueError: If ``task`` is not one of the workload's tasks.
        """
        tasks = workload.experiment.tasks
        if task is None:
            return tasks[0] if tasks else None

        # Make sure task is actually part of workload.
        all_tasks = [t.id for t in tasks]
        if task.id not in all_tasks:
            raise ValueError(
                f"invalid task '{task.id}', task is not part of workload '{workload.experiment.id}' "
                f"with tasks {all_tasks}"
            )
        return task

    def get_latest_job(
        self, workload: pb2.Workload, *, task: pb2.Task | None = None, finalized: bool | None = None
    ) -> pb2.Job | None:
        """
        Get the latest job of a workload.

        For experiment-type workloads the job is looked up by task (the first task
        when ``task`` is not given); for environment-type workloads it is looked up
        by the workload's environment.

        :param task: Filter by a specific task. Only valid for experiment-type workloads.
        :param finalized: Filter by finalized status.

        :returns: The latest :class:`~beaker.types.BeakerJob` object or ``None``,
            if one hasn't been created yet.

        :raises ValueError: If the workload is an experiment without any tasks, if ``task``
            is not part of the workload, if ``task`` is given for an environment-type
            workload, or if the workload type can't be determined.
        """
        env: pb2.Environment | None = None
        if self.is_experiment(workload):
            task = self._resolve_task(workload, task=task)
            if task is None:
                raise ValueError(f"Workload does not have any tasks, so no jobs: {workload}")
        elif self.is_environment(workload):
            if task is not None:
                raise ValueError(f"'task' option is invalid for environment workload: {workload}")
            env = workload.environment
        else:
            raise ValueError(f"Could not determine workload type: {workload}")

        # Newest first with a limit of one, so at most a single job comes back.
        jobs = list(
            self.beaker.job.list(
                task=task,
                environment=env,
                sort_field="created",
                sort_order=BeakerSortOrder.descending,
                limit=1,
                finalized=finalized,
            )
        )
        return jobs[0] if jobs else None

    def get_results(
        self, workload: pb2.Workload, *, task: pb2.Task | None = None
    ) -> pb2.Dataset | None:
        """
        Get the results :class:`~beaker.types.BeakerDataset` from a workload.

        The results are taken from the workload's latest job (see :meth:`get_latest_job`).

        :param task: Filter by a specific task.

        :returns: The results dataset as reported by ``beaker.job.get_results()``,
            or ``None`` if the workload has no job yet.
        """
        job = self.get_latest_job(workload, task=task)
        if job is None:
            return None
        return self.beaker.job.get_results(job)

    def list(
        self,
        *,
        org: pb2.Organization | None = None,
        author: pb2.User | None = None,
        workspace: pb2.Workspace | None = None,
        created_before: datetime | None = None,
        created_after: datetime | None = None,
        finalized: bool | None = None,
        workload_type: BeakerWorkloadType | None = None,
        statuses: Iterable[BeakerWorkloadStatus] | None = None,
        name_or_description: str | None = None,
        sort_order: BeakerSortOrder | None = None,
        sort_field: Literal["created"] = "created",
        limit: int | None = None,
    ) -> Iterable[pb2.Workload]:
        """
        List workloads.

        :param org: Organization to list from. Always passed through ``resolve_org_id()``,
            including when it is ``None``.
        :param author: Only include workloads from this author.
        :param workspace: Only include workloads from this workspace.
        :param created_before: Sent as the request's ``created_before`` option.
        :param created_after: Sent as the request's ``created_after`` option.
        :param finalized: Sent as the request's ``job_finalized`` option.
        :param workload_type: Only include workloads of this type.
        :param statuses: Only include workloads with one of these statuses.
        :param name_or_description: Substring to match against the name or description.
        :param sort_order: The sort order.
        :param sort_field: The field to sort by.
        :param limit: Maximum number of workloads to yield. Must be positive when given.

        :returns: An iterator over :class:`~beaker.types.BeakerWorkload` objects.

        :raises ValueError: If ``limit`` is not a positive integer. This is raised
            immediately when the method is called, not on first iteration.
        """
        # Validate eagerly. If this method were itself a generator function, the check
        # would be deferred until the first item is requested (or skipped entirely if
        # the result is never iterated).
        if limit is not None and limit <= 0:
            raise ValueError("'limit' must be a positive integer")

        def iter_workloads() -> Iterable[pb2.Workload]:
            # Everything below stays lazy: nothing is resolved or requested until
            # the caller starts iterating.
            Opts = pb2.ListWorkloadsRequest.Opts
            request = pb2.ListWorkloadsRequest(
                options=Opts(
                    sort_clause=Opts.SortClause(
                        sort_order=None if sort_order is None else sort_order.as_pb2(),
                        created={} if sort_field == "created" else None,
                    ),
                    created_before=created_before,  # type: ignore[arg-type]
                    created_after=created_after,  # type: ignore[arg-type]
                    job_finalized=finalized,  # type: ignore[arg-type]
                    workload_type=None if workload_type is None else workload_type.as_pb2(),
                    statuses=(
                        None if statuses is None else [status.as_pb2() for status in statuses]
                    ),
                    name_or_description_substring=name_or_description,
                    organization_id=self.resolve_org_id(org),
                    author_id=None if author is None else self.resolve_user_id(author),
                    workspace_id=(
                        None if workspace is None else self.resolve_workspace_id(workspace)
                    ),
                    # Never ask for more per page than the caller wants in total.
                    page_size=(
                        self.MAX_PAGE_SIZE if limit is None else min(self.MAX_PAGE_SIZE, limit)
                    ),
                )
            )

            count = 0
            for response in self.rpc_paged_request(
                RpcMethod[pb2.ListWorkloadsResponse](self.service.ListWorkloads),
                request,
            ):
                for workload in response.workloads:
                    count += 1
                    yield workload
                    # Stop as soon as the limit is reached so no further pages are fetched.
                    if limit is not None and count >= limit:
                        return

        return iter_workloads()

    def url(self, workload: pb2.Workload) -> str:
        """
        Get the URL to a workload on the Beaker dashboard.

        The URL is built from ``self.config.agent_address``, the ``/ex/`` path and the
        URL-quoted workload ID.
        """
        workload_id = self._url_quote(self.resolve_workload_id(workload))
        return f"{self.config.agent_address}/ex/{workload_id}"