Queues¶
- class QueueClient(beaker: Beaker)[source]¶
Methods for interacting with Beaker Queues. Accessed via the
Beaker.queueproperty.Warning
Do not instantiate this class directly! The
Beakerclient will create one automatically which you can access through the corresponding property.- get(queue: str) Queue[source]¶
- Examples:
>>> with Beaker.from_env() as beaker: ... queue = beaker.queue.get(queue_id)
- Returns:
A
BeakerQueueprotobuf object.- Raises:
BeakerQueueNotFound – If the queue doesn’t exist.
- create(name: str | None = None, workspace: Workspace | None = None, input_schema: dict | None = {}, output_schema: dict | None = {}, batch_size: int | None = 1, max_claimed_entries: int | None = None, wait_timeout_ms: int | None = 0) Queue[source]¶
Create a new queue.
- Returns:
A new
BeakerQueueobject.
- create_worker(queue: Queue) QueueWorker[source]¶
Create a new queue worker.
- Returns:
A new
BeakerQueueWorkerobject.
- list_workers(queue: Queue, limit: int | None = None) Iterable[QueueWorker][source]¶
List queue workers.
- Returns:
An iterator over
BeakerQueueWorkerobjects.
- get_entry(entry_id: str) QueueEntry[source]¶
Get a queue entry object.
- Returns:
A
BeakerQueueEntryobject.- Raises:
BeakerQueueEntryNotFound – If the entry doesn’t exist or has expired.
- create_entry(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400, block: bool = True) Iterable[CreateQueueEntryResponse][source]¶
Submit an entry to a queue and stream response events as they happen.
Important
This method will block until the entry has been finalized. If you expect the entry will take a while to process, you should use
create_entry_async()instead and periodically poll the entry status withget_entry().- Parameters:
input – The input data.
expires_in_sec – Time until the entry expires (in seconds). Defaults to 24 hours.
block – If
True(the default), this method will block until new responses become available and continue streaming until the entry is finalized. IfFalsethis method will only yield thepending_entryresponse and then return.
- create_entry_async(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400) QueueEntry[source]¶
A convenience wrapper for
create_entry()withblock=False. Returns the created entry right away.- Returns:
A new
BeakerQueueEntryobject.
- list_entries(queue: Queue, limit: int | None = None) Iterable[QueueEntry][source]¶
List entries within a queue.
- Returns:
An iterator over
BeakerQueueEntryobjects.
- worker_channel(queue: Queue, worker: QueueWorker) AbstractContextManager[tuple[BeakerEntrySender, BeakerEntryReceiver]][source]¶
A context manager for opening a bidirectional worker channel for consuming and responding to entries. The channel returned is a tuple of a
BeakerEntrySenderand aBeakerEntryReceiver, respectively.Example:
>>> with beaker.queue.worker_channel(queue, worker) as (tx, rx): ... for batch in rx.recv(max_batches=2, time_limit=10.0): ... for entry_id, entry_input in batch: ... tx.send(entry_id, output=entry_input) ... tx.send(entry_id, done=True)
- class BeakerEntrySender(queue: Queue, worker: QueueWorker, tx: SimpleQueue[ProcessQueueEntriesRequest | None])[source]¶
Queue entry sender. Use this to respond to queue entries consumed by a worker.
Warning
Do not instantiated this class directly! Use
worker_channel()to create one.- send(entry_id: str, *, output: dict)[source]¶
- send(entry_id: str, *, rejection: str)
- send(entry_id: str, *, done: Literal[True])
Send output to an entry, reject, or mark the entry as done.
Important
Only one of
output,rejection, ordonecan be specified at a time, and you should eventually setdone=True(orrejection=...) on every entry.- Parameters:
entry_id – The ID of the entry.
output – Worker response data for the entry. Mutually exclusive with the other keyword args.
rejection – Marks the entry as rejected. This should be a human-readable reason for rejecting the entry. Mutually exclusive with the other keyword args.
done – Mark the entry as done. Mutually exclusive with the other keyword args.
- class BeakerEntryReceiver(queue: Queue, worker: QueueWorker, rx: SimpleQueue[list[QueueWorkerInput] | None], error: Event, logger: Logger)[source]¶
Queue entry receiver. Use this to consume queue entries as a worker.
Warning
Do not instantiated this class directly! Use
worker_channel()to create one.- recv(*, max_batches: int | None = None, time_limit: float | None = None) Generator[list[tuple[str, dict | None]], None, None][source]¶
Receive batches of queue entries as they become available. Returns a generator of lists of tuples in the form
(entry_id: str, input_data: dict | None).This will wait indefinitely on more batches unless
max_batchesortime_limitis set.- Parameters:
max_batches – Stop receiving after this many batches.
time_limit – Stop receiving after this many seconds.