Source code for beaker._workspace

from typing import Iterable, Literal

import grpc

from . import beaker_pb2 as pb2
from ._service_client import RpcMethod, ServiceClient
from .exceptions import *
from .types import *


class WorkspaceClient(ServiceClient):
    """
    Client methods for working with Beaker
    `Workspaces <https://beaker-docs.apps.allenai.org/concept/workspaces.html>`_.

    Reach it through the :data:`Beaker.workspace <beaker.Beaker.workspace>` property.

    .. warning::
        This class is not meant to be constructed by hand. The :class:`~beaker.Beaker`
        client builds an instance automatically and exposes it via that property.
    """

    def get(self, workspace: str | None = None) -> pb2.Workspace:
        """
        Fetch a single workspace.

        :param workspace: Workspace identifier handed to ``resolve_workspace_id``.
            ``None`` is forwarded unchanged (presumably selecting a default
            workspace -- confirm against ``resolve_workspace_id``).

        :examples:

        >>> with Beaker.from_env() as beaker:
        ...     workspace = beaker.workspace.get(workspace_name)

        :returns: A :class:`~beaker.types.BeakerWorkspace` protobuf object.

        :raises: :class:`~beaker.exceptions.BeakerWorkspaceNotFound` if the workspace
            doesn't exist.
        """
        method = RpcMethod[pb2.GetWorkspaceResponse](self.service.GetWorkspace)
        request = pb2.GetWorkspaceRequest(workspace_id=self.resolve_workspace_id(workspace))
        # A NOT_FOUND status is mapped onto the library's own exception type.
        not_found = BeakerWorkspaceNotFound(workspace)
        response = self.rpc_request(
            method,
            request,
            exceptions_for_status={grpc.StatusCode.NOT_FOUND: not_found},
        )
        return response.workspace

    def create(
        self,
        name: str,
        *,
        org: pb2.Organization | None = None,
        description: str | None = None,
    ) -> pb2.Workspace:
        """
        Create a new workspace.

        :param name: Workspace name, either bare (``"my-workspace"``) or qualified with
            an organization (``"my-org/my-workspace"``). Only the first ``/`` separates
            the organization from the workspace name.
        :param org: Organization to create the workspace in. When ``name`` is qualified
            too, the two organization names must match.
        :param description: Optional description sent with the request.

        :returns: The ``workspace`` field of the ``CreateWorkspace`` response.

        :raises ValueError: If ``name`` carries an organization prefix that differs
            from ``org.name``.
        """
        org_id: str | None = None
        if "/" in name:
            org_name, _, name = name.partition("/")
            if org is not None:
                if org.name != org_name:
                    raise ValueError("got conflicting options for organization")
            else:
                # No explicit org given: the prefix in the name decides.
                org_id = self.resolve_org_id(org_name)

        method = RpcMethod[pb2.CreateWorkspaceResponse](self.service.CreateWorkspace)
        request = pb2.CreateWorkspaceRequest(
            name=self._validate_beaker_name(name),
            organization_id=self.resolve_org_id(org) if org_id is None else org_id,
            description=description,
        )
        response = self.rpc_request(method, request)
        return response.workspace

    def update(
        self,
        workspace: pb2.Workspace | None = None,
        *,
        name: str | None = None,
        description: str | None = None,
        archived: bool | None = None,
        budget: str | None = None,
        maximum_workspace_priority: BeakerJobPriority | None = None,
    ) -> pb2.Workspace:
        """
        Update properties of a workspace.

        Every keyword left as ``None`` is sent as ``None`` in the request (presumably
        leaving that property untouched on the server -- confirm against the API).

        :param workspace: Workspace to update, resolved via ``resolve_workspace_id``.
        :param name: New name; checked with ``_validate_beaker_name`` before sending.
        :param description: New description.
        :param archived: New archived flag.
        :param budget: Budget identifier, resolved via ``resolve_budget_id``.
        :param maximum_workspace_priority: New priority cap, converted with ``as_pb2()``.

        :returns: The ``workspace`` field of the ``UpdateWorkspace`` response.
        """
        method = RpcMethod[pb2.UpdateWorkspaceResponse](self.service.UpdateWorkspace)
        request = pb2.UpdateWorkspaceRequest(
            workspace_id=self.resolve_workspace_id(workspace),
            name=self._validate_beaker_name(name) if name is not None else None,
            description=description,
            archived=archived,  # type: ignore
            maximum_workspace_priority=(
                maximum_workspace_priority.as_pb2()
                if maximum_workspace_priority is not None
                else None
            ),
            budget_id=self.resolve_budget_id(budget) if budget is not None else None,
        )
        response = self.rpc_request(method, request)
        return response.workspace

    def transfer_into(
        self, *, entity_ids: list[str], workspace: pb2.Workspace | None = None
    ) -> pb2.Workspace:
        """
        Transfer entities into a workspace.

        :param entity_ids: IDs forwarded as ``entity_ids`` of the
            ``TransferIntoWorkspace`` request.
        :param workspace: Destination workspace, resolved via ``resolve_workspace_id``.

        :returns: The ``workspace`` field of the ``TransferIntoWorkspace`` response.
        """
        method = RpcMethod[pb2.TransferIntoWorkspaceResponse](self.service.TransferIntoWorkspace)
        request = pb2.TransferIntoWorkspaceRequest(
            workspace_id=self.resolve_workspace_id(workspace),
            entity_ids=entity_ids,
        )
        response = self.rpc_request(method, request)
        return response.workspace

    # NOTE: keep this method below ``transfer_into``. With eagerly evaluated
    # annotations, ``list[str]`` there is looked up in the class body and must
    # still find the builtin ``list`` rather than this method.
    def list(
        self,
        *,
        org: pb2.Organization | None = None,
        author: pb2.User | None = None,
        workload_author: pb2.User | None = None,
        name_or_description: str | None = None,
        only_archived: bool = False,
        sort_order: BeakerSortOrder | None = None,
        sort_field: Literal[
            "created", "name", "recent_workload_activity", "maximum_workload_priority"
        ] = "name",
        include_workspace_size: bool = False,
        limit: int | None = None,
    ) -> Iterable[pb2.Workspace]:
        """
        Lazily iterate over workspaces, fetching them page by page.

        This is a generator: nothing happens (including the ``limit`` check and any
        RPC) until iteration starts.

        :param org: Organization filter, resolved via ``resolve_org_id``.
        :param author: Only sent when given; resolved via ``resolve_user_id``.
        :param workload_author: Only sent when given; resolved via ``resolve_user_id``.
        :param name_or_description: Sent as ``name_or_description_substring``.
        :param only_archived: Sent as ``only_archived``.
        :param sort_order: Sort direction, converted with ``as_pb2()`` when given.
        :param sort_field: Which sort-clause field to populate.
        :param include_workspace_size: Sent as ``include_workspace_size``.
        :param limit: Maximum number of workspaces to yield. ``None`` means no cap.

        :returns: An iterable of workspace protobuf objects.

        :raises ValueError: If ``limit`` is given but not positive.
        """
        if limit is not None and limit <= 0:
            raise ValueError("'limit' must be a positive integer")

        method = RpcMethod[pb2.ListWorkspacesResponse](self.service.ListWorkspaces)

        # Exactly one sort field receives an empty message; the others receive None.
        sort_fields = ("created", "name", "recent_workload_activity", "maximum_workload_priority")
        sort_clause = pb2.ListWorkspacesRequest.Opts.SortClause(
            sort_order=sort_order.as_pb2() if sort_order is not None else None,
            **{field: ({} if sort_field == field else None) for field in sort_fields},
        )
        options = pb2.ListWorkspacesRequest.Opts(
            sort_clause=sort_clause,
            organization_id=self.resolve_org_id(org),
            author_id=self.resolve_user_id(author) if author is not None else None,
            only_archived=only_archived,
            workload_author_id=(
                self.resolve_user_id(workload_author) if workload_author is not None else None
            ),
            name_or_description_substring=name_or_description,
            include_workspace_size=include_workspace_size,
            # Never request more per page than the caller wants in total.
            page_size=(
                min(self.MAX_PAGE_SIZE, limit) if limit is not None else self.MAX_PAGE_SIZE
            ),
        )
        request = pb2.ListWorkspacesRequest(options=options)

        remaining = limit
        for page in self.rpc_paged_request(method, request):
            for workspace in page.workspaces:
                yield workspace
                if remaining is not None:
                    remaining -= 1
                    if remaining <= 0:
                        return

    def url(self, workspace: pb2.Workspace | None = None) -> str:
        """
        Build the URL of a workspace.

        :param workspace: Workspace to link to. When omitted (or falsy), the result
            of calling :meth:`get` with no arguments is used instead.

        :returns: ``<agent_address>/ws/<workspace name>``, with the address read from
            ``self.config.agent_address``.
        """
        if not workspace:
            workspace = self.get()
        return f"{self.config.agent_address}/ws/{workspace.name}"