
This issue tracker has been migrated to GitHub, and is currently read-only.
For more information, see the GitHub FAQs in Python's Developer Guide.

classification
Title: Ability to adjust queue size in Executors
Type: enhancement Stage: resolved
Components: Library (Lib) Versions: Python 3.3
process
Status: closed Resolution: duplicate
Dependencies: Superseder: Expose max_queue_size in ThreadPoolExecutor (issue 29595)
Assigned To: bquinlan Nosy List: Nam.Nguyen, Patrik Dufresne, Victor.Varvariuc, Vinay Anantharaman, Winterflower, bquinlan, mhrivnak, pitrou, r.david.murray, thehesiod
Priority: normal Keywords: patch

Created on 2012-02-25 00:19 by Nam.Nguyen, last changed 2022-04-11 14:57 by admin. This issue is now closed.

Files
File name: executor-queue-size.diff
Uploaded: Nam.Nguyen, 2012-02-27 23:01
Description: patch to add queue size to ThreadPoolExecutor
Messages (19)
msg154175 - (view) Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-02-25 00:19
I am running into a memory consumption issue with the concurrent.futures module. Its Executors do not have a public API to adjust their queue size, and the queues are created unbounded initially.
It would be helpful to have some public method or a parameter at construction time to limit this queue size.
msg154177 - (view) Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-02-25 00:44
By the way, ProcessPoolExecutor actually sets its queue size to a reasonable number but ThreadPoolExecutor does not.
msg162599 - (view) Author: R. David Murray (r.david.murray) * (Python committer) Date: 2012-06-11 00:50
Brian: ping. Since this is an enhancement, if you are going to accept it, it would be nice to get it into 3.3, which means committing it before June 23rd.
msg162601 - (view) Author: Brian Quinlan (bquinlan) * (Python committer) Date: 2012-06-11 02:55
Hey Nam,
I'm not sure that I understand. You want ThreadPoolExecutor.submit to block if there are too many work items in the queue? Are you sure that this happens currently with ProcessPoolExecutor? I can't see why it would.
msg162604 - (view) Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-11 03:56
Currently, ProcessPoolExecutor has this line in its constructor:
self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS)
where EXTRA_QUEUED_CALLS is 1.
And yes, it would be best to export a method so that the app developer can set the queue size themselves. In my case, I would want to limit the queue so that I don't run out of memory. Others might not want the queue to block, and hence would prefer an unlimited queue.
msg162605 - (view) Author: Brian Quinlan (bquinlan) * (Python committer) Date: 2012-06-11 04:34
The queue that you identified, i.e.
self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS)
does not get considered during submit() - are you sure that it somehow causes submit() to block?
Could you explain what your use case is such that you can run out of memory?
msg162606 - (view) Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-11 04:46
I used the ThreadPoolExecutor to process a large number (bounded) of input/output jobs. Because there were too many of them and the worker threads could not process them fast enough to drain them from the queue, the queue kept increasing in size.
It was okay for the script, though, to block, waiting for the queue to drain, before submitting new jobs. So I needed to limit the queue size.
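[Editor's note] A common workaround for the blocking behavior Nam describes is to gate submissions with a semaphore. The `BoundedExecutor` wrapper below is a hypothetical sketch, not part of concurrent.futures:

```python
# Hypothetical BoundedExecutor: submit() blocks once max_pending work items
# are in flight, so the internal queue can never grow without bound.
import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # blocks while max_pending items are pending
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._semaphore.release()
            raise
        # Release one slot whenever a future finishes (or is cancelled).
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```

Submitting another item while `max_pending` are outstanding blocks the caller until one completes, which bounds memory without touching executor internals.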
msg162664 - (view) Author: Brian Quinlan (bquinlan) * (Python committer) Date: 2012-06-12 09:00
I've had people request that they be able to control the order of processed work submissions. So a more general way to solve your problem might be to make the two executors take an optional Queue argument in their constructors.
You'd have to explain in detail in the documentation how the queues are used.
What do you think?
msg162700 - (view) Author: Nam Nguyen (Nam.Nguyen) * Date: 2012-06-13 02:50
+1
That was actually what I did. I replaced the internal queue with another one whose limit was properly set.
If you are too busy to write one, let me find some time to create another patch.
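[Editor's note] The queue replacement Nam describes can be sketched as below. It relies on the private `_work_queue` attribute of CPython's ThreadPoolExecutor; worker threads are only spawned on the first submit, so swapping the queue immediately after construction works in current CPython, but this is an implementation detail that may break across versions:

```python
# Bound a ThreadPoolExecutor by replacing its internal queue with a
# size-limited one. Fragile workaround, not a supported API.
import queue
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
# submit() now blocks whenever 8 items are already queued.
executor._work_queue = queue.Queue(maxsize=8)

futures = [executor.submit(pow, n, 2) for n in range(20)]
executor.shutdown(wait=True)
print([f.result() for f in futures][:5])  # -> [0, 1, 4, 9, 16]
```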
msg207512 - (view) Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-07 08:42
Maybe I should have created another issue for this, but without this issue being solved, the new issue will not help much.
Here it is:
http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l63
After running a work item, `work_queue.task_done()` is not called.
So it's difficult to know whether the worker threads have any more work to do.
http://stackoverflow.com/questions/20965754/determine-if-worker-threads-are-doing-any-work?noredirect=1#comment31495804_20965754 
msg207602 - (view) Author: Brian Quinlan (bquinlan) * (Python committer) Date: 2014-01-07 21:06
Hi Victor,
I don't understand your problem. Could you be very specific in your description?
msg207672 - (view) Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-08 05:53
Hi Brian,
In one of my projects I had to monkey-patch module `concurrent.futures.thread:60` ( http://hg.python.org/cpython/file/37caaf21f827/Lib/concurrent/futures/thread.py#l60 ) with:
def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                work_queue.task_done()  # <-- added this line
                continue
            executor = executor_reference()
            # Exit if:
            # - The interpreter is shutting down OR
            # - The executor that owns the worker has been collected OR
            # - The executor that owns the worker has been shutdown.
            if futures_thread._shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        futures_thread._base.LOGGER.critical('Exception in worker', exc_info=True)
This helps me to control the state of the work queue -- I can see if there are any work items still being processed:
if executor._work_queue.unfinished_tasks:
    # executor is still producing something
    ...
msg207893 - (view) Author: Brian Quinlan (bquinlan) * (Python committer) Date: 2014-01-11 00:43
Can't you accomplish what you want using add_done_callback?
e.g.
# Pseudocode, fleshed out so it runs: the done callback receives the
# future, the lock must be created, and super() takes no arguments here.
import threading
from concurrent.futures import ThreadPoolExecutor

class MyExecutor(ThreadPoolExecutor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._some_lock = threading.Lock()
        self._count = 0

    def _decrement(self, future):
        with self._some_lock:
            self._count -= 1

    def submit(self, fn, *args, **kwargs):
        with self._some_lock:
            self._count += 1  # count first so the done callback cannot race it
        f = super().submit(fn, *args, **kwargs)
        f.add_done_callback(self._decrement)
        return f

    @property
    def num_pending_futures(self):
        return self._count
msg207896 - (view) Author: Victor Varvariuc (Victor.Varvariuc) Date: 2014-01-11 06:16
Hi!
Looks like your pseudocode will work as a workaround instead of monkey-patching!
Still, my suggestion to add that line to the code stands.
Shouldn't self._count always be equal to the length of self._work_queue? If yes, why the duplication? If not, which one should be used? There is also an additional lock.
http://docs.python.org/3.3/library/queue.html#queue.Queue.task_done - there is a special method for this, why not use it?
It looks like you think that `work_queue.task_done()` should not be added. I don't understand why, but you decide what's better for Python.
Thank you for your time!
Victor
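[Editor's note] For reference, the `task_done()`/`join()` protocol Victor points to works like this on a plain `queue.Queue`, independent of the executor (`unfinished_tasks` is an undocumented internal counter):

```python
# Minimal demonstration of queue.Queue's task-tracking protocol:
# each get() must be matched by a task_done() for join() to unblock.
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:   # sentinel: stop the worker
            q.task_done()
            return
        results.append(item * 2)
        q.task_done()      # mark this work item as finished

t = threading.Thread(target=worker)
t.start()
for n in range(5):
    q.put(n)
q.put(None)
q.join()                   # blocks until every put() has been task_done()'d
t.join()
print(sorted(results))     # -> [0, 2, 4, 6, 8]
```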
msg253895 - (view) Author: Alexander Mohr (thehesiod) * Date: 2015-11-02 06:58
Adding support for limiting the internal queue size is critical to avoid chewing through all your memory when you have a LOT of tasks. I just hit this issue myself. If we could have a simple parameter to set the max queue size, this would help tremendously!
msg274588 - (view) Author: Patrik Dufresne (Patrik Dufresne) Date: 2016-09-06 18:19
Any update on this subject?
I also had to monkey-patch the implementation to avoid consuming all the system memory.
msg280727 - (view) Author: Vinay Anantharaman (Vinay Anantharaman) Date: 2016-11-14 03:10
I read the code myself and noticed that task_done is not called as well. Is there a reason?
msg305857 - (view) Author: Michael Hrivnak (mhrivnak) Date: 2017-11-08 15:47
My project also has a use case for this, very similar to the others described. Here's what we want:
with ThreadPoolExecutor(queue_size=500) as executor:
    for item in parse_a_long_list_of_work(somefile.xml):
        executor.submit(Job(item))
I do not want to parse the entire list of work items and load them into memory at once. It is preferable for the main thread running the above code to block on submit() when the queue size is above some threshold.
It's a classic case of the producer and consumer operating at different speeds. In the past, a Queue object has been the way to connect such a producer and consumer. The various Executor classes do not provide an easy way to consume from a provided Queue object, so giving them that capability would be a reasonable alternative to having the submit() method block.
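[Editor's note] The classic producer/consumer coupling Michael describes can be sketched with a bounded `queue.Queue` directly; `parse_items` is a hypothetical stand-in for parsing the long work list:

```python
# Producer/consumer connected by a bounded queue: the producer blocks on
# put() whenever consumers fall behind, so memory use stays flat.
import queue
import threading

def parse_items():              # hypothetical stand-in for the work-list parser
    yield from range(100)

work = queue.Queue(maxsize=10)  # producer blocks once 10 items are waiting
done = []

def consumer():
    while True:
        item = work.get()
        if item is None:        # sentinel: this consumer is finished
            return
        done.append(item)

threads = [threading.Thread(target=consumer) for _ in range(4)]
for t in threads:
    t.start()
for item in parse_items():      # never holds more than ~10 items in memory
    work.put(item)
for _ in threads:
    work.put(None)              # one sentinel per consumer
for t in threads:
    t.join()
print(len(done))                # -> 100
```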
msg314742 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2018-03-31 17:04
Issue 29595 has a more complete PR, so closing this issue as duplicate.
History
Date | User | Action | Args
2022-04-11 14:57:27 | admin | set | github: 58327
2018-03-31 17:04:32 | pitrou | set | status: open -> closed; superseder: Expose max_queue_size in ThreadPoolExecutor; nosy: + pitrou; messages: + msg314742; resolution: duplicate; stage: patch review -> resolved
2017-11-08 15:47:47 | mhrivnak | set | nosy: + mhrivnak; messages: + msg305857
2017-02-12 20:30:05 | Winterflower | set | nosy: + Winterflower
2016-11-14 03:10:51 | Vinay Anantharaman | set | nosy: + Vinay Anantharaman; messages: + msg280727
2016-09-06 18:19:30 | Patrik Dufresne | set | nosy: + Patrik Dufresne; messages: + msg274588
2015-11-02 06:58:55 | thehesiod | set | nosy: + thehesiod; messages: + msg253895
2014-01-11 06:16:57 | Victor.Varvariuc | set | messages: + msg207896
2014-01-11 00:43:38 | bquinlan | set | messages: + msg207893
2014-01-08 05:53:45 | Victor.Varvariuc | set | messages: + msg207672
2014-01-07 21:06:53 | bquinlan | set | messages: + msg207602
2014-01-07 08:42:46 | Victor.Varvariuc | set | nosy: + Victor.Varvariuc; messages: + msg207512
2012-06-13 02:50:39 | Nam.Nguyen | set | messages: + msg162700
2012-06-12 09:00:14 | bquinlan | set | messages: + msg162664
2012-06-11 04:46:07 | Nam.Nguyen | set | messages: + msg162606
2012-06-11 04:34:51 | bquinlan | set | messages: + msg162605
2012-06-11 03:56:35 | Nam.Nguyen | set | messages: + msg162604
2012-06-11 02:55:12 | bquinlan | set | messages: + msg162601
2012-06-11 00:50:37 | r.david.murray | set | versions: + Python 3.3; nosy: + r.david.murray; messages: + msg162599; stage: patch review
2012-03-07 20:11:59 | bquinlan | set | assignee: bquinlan; nosy: + bquinlan
2012-02-27 23:01:54 | Nam.Nguyen | set | files: + executor-queue-size.diff; keywords: + patch
2012-02-25 00:44:40 | Nam.Nguyen | set | messages: + msg154177
2012-02-25 00:19:01 | Nam.Nguyen | create |
