Source code for beaker._job

import logging
from datetime import datetime
from typing import Iterable, Literal

import grpc

from . import beaker_pb2 as pb2
from ._service_client import RpcMethod, RpcStreamingMethod, ServiceClient
from .exceptions import *
from .types import *


class JobClient(ServiceClient):
    """
    Methods for interacting with Beaker Jobs. Accessed via the
    :data:`Beaker.job <beaker.Beaker.job>` property.

    .. warning::
        Do not instantiate this class directly! The :class:`~beaker.Beaker` client will create
        one automatically which you can access through the corresponding property.
    """

    def get(self, job_id: str) -> pb2.Job:
        """
        :examples:

        >>> with Beaker.from_env() as beaker:
        ...     job = beaker.job.get(job_id)

        :returns: A :class:`~beaker.types.BeakerJob` protobuf object.

        :raises ~beaker.exceptions.BeakerJobNotFound: If the job doesn't exist.
        """
        return self.rpc_request(
            RpcMethod[pb2.GetJobResponse](self.service.GetJob),
            pb2.GetJobRequest(job_id=job_id),
            exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerJobNotFound(job_id)},
        ).job
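
    # A usage sketch (not part of the library), continuing the docstring example
    # above: catching the documented not-found error. `job_id` is a placeholder.
    #
    #     >>> try:
    #     ...     job = beaker.job.get(job_id)
    #     ... except BeakerJobNotFound:
    #     ...     job = None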

    def get_results(self, job: pb2.Job) -> pb2.Dataset | None:
        """
        :returns: A :class:`~beaker.types.BeakerDataset` protobuf object.
        """
        if job.assignment_details.HasField("result_dataset_id"):
            return self.beaker.dataset.get(job.assignment_details.result_dataset_id)
        else:
            return None
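
    # A usage sketch (not part of the library): `get_results()` returns `None`
    # until a result dataset has been assigned, so check before using it.
    #
    #     >>> with Beaker.from_env() as beaker:
    #     ...     job = beaker.job.get(job_id)
    #     ...     dataset = beaker.job.get_results(job)
    #     ...     if dataset is not None:
    #     ...         print(dataset.id)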

    def logs(
        self,
        job: pb2.Job,
        *,
        tail_lines: int | None = None,
        follow: bool | None = None,
        since: datetime | None = None,
    ) -> Iterable[pb2.JobLog]:
        """
        :returns: An iterator over :class:`~beaker.types.BeakerJobLog` protobuf objects.
        """
        request = pb2.StreamJobLogsRequest(
            job_id=job.id,
            tail_lines=tail_lines,
            follow=follow,  # type: ignore
            since=since,  # type: ignore
        )

        retries = 0
        last_log_timestamp = None

        def update_request():
            # Update request to restart streaming from the last log message received.
            if last_log_timestamp is not None:
                request.MergeFrom(pb2.StreamJobLogsRequest(since=last_log_timestamp))
                # NOTE: 'since' timestamp is now a clone of 'last_log_timestamp',
                # so modifying in-place is okay.
                request.since.nanos += 1
                request.ClearField("tail_lines")

        while True:
            try:
                for job_log in self.rpc_streaming_request(
                    RpcStreamingMethod[pb2.JobLog](self.service.StreamJobLogs),
                    request,
                    exceptions_for_status={grpc.StatusCode.NOT_FOUND: BeakerJobNotFound(job)},
                ):
                    retries = 0  # reset because we've successfully received a new log
                    last_log_timestamp = job_log.timestamp
                    yield job_log

                # NOTE: Work-around for #6602
                # If we're following the job, continue making streaming requests until
                # the job is officially finalized.
                if follow and not self.get(job.id).status.HasField("finalized"):
                    update_request()
                    continue

                return
            except BeakerStreamConnectionClosedError as err:
                # These errors are expected, see https://github.com/allenai/beaker/issues/6532
                self._log_and_wait(1, err, log_level=logging.DEBUG)
                update_request()
            except BeakerServerError as err:
                if retries < self.beaker.MAX_RETRIES:
                    self._log_and_wait(retries, err)
                    retries += 1
                    update_request()
                else:
                    raise
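
    # A usage sketch (not part of the library): streaming a job's logs until it
    # finalizes. This assumes `JobLog.message` carries raw bytes, hence the
    # decode before printing.
    #
    #     >>> with Beaker.from_env() as beaker:
    #     ...     job = beaker.job.get(job_id)
    #     ...     for job_log in beaker.job.logs(job, follow=True):
    #     ...         print(job_log.message.decode("utf-8", errors="replace"), end="")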

    def list_summarized_events(
        self,
        job: pb2.Job,
        *,
        sort_order: BeakerSortOrder | None = None,
        sort_field: Literal["latest_occurrence"] = "latest_occurrence",
        limit: int | None = None,
    ) -> Iterable[pb2.SummarizedJobEvent]:
        """
        :returns: An iterator over :class:`~beaker.types.BeakerSummarizedJobEvent` protobuf objects.
        """
        if limit is not None and limit <= 0:
            raise ValueError("'limit' must be a positive integer")

        count = 0
        for response in self.rpc_paged_request(
            RpcMethod[pb2.ListSummarizedJobEventsResponse](self.service.ListSummarizedJobEvents),
            pb2.ListSummarizedJobEventsRequest(
                options=pb2.ListSummarizedJobEventsRequest.Opts(
                    sort_clause=pb2.ListSummarizedJobEventsRequest.Opts.SortClause(
                        sort_order=None if sort_order is None else sort_order.as_pb2(),
                        latest_occurrence={} if sort_field == "latest_occurrence" else None,
                    ),
                    job_id=job.id,
                    page_size=self.MAX_PAGE_SIZE if limit is None else min(self.MAX_PAGE_SIZE, limit),
                )
            ),
        ):
            for event in response.summarized_job_events:
                count += 1
                yield event
                if limit is not None and count >= limit:
                    return
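
    # A usage sketch (not part of the library): the five most recently occurring
    # events for a job. `job` is a `pb2.Job` from `get()` above.
    #
    #     >>> with Beaker.from_env() as beaker:
    #     ...     for event in beaker.job.list_summarized_events(
    #     ...         job, sort_order=BeakerSortOrder.descending, limit=5
    #     ...     ):
    #     ...         print(event)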

    def list(
        self,
        *,
        org: pb2.Organization | None = None,
        task: pb2.Task | None = None,
        environment: pb2.Environment | None = None,
        finalized: bool | None = None,
        eligible_for_cluster: pb2.Cluster | None = None,
        scheduled_on_node: pb2.Node | None = None,
        scheduled_on_cluster: pb2.Cluster | None = None,
        scheduled: bool | None = None,
        sort_order: BeakerSortOrder = BeakerSortOrder.descending,
        sort_field: Literal["created", "cluster_job_queue"] = "created",
        limit: int | None = None,
    ) -> Iterable[pb2.Job]:
        """
        :returns: An iterator over :class:`~beaker.types.BeakerJob` protobuf objects.
        """
        if limit is not None and limit <= 0:
            raise ValueError("'limit' must be a positive integer")

        count = 0
        for response in self.rpc_paged_request(
            RpcMethod[pb2.ListJobsResponse](self.service.ListJobs),
            pb2.ListJobsRequest(
                options=pb2.ListJobsRequest.Opts(
                    sort_clause=pb2.ListJobsRequest.Opts.SortClause(
                        sort_order=None if sort_order is None else sort_order.as_pb2(),
                        created={} if sort_field == "created" else None,
                        cluster_job_queue={} if sort_field == "cluster_job_queue" else None,
                    ),
                    organization_id=self.resolve_org_id(org),
                    task_id=None if task is None else task.id,
                    environment_id=None if environment is None else environment.id,
                    finalized=finalized,  # type: ignore
                    eligible_for_cluster_id=None
                    if eligible_for_cluster is None
                    else eligible_for_cluster.id,
                    scheduled_on_node_id=None
                    if scheduled_on_node is None
                    else scheduled_on_node.id,
                    scheduled_on_cluster_id=None
                    if scheduled_on_cluster is None
                    else scheduled_on_cluster.id,
                    scheduled=scheduled,  # type: ignore
                    page_size=self.MAX_PAGE_SIZE if limit is None else min(self.MAX_PAGE_SIZE, limit),
                )
            ),
        ):
            for job in response.jobs:
                count += 1
                yield job
                if limit is not None and count >= limit:
                    return
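
    # A usage sketch (not part of the library): the ten most recently created
    # unfinalized jobs on a cluster. `cluster` is assumed to be a `pb2.Cluster`
    # resolved elsewhere (e.g. via the client's cluster methods).
    #
    #     >>> with Beaker.from_env() as beaker:
    #     ...     for job in beaker.job.list(
    #     ...         scheduled_on_cluster=cluster, finalized=False, limit=10
    #     ...     ):
    #     ...         print(job.id)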

    def url(self, job: pb2.Job) -> str:
        """
        Get the URL to the job on the Beaker dashboard.
        """
        job_id = job.id
        return f"{self.config.agent_address}/job/{self._url_quote(job_id)}"
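
    # A usage sketch (not part of the library): `job_id` is a placeholder.
    #
    #     >>> with Beaker.from_env() as beaker:
    #     ...     job = beaker.job.get(job_id)
    #     ...     print(beaker.job.url(job))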