This is a much simplified version of the real code, focusing just on the handling of Futures from Requests Futures. I have a few questions:

- I had to implement my own version of `as_completed` because the data handlers may add more Futures to `_pending`. Is this a decent way to handle the problem, or is there another approach? (A minimal demonstration of the issue follows this list.)
- Is `stop` sufficient to handle `KeyboardInterrupt` in all cases? It has worked well in my limited testing, but I found it hard to find a solution via Google. (A standalone check of the cancel() semantics appears after the code below.)
- Is my rate-limiting solution OK, or is there a better approach? It is not about the number of concurrent connections but about the number of connections per second.
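To show concretely what confused me, here is a minimal demonstration (hypothetical, not my real code) that the stock `as_completed()` snapshots its input when called, so Futures added during iteration are never yielded:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(time.sleep, 0.1)]
    for finished in as_completed(futures):
        # Appending here has no effect on the loop: as_completed()
        # copied `futures` into an internal set at call time.
        futures.append(pool.submit(time.sleep, 0.1))
    print(len(futures))  # 2 Futures were submitted, but the loop ran once
```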
```python
import argparse
import sys
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from requests_futures.sessions import FuturesSession


def background_callback(sess, resp):
    # Parse the JSON, storing the result on the response object.
    if resp.status_code == requests.codes.ok:
        resp.data = resp.json()
    else:
        resp.data = None


class JSONRetriever(object):
    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=10)
        self._session = FuturesSession(executor=self._executor)
        self._pending = {}

    def fetch(self, url):
        future = self._session.get(url,
                                   background_callback=background_callback)
        self._pending[future] = url

    def drain(self):
        # Look for completed requests by hand because in the real code
        # the responses may trigger further URLs to be retrieved, so
        # self._pending is modified. New requests being added really
        # confused as_completed().
        for future in [f for f in self._pending if f.done()]:
            url = self._pending[future]
            del self._pending[future]
            response = future.result()
            response.raise_for_status()
            if response.status_code == requests.codes.ok:
                print(response.data)
                # Real code would handle the data, possibly adding more requests.
            else:
                # The real code is smarter; this is just for CR.
                raise Exception("FIXME: unhandled response")

    def finish(self):
        while self._pending:
            self.drain()
            if self._pending:
                time.sleep(1)

    def stop(self):
        for i in self._pending:
            try:
                i.cancel()
            except Exception as e:
                sys.stderr.write("Caught: " + str(e) + "\n")
        self._executor.shutdown()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Perform all REST calls")
    parser.add_argument("--delay", type=int, default=0)
    parser.add_argument("urls", nargs="+")
    args = parser.parse_args()
    retriever = JSONRetriever()
    try:
        for url in args.urls:
            retriever.fetch(url)
            if args.delay > 0:  # may need a delay to rate limit requests
                time.sleep(args.delay)
                retriever.drain()  # clear any requests that completed while asleep
        retriever.finish()
    except KeyboardInterrupt:
        retriever.stop()
```
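For context on the `stop` question, the semantics of `Future.cancel()` can be checked in isolation (a sketch, independent of the code above): it never raises, and it returns False once a task has started running, so `stop()` mostly prevents queued requests from being issued.

```python
import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(time.sleep, 0.5)  # starts on the only worker
    queued = pool.submit(time.sleep, 0.5)   # waits behind it in the queue
    time.sleep(0.1)                         # let the first task begin
    print(running.cancel())  # False: already running, cannot be cancelled
    print(queued.cancel())   # True: still pending, cancelled cleanly
    # cancel() never raises, so the try/except in stop() is defensive only.
```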
- Sean, do you not find requests-futures a bit slow? For example, I can't get an improvement over multiprocessing.Pool(), and some people have suggested Tornado instead. (mptevsion, Mar 2, 2016)
- I have never needed blazing speed. (Sean Perry, Mar 2, 2016)
1 Answer
Nice code, clearly written. I understand the rate-limiting requirement.

Having the drain() call within the loop doesn't seem like the caller's responsibility; better to let the background callback handle it, or to defer until finish(), which as written does make sense. Note also that each URL fetch could take more or less time than the delay, so the sleep in the caller only approximates the intended requests-per-second rate. This seems to be a bug, or at least a wart, still lurking within the code; a sketch of one alternative follows.
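As one alternative (a sketch of my suggestion, not code from the question; the subclass name and max_per_second parameter are my own), the throttle could live inside fetch() itself, enforcing a minimum interval between submissions:

```python
import time

class RateLimitedRetriever(JSONRetriever):  # assumes the class from the question
    """Caps request submissions per second, not concurrency."""

    def __init__(self, max_per_second=5):
        super().__init__()
        self._min_interval = 1.0 / max_per_second
        self._last_submit = 0.0

    def fetch(self, url):
        # Sleep just long enough to keep submissions at least
        # min_interval apart, measured on the monotonic clock.
        wait = self._min_interval - (time.monotonic() - self._last_submit)
        if wait > 0:
            time.sleep(wait)
        self._last_submit = time.monotonic()
        super().fetch(url)
```

The caller then just calls fetch() for each URL and finish() once, and drain() stays purely about collecting results.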
- This was extracted from something I wrote while working on a REST client where the server sent back "not ready yet" responses. There was behavior that led me to the drain() call, but three years later I no longer remember what it was. There was definitely a wart, but the API I was dealing with has since been radically overhauled, so I can no longer dredge up the issue. Thanks for looking at my post. (Sean Perry, Sep 11, 2017)