Queues¶
- class QueueClient(beaker: Beaker)[source]¶
Methods for interacting with Beaker Queues. Accessed via the
Beaker.queue
property.Warning
Do not instantiate this class directly! The
Beaker
client will create one automatically which you can access through the corresponding property.- get(queue: str) Queue [source]¶
- Examples:
>>> with Beaker.from_env() as beaker: ... queue = beaker.queue.get(queue_id)
- Returns:
A
BeakerQueue
protobuf object.- Raises:
BeakerQueueNotFound – If the queue doesn’t exist.
- create(name: str | None = None, workspace: Workspace | None = None, input_schema: dict | None = {}, output_schema: dict | None = {}, batch_size: int | None = 1, max_claimed_entries: int | None = None, wait_timeout_ms: int | None = 0) Queue [source]¶
Create a new queue.
- Returns:
A new
BeakerQueue
object.
- create_worker(queue: Queue) QueueWorker [source]¶
Create a new queue worker.
- Returns:
A new
BeakerQueueWorker
object.
- list_workers(queue: Queue, limit: int | None = None) Iterable[QueueWorker] [source]¶
List queue workers.
- Returns:
An iterator over
BeakerQueueWorker
objects.
- get_entry(entry_id: str) QueueEntry [source]¶
Get a queue entry object.
- Returns:
A
BeakerQueueEntry
object.- Raises:
BeakerQueueEntryNotFound – If the entry doesn’t exist or has expired.
- create_entry(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400, block: bool = True) Iterable[CreateQueueEntryResponse] [source]¶
Submit an entry to a queue and stream response events as they happen.
Important
This method will block until the entry has been finalized. If you expect the entry will take a while to process, you should use
create_entry_async()
instead and periodically poll the entry status withget_entry()
.- Parameters:
input – The input data.
expires_in_sec – Time until the entry expires (in seconds). Defaults to 24 hours.
block – If
True
(the default), this method will block until new responses become available and continue streaming until the entry is finalized. IfFalse
this method will only yield thepending_entry
response and then return.
- create_entry_async(queue: Queue, *, input: dict | None = {}, expires_in_sec: int = 86400) QueueEntry [source]¶
A convenience wrapper for
create_entry()
withblock=False
. Returns the created entry right away.- Returns:
A new
BeakerQueueEntry
object.
- list_entries(queue: Queue, limit: int | None = None) Iterable[QueueEntry] [source]¶
List entries within a queue.
- Returns:
An iterator over
BeakerQueueEntry
objects.
- worker_channel(queue: Queue, worker: QueueWorker) AbstractContextManager[tuple[BeakerEntrySender, BeakerEntryReceiver]] [source]¶
A context manager for opening a bidirectional worker channel for consuming and responding to entries. The channel returned is a tuple of a
BeakerEntrySender
and aBeakerEntryReceiver
, respectively.Example:
>>> with beaker.queue.worker_channel(queue, worker) as (tx, rx): ... for batch in rx.recv(max_batches=2, time_limit=10.0): ... for entry_id, entry_input in batch: ... tx.send(entry_id, output=entry_input) ... tx.send(entry_id, done=True)
- class BeakerEntrySender(queue: Queue, worker: QueueWorker, tx: SimpleQueue[ProcessQueueEntriesRequest | None])[source]¶
Queue entry sender. Use this to respond to queue entries consumed by a worker.
Warning
Do not instantiated this class directly! Use
worker_channel()
to create one.- send(entry_id: str, *, output: dict)[source]¶
- send(entry_id: str, *, rejection: str)
- send(entry_id: str, *, done: Literal[True])
Send output to an entry, reject, or mark the entry as done.
Important
Only one of
output
,rejection
, ordone
can be specified at a time, and you should eventually setdone=True
(orrejection=...
) on every entry.- Parameters:
entry_id – The ID of the entry.
output – Worker response data for the entry. Mutually exclusive with the other keyword args.
rejection – Marks the entry as rejected. This should be a human-readable reason for rejecting the entry. Mutually exclusive with the other keyword args.
done – Mark the entry as done. Mutually exclusive with the other keyword args.
- class BeakerEntryReceiver(queue: Queue, worker: QueueWorker, rx: SimpleQueue[list[QueueWorkerInput] | None], error: Event, logger: Logger)[source]¶
Queue entry receiver. Use this to consume queue entries as a worker.
Warning
Do not instantiated this class directly! Use
worker_channel()
to create one.- recv(*, max_batches: int | None = None, time_limit: float | None = None) Generator[list[tuple[str, dict | None]], None, None] [source]¶
Receive batches of queue entries as they become available. Returns a generator of lists of tuples in the form
(entry_id: str, input_data: dict | None)
.This will wait indefinitely on more batches unless
max_batches
ortime_limit
is set.- Parameters:
max_batches – Stop receiving after this many batches.
time_limit – Stop receiving after this many seconds.