tornado.queues – Queues for coroutines¶
Added in version 4.2.
Asynchronous queues for coroutines. These classes are very similar to those provided in the standard library’s asyncio package.
Warning
Unlike the standard library’s queue module, the classes defined here
are not thread-safe. To use these queues from another thread,
use IOLoop.add_callback to transfer control to the IOLoop thread
before calling any queue methods.
Classes¶
Queue¶
- classtornado.queues.Queue(maxsize:int =0)[source] ¶
Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
import asyncio from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) async def consumer(): async for item in q: try: print('Doing work on %s' % item) await asyncio.sleep(0.01) finally: q.task_done() async def producer(): for item in range(5): await q.put(item) print('Put %s' % item) async def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) await producer() # Wait for producer to put all tasks. await q.join() # Wait for consumer to finish all tasks. print('Done') asyncio.run(main())
Put 0 Put 1 Doing work on 0 Put 2 Doing work on 1 Put 3 Doing work on 2 Put 4 Doing work on 3 Doing work on 4 Done
In versions of Python without native coroutines (before 3.5),
consumer()could be written as:@gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done()
Changed in version 4.3: Added
async forsupport in Python 3.5.- put(item:_T, timeout:float |timedelta |None =None) → Future[None ][source] ¶
Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises
tornado.util.TimeoutErrorafter a timeout.timeoutmay be a number denoting a time (on the same scale astornado.ioloop.IOLoop.time, normallytime.time), or adatetime.timedeltaobject for a deadline relative to the current time.
- put_nowait(item:_T) → None [source] ¶
Put an item into the queue without blocking.
If no free slot is immediately available, raise
QueueFull.
- get(timeout:float |timedelta |None =None) → Awaitable [_T][source] ¶
Remove and return an item from the queue.
Returns an awaitable which resolves once an item is available, or raises
tornado.util.TimeoutErrorafter a timeout.timeoutmay be a number denoting a time (on the same scale astornado.ioloop.IOLoop.time, normallytime.time), or adatetime.timedeltaobject for a deadline relative to the current time.Note
The
timeoutargument of this method differs from that of the standard library’squeue.Queue.get. That method interprets numeric values as relative timeouts; this one interprets them as absolute deadlines and requirestimedeltaobjects for relative timeouts (consistent with other timeouts in Tornado).
- get_nowait() → _T[source] ¶
Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise
QueueEmpty.
- task_done() → None [source] ¶
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each
getused to fetch a task, a subsequent call totask_donetells the queue that the processing on the task is complete.If a
joinis blocking, it resumes when all items have been processed; that is, when everyputis matched by atask_done.Raises
ValueErrorif called more times thanput.
PriorityQueue¶
- classtornado.queues.PriorityQueue(maxsize:int =0)[source] ¶
A
Queuethat retrieves entries in priority order, lowest first.Entries are typically tuples like
(priority number, data).import asyncio from tornado.queues import PriorityQueue async def main(): q = PriorityQueue() q.put((1, 'medium-priority item')) q.put((0, 'high-priority item')) q.put((10, 'low-priority item')) print(await q.get()) print(await q.get()) print(await q.get()) asyncio.run(main())
(0, 'high-priority item') (1, 'medium-priority item') (10, 'low-priority item')
LifoQueue¶
- classtornado.queues.LifoQueue(maxsize:int =0)[source] ¶
A
Queuethat retrieves the most recently put items first.import asyncio from tornado.queues import LifoQueue async def main(): q = LifoQueue() q.put(3) q.put(2) q.put(1) print(await q.get()) print(await q.get()) print(await q.get()) asyncio.run(main())
1 2 3
Exceptions¶
QueueEmpty¶
- exceptiontornado.queues.QueueEmpty[source] ¶
Raised by
Queue.get_nowaitwhen the queue has no items.
QueueFull¶
- exceptiontornado.queues.QueueFull[source] ¶
Raised by
Queue.put_nowaitwhen a queue is at its maximum size.