I am looking for a code review of this multipart or single-file chunk downloader that uses threading and queues.
downloader.py
import argparse
import logging
import Queue
import urllib2
import os
import utils as _fdUtils
import signal
import sys
import time
import threading
DESKTOP_PATH = os.path.expanduser("~/Desktop")
appName = 'FileDownloader'
logFile = os.path.join(DESKTOP_PATH, '%s.log' % appName)
_log = _fdUtils.fdLogger(appName, logFile, logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)
queue = Queue.Queue()
out_queue = Queue.Queue()
STOP_REQUEST = threading.Event()
class SplitBufferThread(threading.Thread):
    """ Splits the buffer across any number of threads,
        thereby concurrently downloading through
        any number of threads.
    """
    def __init__(self, url, numSplits, queue, out_queue):
        super(SplitBufferThread, self).__init__()
        self.__url = url
        self.__splits = numSplits
        self.queue = queue
        self.outQue = out_queue
        self.__fileName = url.split('/')[-1]
        self.__path = DESKTOP_PATH

    def run(self):
        print "Inside SplitBufferThread: %s\n URL: %s" % (self.getName(), self.__url)
        sizeInBytes = int(_fdUtils.getUrlSizeInBytes(self.__url))
        byteRanges = _fdUtils.getRange(sizeInBytes, self.__splits)
        mode = 'wb'
        for _range in byteRanges:
            first = int(_range.split('-')[0])
            self.outQue.put((self.__url, self.__path, first, self.queue, mode, _range))
            mode = 'a'
class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            # grabs host from queue
            chunk = self.out_queue.get()
            if self._grabAndWriteToDisk(*chunk):
                # signals to queue job is done
                self.out_queue.task_done()

    def _grabAndWriteToDisk(self, url, saveTo, first=None, queue=None, mode='wb', irange=None):
        fileName = url.split('/')[-1]
        filePath = os.path.join(saveTo, fileName)
        file_size = int(_fdUtils.getUrlSizeInBytes(url))
        req = urllib2.Request(url)
        if irange:
            req.headers['Range'] = 'bytes=%s' % irange
        urlFh = urllib2.urlopen(req)
        file_size_dl = 0 if not first else first

        with open(filePath, mode) as fh:
            block_sz = 8192
            while not STOP_REQUEST.isSet():
                fileBuffer = urlFh.read(block_sz)
                if not fileBuffer:
                    break
                file_size_dl += len(fileBuffer)
                fh.write(fileBuffer)
                status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
                status = status + chr(8) * (len(status) + 1)
                sys.stdout.write('%s\r' % status)
                time.sleep(.05)
                sys.stdout.flush()
                if file_size_dl == file_size:
                    STOP_REQUEST.set()

        if queue:
            queue.task_done()
        _log.info("Download Completed %s%% for file %s, saved to %s",
                  file_size_dl * 100. / file_size, fileName, saveTo)
        return True
class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, queue, out_queue):
        super(ThreadedFetch, self).__init__()
        self.queue = queue
        self.lock = threading.Lock()
        self.outQueue = out_queue

    def run(self):
        items = self.queue.get()
        url = items[0]
        saveTo = DESKTOP_PATH if not items[1] else items[1]
        split = items[-1]
        # grab split chunks in a separate thread.
        if split > 1:
            bufferThreads = []
            print url
            splitBuffer = SplitBufferThread(url, split, self.queue, self.outQueue)
            splitBuffer.start()
        else:
            while not STOP_REQUEST.isSet():
                self.setName("primary_%s" % url.split('/')[-1])
                # if downloading the whole file as a single chunk there is no need
                # to start a new thread, so directly download here.
                if self.outQueue.put((url, saveTo, 0, self.queue)):
                    self.outQueue.task_done()
def main(appName, flag='with'):
    args = _fdUtils.getParser()
    urls_saveTo = {}
    if flag == 'with':
        _fdUtils.Watcher()
    elif flag != 'without':
        _log.info('unrecognized flag: %s', flag)
        sys.exit()

    # spawn a pool of threads and pass them the queue instance;
    # each url will be downloaded concurrently
    for i in xrange(len(args.urls)):
        t = ThreadedFetch(queue, out_queue)
        t.daemon = True
        t.start()

    split = 1
    try:
        for url in args.urls:
            # TODO: put split as value of url as tuple with saveTo
            urls_saveTo[url] = args.saveTo
            # urls_saveTo = {urls[0]: args.saveTo, urls[1]: args.saveTo, urls[2]: args.saveTo}

        # populate queue with data
        for url, saveTo in urls_saveTo.iteritems():
            queue.put((url, saveTo, split))

        for i in range(split):
            dt = DatamineThread(out_queue)
            dt.setDaemon(True)
            dt.start()

        # wait on the queues until everything has been processed
        queue.join()
        out_queue.join()
        print '*** Done'
    except (KeyboardInterrupt, SystemExit):
        _log.critical('! Received keyboard interrupt, quitting threads.')
utils.py
import argparse
import logging
import os
import requests
import signal
import sys
def getUrlSizeInBytes(url):
    return requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)

def getRange(sizeInBytes, numsplits):
    """ Splits the range equally based on file size
        and number of splits.
    """
    if numsplits <= 1:
        return ["0-%s" % sizeInBytes]
    lst = []
    for i in range(numsplits):
        if i == 0:
            lst.append('%s-%s' % (i, int(round(1 + i * sizeInBytes / (numsplits * 1.0) + sizeInBytes / (numsplits * 1.0) - 1, 0))))
        else:
            lst.append('%s-%s' % (int(round(1 + i * sizeInBytes / (numsplits * 1.0), 0)), int(round(1 + i * sizeInBytes / (numsplits * 1.0) + sizeInBytes / (numsplits * 1.0) - 1, 0))))
    return lst
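# A quick worked example of what getRange produces (added for illustration,
# not part of the original post):
#   getRange(100, 4) -> ['0-25', '26-50', '51-75', '76-100']
#   getRange(100, 1) -> ['0-100']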
def getParser():
    parser = argparse.ArgumentParser(prog='FileDownloader',
                                     description='Utility to download files from internet')
    parser.add_argument('-v', '--verbose', default=logging.DEBUG,
                        help='by default its on, pass None or False to not spit in shell')
    parser.add_argument('-st', '--saveTo', action=FullPaths,
                        help='location where you want files to download to')
    parser.add_argument('-urls', nargs='*',
                        help='urls of files you want to download.')
    return parser.parse_args()
def sizeof(bytes):
    """ Takes the size of file or folder in bytes and
        returns size formatted in kb, MB, GB, TB or PB.

        Args:
            bytes(int): size of the file in bytes

        Return:
            (str): containing size with formatting.
    """
    alternative = [
        (1024 ** 5, ' PB'),
        (1024 ** 4, ' TB'),
        (1024 ** 3, ' GB'),
        (1024 ** 2, ' MB'),
        (1024 ** 1, ' KB'),
        (1024 ** 0, (' byte', ' bytes')),
    ]
    for factor, suffix in alternative:
        if bytes >= factor:
            break
    amount = int(bytes / factor)
    if isinstance(suffix, tuple):
        singular, multiple = suffix
        if amount == 1:
            suffix = singular
        else:
            suffix = multiple
    return "%s %s" % (str(amount), suffix)
class FullPaths(argparse.Action):
    """ Expand user- and relative-paths
    """
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, os.path.abspath(os.path.expanduser(values)))
def fdLogger(appName, logFile, fileDebugLevel, file_level, console_level=None):
    logger = logging.getLogger(appName)
    # By default, logs all messages
    logger.setLevel(logging.DEBUG)

    if console_level != None:
        # StreamHandler logs to console
        ch = logging.StreamHandler()
        ch.setLevel(fileDebugLevel)
        chFormat = logging.Formatter('%(levelname)s - %(message)s')
        ch.setFormatter(chFormat)
        logger.addHandler(ch)

    fh = logging.FileHandler(logFile)
    fh.setLevel(file_level)
    fhFormat = logging.Formatter('%(asctime)s - (%(threadName)-10s) - %(levelname)s: %(message)s')
    fh.setFormatter(fhFormat)
    logger.addHandler(fh)
    return logger
1 Answer
I'll try it:
You are requesting byte ranges in parallel and writing them all to disk: where are you enforcing the order of the chunks? I think you may get an unordered file as a result.
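For instance, one possible way to keep the chunks in order (just a sketch; the helper name and the pre-allocation step are mine, not from the question) is to write each chunk at its own byte offset instead of relying on the 'wb'/'a' modes:

# Sketch only: create/truncate the target file once, up front, in the main
# thread, e.g. open(filePath, 'wb').close(); workers then open it in 'r+b'
# and seek to their range's first byte, so arrival order no longer matters.
def writeChunkAt(filePath, firstByte, data):
    with open(filePath, 'r+b') as fh:
        fh.seek(firstByte)
        fh.write(data)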
I think you lose time instead of saving it by using threads for ThreadedFetch and SplitBufferThread, as they do nothing more than set variables and push items into queues. You win nothing by setting variables in parallel (remember the GIL, and the fact that setting variables costs virtually no time while creating a thread costs some); what is worth running concurrently is what actually costs time, like waiting for the network (or doing huge computation in a context where we're not blocked by the GIL).

Your code is not PEP 8 compliant.
I don't think it's useful to download chunks of a file in parallel; either way you're limited by your bandwidth.
I'm OK with downloading files in parallel from multiple servers, just in case a server is slow, but I'm not OK with using as many threads as there are URLs to download. If you want to download 500 files you'll fork 500 threads, spending a lot of time in context switches and sharing 1/500 of your bandwidth for each file; all you'll get is timeouts. Better to use something like 2 or 3 threads; remember, you're limited by your bandwidth.
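A rough sketch of both of the previous points together, reusing the existing DatamineThread and _fdUtils helpers (a fragment meant to live in main(), where split, args and the queues already exist; the worker count of 3 and the direct queue population in the main thread are illustrative choices, not code from the question):

# Sketch only: fill out_queue from the main thread (no ThreadedFetch /
# SplitBufferThread threads) and let a small fixed pool of workers drain it.
# The chunk-ordering concern from the first point still applies as-is.
NUM_WORKERS = 3  # illustrative; the point is "a few", not one per URL

for url in args.urls:
    sizeInBytes = int(_fdUtils.getUrlSizeInBytes(url))
    mode = 'wb'
    for byteRange in _fdUtils.getRange(sizeInBytes, split):
        first = int(byteRange.split('-')[0])
        out_queue.put((url, args.saveTo or DESKTOP_PATH, first, None, mode, byteRange))
        mode = 'a'

for _ in range(NUM_WORKERS):
    worker = DatamineThread(out_queue)
    worker.setDaemon(True)
    worker.start()

out_queue.join()  # wait until every queued chunk has been written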
Depending on your libc, your program will NOT be thread-safe at the name-resolution level, because it uses getaddrinfo, which is known to be thread-safe on Linux but may NOT be when it uses an AF_NETLINK socket to query which interface the DNS query must be sent on (the bug has been fixed, but you may not be up to date). urllib2 will however use a lock on other OSes known to have a non-thread-safe getaddrinfo; see ./python2.6-2.6.8/Modules/socketmodule.c:180:

/* On systems on which getaddrinfo() is believed to not be thread-safe,
   (this includes the getaddrinfo emulation) protect access with a lock. */
#if defined(WITH_THREAD) && (defined(__APPLE__) || \
    (defined(__FreeBSD__) && __FreeBSD_version+0 < 503000) || \
    defined(__OpenBSD__) || defined(__NetBSD__) || \
    defined(__VMS) || !defined(HAVE_GETADDRINFO))
#define USE_GETADDRINFO_LOCK
#endif
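A rough sketch of the DNS-caching workaround mentioned in the comments below (the monkey-patching approach and the names here are my own illustration, not from the original post):

# Sketch only: cache getaddrinfo() results so each hostname is resolved at
# most once. Install this before spawning any worker threads; if the very
# first lookups may happen concurrently, guard the dict with a lock.
import socket

_origGetaddrinfo = socket.getaddrinfo
_addrinfoCache = {}

def _cachedGetaddrinfo(*args):
    if args not in _addrinfoCache:
        _addrinfoCache[args] = _origGetaddrinfo(*args)
    return _addrinfoCache[args]

socket.getaddrinfo = _cachedGetaddrinfo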
You should not mix logger and print usage; stick to the logger, and use a debug level when needed.
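For instance (a sketch only, reusing the fdLogger helper from utils.py; the file name and messages are illustrative):

import logging
import utils as _fdUtils

log = _fdUtils.fdLogger('FileDownloader', 'FileDownloader.log',
                        logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)
# progress chatter that currently goes through print / sys.stdout.write:
log.debug("downloaded %d of %d bytes", 8192, 1048576)
# milestones, as the code already does on completion:
log.info("Download Completed for file %s", "example.zip")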
- yea, I was trying to implement a ThreadPool as in this example: ibm.com/developerworks/aix/library/au-threadingpython. However, instead of using urllib2 should I go for using requests.get(url).content? Regarding the maximum number of threads, I have used threading.BoundedSemaphore, if that's OK? – Ciasto piekarz, Jul 12, 2014 at 18:32
- @san: I prefer requests over urllib, big time, so yes, I think you should migrate. You do not need a semaphore: you're not locking concurrent access to a resource. You just need to spawn only the required number of threads, like 3, and that's all. Also I'd drop the threads ThreadedFetch and SplitBufferThread (keeping the classes, but dropping their threads); you'll also reduce complexity and drop one now-unused Queue. – Julien Palard, Jul 12, 2014 at 20:08
- Hi @Julien Palard, do you think this is a usable workaround for the problem you mentioned with getaddrinfo: stackoverflow.com/a/6319043/1622400 – Ciasto piekarz, Jul 20, 2014 at 8:52
- @san I don't think this fixes the bug. The only workaround I used is reducing DNS queries by caching the results; it worked like a charm for me, as I only hit a few servers. – Julien Palard, Jul 20, 2014 at 8:55
- How do browsers like Firefox, Safari, IE and Chrome deal with this problem? – Ciasto piekarz, Jul 26, 2014 at 7:02