8
\$\begingroup\$

This is a much simplified version of the real code focusing just on the handling of Futures from Requests Futures.

I have a few questions:

  1. I had to implement my own version of as_completed because the data handlers may add more Futures to _pending. Is this a decent way to handle the problem, or is there another approach?
  2. Is stop sufficient to handle KeyboardInterrupt in all cases? It has worked well in my limited testing. I found it hard to find a solution via Google.
  3. Is my rate limiting solution OK or is there a better approach? It is not about the number of concurrent connections but about the number of connections per second.
import argparse
from concurrent.futures import ThreadPoolExecutor
import requests
from requests_futures.sessions import FuturesSession
import time
def background_callback(sess, resp):
 # parse the json storing the result on the response object
 if resp.status_code == requests.codes.ok:
 resp.data = resp.json()
 else:
 resp.data = None
class JSONRetriever(object):
 def __init__(self):
 self._executor = ThreadPoolExecutor(max_workers=10)
 self._session = FuturesSession(executor=self._executor)
 self._pending = {}
 def fetch(self, url):
 future = self._session.get(url,
 background_callback=background_callback)
 self._pending[future] = url
 def drain(self):
 # Look for completed requests by hand because in the real code
 # the responses my trigger further URLs to be retrieved so
 # self._pending is modified. New requests being added really
 # confused as_completed().
 for future in [f for f in self._pending if f.done()]:
 url = self._pending[future]
 del self._pending[future]
 response = future.result()
 response.raise_for_status()
 if response.status_code == requests.codes.ok:
 print response.data
 # real code would handle data possibly adding more requests
 else:
 # the real code is smarter, this is just for CR
 raise Exception("FIXME: unhandle response")
 def finish(self):
 while self._pending:
 self.drain()
 if self._pending:
 time.sleep(1)
 def stop(self):
 for i in self._pending:
 try:
 i.cancel()
 except Exception as e:
 sys.stderr.write("Caught: " + str(e) + "\n")
 self._executor.shutdown()
if __name__ == "__main__":
 parser = argparse.ArgumentParser(description="Perform all REST calls")
 parser.add_argument("--delay", type=int, default=0)
 parser.add_argument("urls", nargs="+")
 args = parser.parse_args()
 retriever = JSONRetriever()
 try:
 for url in args.urls:
 retriever.fetch(url)
 if args.delay > 0: # may need a delay to rate limit requests
 time.sleep(args.delay)
 retriever.drain() # clear any requests that completed while asleep
 retriever.finish()
 except KeyboardInterrupt:
 retriever.stop()
asked Jul 17, 2014 at 22:40
\$\endgroup\$
3
  • \$\begingroup\$ Sean, do you not find the requests-futures a bit slow? For example I can't get an improvement over mulitprocessing.Pool() and some people have suggested tornado instead. \$\endgroup\$ Commented Mar 2, 2016 at 20:43
  • \$\begingroup\$ I have never needed blazing speed. \$\endgroup\$ Commented Mar 2, 2016 at 20:46
  • \$\begingroup\$ The current question title, which states your concerns about the code, is too general to be useful here. Please edit to the site standard, which is for the title to simply state the task accomplished by the code. Please see How to get the best value out of Code Review: Asking Questions for guidance on writing good question titles. \$\endgroup\$ Commented Sep 6, 2018 at 15:26

1 Answer 1

1
\$\begingroup\$

Nice code, clearly written.

I understand the rate limiting requirement. Having the drain() call within the loop doesn't seem like the caller's responsibility, better to let the BG callback handle it, or defer until finish() as written, which does make sense. Each url fetch could take more or less than the delay time. So this seems to be a bug / wart still lurking within the code.

answered Sep 11, 2017 at 0:55
\$\endgroup\$
1
  • \$\begingroup\$ This was extracted from something I wrote while working on a REST client where the server sent back "not ready yet" responses. There was behavior that lead me to the drain() call but 3 years later I no longer remember what it was. There was definitely a wart but the API was dealing with has since been radically overhauled so I cannot dredge up the issue any longer. Thanks for looking at my post. \$\endgroup\$ Commented Sep 11, 2017 at 5:25

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.