Queues

class QueueClient(beaker: Beaker)[source]

Methods for interacting with Beaker Queues. Accessed via the Beaker.queue property.

Warning

Do not instantiate this class directly! The Beaker client creates one automatically, which you can access through the corresponding property.

get(queue: str) Queue[source]
Examples:

>>> with Beaker.from_env() as beaker:
...     queue = beaker.queue.get(queue_id)
Returns:

A BeakerQueue protobuf object.

Raises:

BeakerQueueNotFound – If the queue doesn’t exist.

create(name: str | None = None, workspace: Workspace | None = None, input_schema: dict | None = {}, output_schema: dict | None = {}, batch_size: int | None = 1, max_claimed_entries: int | None = None, wait_timeout_ms: int | None = 0) Queue[source]

Create a new queue.

Returns:

A new BeakerQueue object.

delete(*queues: Queue)[source]

Delete queues.
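
As an illustration of pairing create() with delete(), here is a minimal sketch of a context manager that creates a queue and always deletes it afterwards. The helper name temporary_queue is hypothetical and not part of the library; it relies only on the create(name=...) and delete(*queues) signatures documented above, and takes an already-constructed Beaker client as its first argument.

```python
from contextlib import contextmanager


@contextmanager
def temporary_queue(beaker, name):
    """Hypothetical helper: create a queue, yield it, delete it on exit."""
    queue = beaker.queue.create(name=name)
    try:
        yield queue
    finally:
        # delete() accepts one or more queue objects.
        beaker.queue.delete(queue)
```

Usage would look like `with temporary_queue(beaker, "my-queue") as queue: ...`, so the queue is removed even if the body raises.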

create_worker(queue: Queue) QueueWorker[source]

Create a new queue worker.

Returns:

A new BeakerQueueWorker object.

list_workers(queue: Queue, limit: int | None = None) Iterable[QueueWorker][source]

List queue workers.

Returns:

An iterator over BeakerQueueWorker objects.

get_entry(entry_id: str) QueueEntry[source]

Get a queue entry object.

Returns:

A BeakerQueueEntry object.

Raises:

BeakerQueueEntryNotFound – If the entry doesn’t exist or has expired.

create_entry(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400, block: bool = True) Iterable[CreateQueueEntryResponse][source]

Submit an entry to a queue and stream response events as they happen.

Important

This method will block until the entry has been finalized. If you expect the entry will take a while to process, you should use create_entry_async() instead and periodically poll the entry status with get_entry().

Parameters:
  • input – The input data.

  • expires_in_sec – Time until the entry expires (in seconds). Defaults to 24 hours.

  • block – If True (the default), this method blocks until new responses become available and keeps streaming until the entry is finalized. If False, this method yields only the pending_entry response and then returns.
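
The blocking behaviour described above can be sketched as a small helper that drains the response stream into a list. The name submit_and_collect is hypothetical; it takes an existing Beaker client and uses only the create_entry() keyword arguments documented in this section. The structure of each CreateQueueEntryResponse is not described here, so the sketch treats responses as opaque objects.

```python
def submit_and_collect(beaker, queue, payload, expires_in_sec=3600):
    """Hypothetical helper: submit an entry and gather every streamed response."""
    responses = []
    # With block=True the iterator only ends once the entry is finalized,
    # so this loop can run for as long as the worker takes to respond.
    for response in beaker.queue.create_entry(
        queue, input=payload, expires_in_sec=expires_in_sec, block=True
    ):
        responses.append(response)
    return responses
```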

create_entry_async(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400) QueueEntry[source]

A convenience wrapper for create_entry() with block=False. Returns the created entry right away.

Returns:

A new BeakerQueueEntry object.
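
For long-running entries, the submit-then-poll pattern mentioned under create_entry() might look like the sketch below. Several things here are assumptions rather than documented API: the helper name submit_and_poll, the caller-supplied is_finalized predicate (the entry's status fields are not described in this section), and the use of an id attribute on the returned entry to look it up again.

```python
import time


def submit_and_poll(beaker, queue, payload, is_finalized,
                    poll_interval=5.0, timeout=300.0):
    """Hypothetical helper: submit without blocking, then poll until finalized."""
    entry = beaker.queue.create_entry_async(queue, input=payload)
    deadline = time.monotonic() + timeout
    while not is_finalized(entry):
        if time.monotonic() >= deadline:
            raise TimeoutError("entry was not finalized before the timeout")
        time.sleep(poll_interval)
        # Assumption: the entry object exposes its ID as `entry.id`.
        entry = beaker.queue.get_entry(entry.id)
    return entry
```

Note that get_entry() raises BeakerQueueEntryNotFound once an entry has expired, so a caller may want to keep the polling timeout shorter than expires_in_sec.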

list_entries(queue: Queue, limit: int | None = None) Iterable[QueueEntry][source]

List entries within a queue.

Returns:

An iterator over BeakerQueueEntry objects.

worker_channel(queue: Queue, worker: QueueWorker) AbstractContextManager[tuple[BeakerEntrySender, BeakerEntryReceiver]][source]

A context manager that opens a bidirectional worker channel for consuming and responding to entries. It yields a tuple of a BeakerEntrySender and a BeakerEntryReceiver, in that order.

Example:

>>> with beaker.queue.worker_channel(queue, worker) as (tx, rx):
...     for batch in rx.recv(max_batches=2, time_limit=10.0):
...         for entry_id, entry_input in batch:
...             tx.send(entry_id, output=entry_input)
...             tx.send(entry_id, done=True)
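
The example above assumes that queue and worker already exist. A slightly fuller sketch, wiring together get(), create_worker(), and worker_channel(), is shown below. The function name run_echo_worker and its return value are hypothetical, and substituting an empty dict for a missing input is a choice made for this sketch only.

```python
def run_echo_worker(beaker, queue_id, max_batches=1, time_limit=10.0):
    """Hypothetical helper: echo each entry's input back as its output."""
    queue = beaker.queue.get(queue_id)
    worker = beaker.queue.create_worker(queue)
    handled = 0
    with beaker.queue.worker_channel(queue, worker) as (tx, rx):
        for batch in rx.recv(max_batches=max_batches, time_limit=time_limit):
            for entry_id, entry_input in batch:
                # The input may be None, while send() expects a dict output.
                tx.send(entry_id, output=entry_input or {})
                tx.send(entry_id, done=True)
                handled += 1
    return handled
```
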
list(*, org: Organization | None = None, workspace: Workspace | None = None, sort_order: BeakerSortOrder | None = BeakerSortOrder.descending, sort_field: Literal['created'] = 'created', limit: int | None = None) Iterable[Queue][source]

List queues.

Returns:

An iterator over BeakerQueue objects.

class BeakerEntrySender(queue: Queue, worker: QueueWorker, tx: SimpleQueue[ProcessQueueEntriesRequest | None])[source]

Queue entry sender. Use this to respond to queue entries consumed by a worker.

Warning

Do not instantiate this class directly! Use worker_channel() to create one.

send(entry_id: str, *, output: dict)[source]
send(entry_id: str, *, rejection: str)
send(entry_id: str, *, done: Literal[True])

Send output to an entry, reject the entry, or mark the entry as done.

Important

Only one of output, rejection, or done can be specified at a time, and you should eventually set done=True (or rejection=...) on every entry.

Parameters:
  • entry_id – The ID of the entry.

  • output – Worker response data for the entry. Mutually exclusive with the other keyword args.

  • rejection – Marks the entry as rejected. This should be a human-readable reason for rejecting the entry. Mutually exclusive with the other keyword args.

  • done – Mark the entry as done. Mutually exclusive with the other keyword args.
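
One way to make sure every entry ends with either done=True or a rejection is to wrap the per-entry work in a helper like the one sketched here. The name respond and the handler callback are hypothetical, and rejecting on any exception is a policy chosen for this sketch, not something the library prescribes.

```python
def respond(tx, entry_id, entry_input, handler):
    """Hypothetical helper: run `handler` and always close out the entry."""
    try:
        output = handler(entry_input)
    except Exception as exc:
        # Exactly one keyword per send() call: here, a human-readable rejection.
        tx.send(entry_id, rejection=f"handler failed: {exc}")
        return False
    tx.send(entry_id, output=output)
    tx.send(entry_id, done=True)
    return True
```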

class BeakerEntryReceiver(queue: Queue, worker: QueueWorker, rx: SimpleQueue[list[QueueWorkerInput] | None], error: Event, logger: Logger)[source]

Queue entry receiver. Use this to consume queue entries as a worker.

Warning

Do not instantiate this class directly! Use worker_channel() to create one.

recv(*, max_batches: int | None = None, time_limit: float | None = None) Generator[list[tuple[str, dict | None]], None, None][source]

Receive batches of queue entries as they become available. Returns a generator of lists of tuples in the form (entry_id: str, input_data: dict | None).

This waits indefinitely for more batches unless max_batches or time_limit is set.

Parameters:
  • max_batches – Stop receiving after this many batches.

  • time_limit – Stop receiving after this many seconds.
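
Because recv() yields batches (lists of tuples) and input_data may be None, a caller that prefers to work entry by entry could flatten the stream with a small generator such as the one below. The name iter_entries is hypothetical, and replacing None with an empty dict is an assumption made for convenience in this sketch.

```python
def iter_entries(rx, max_batches=None, time_limit=None):
    """Hypothetical helper: flatten recv() batches into (entry_id, input) pairs."""
    for batch in rx.recv(max_batches=max_batches, time_limit=time_limit):
        for entry_id, input_data in batch:
            # Normalize a missing input to an empty dict for the caller.
            yield entry_id, input_data if input_data is not None else {}
```

Each yielded entry still needs a response through the paired BeakerEntrySender; without max_batches or time_limit the generator does not terminate on its own.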