
The code below downloads a file in multiple chunks when split is set to a number greater than 1; the number of concurrent splits is capped by the maxSplits semaphore. If split is 0 or 1, no extra threads are spawned beyond the ThreadedFetch thread itself, and the file is downloaded directly there.

Please review it for refactoring opportunities, efficiency, and performance. My main doubt is whether the thread.join() calls can slow the system down for big files, since dataDict holds all the chunks in memory until the joins complete.

In terms of style guide, I am not strictly following PEP 8: I use tabs instead of 4 spaces and camelCase names instead of snake_case variables like this_is_variable. Class names start with capital letters.

import argparse
import logging
import os
import Queue
import signal
import sys
import threading
import time

import requests

import utils as _fdUtils

DESKTOP_PATH = os.path.expanduser("~/Desktop")
appName = 'FileDownloader'
logFile = os.path.join(DESKTOP_PATH, '%s.log' % appName)
_log = _fdUtils.fdLogger(appName, logFile, logging.DEBUG, logging.DEBUG, console_level=logging.DEBUG)
queue = Queue.Queue()
STOP_REQUEST = threading.Event()
maxSplits = threading.BoundedSemaphore(3)
threadLimiter = threading.BoundedSemaphore(5)
lock = threading.Lock()
pulledSize = 0
dataDict = {}

def _grabAndWriteToDisk(threadId, split, url, saveTo, first=None, queue=None, mode='wb', irange=None):
    """ Download the file as a single chunk.

    Args:
        threadId (int): id of the calling thread
        split (int): number of splits requested
        url (str): url of file to download
        saveTo (str): path where to save file
        first (int): starting byte of the range
        queue (Queue.Queue): queue object to set status for file download
        mode (str): mode of file to be downloaded
        irange (str): range of bytes to download
    """
    fileName = url.split('/')[-1]
    filePath = os.path.join(saveTo, fileName)
    fileSize = int(_fdUtils.getUrlSizeInBytes(url))
    downloadedFileSize = 0 if not first else first
    block_sz = 8192
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    for fileBuffer in resp.iter_content(block_sz):
        if not fileBuffer:
            break
        with open(filePath, mode) as fd:
            downloadedFileSize += len(fileBuffer)
            fd.write(fileBuffer)
            mode = 'ab'  # append in binary mode after the first write
            status = r"%10d [%3.2f%%]" % (downloadedFileSize, downloadedFileSize * 100. / fileSize)
            status = status + chr(8) * (len(status) + 1)
            sys.stdout.write('%s\r' % status)
            time.sleep(.01)
            sys.stdout.flush()
        if downloadedFileSize == fileSize:
            STOP_REQUEST.set()
            queue.task_done()
            _log.info("Download Completed %s%% for file %s, saved to %s",
                      downloadedFileSize * 100. / fileSize, fileName, saveTo)

def _downloadChunk(url, idx, irange, pulledSize, fileName, sizeInBytes):
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    # iter_content expects an int, not the string returned by split()
    chunk_size = int(irange.split("-")[-1])
    for chunk in resp.iter_content(chunk_size):
        status = r"%10d [%3.2f%%]" % (pulledSize, pulledSize * 100. / chunk_size)
        status = status + chr(8) * (len(status) + 1)
        sys.stdout.write('%s\r' % status)
        sys.stdout.flush()
        pulledSize += len(chunk)
        # append, so a second chunk in the same range does not
        # overwrite the first one
        dataDict[idx] = dataDict.get(idx, '') + chunk
        time.sleep(.03)
    if pulledSize == sizeInBytes:
        _log.info("%s downloaded %3.0f%%", fileName, pulledSize * 100. / sizeInBytes)

class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, queue):
        super(ThreadedFetch, self).__init__()
        self.queue = queue
        self.lock = threading.Lock()

    def run(self):
        threadLimiter.acquire()
        try:
            items = self.queue.get()
            url = items[0]
            saveTo = DESKTOP_PATH if not items[1] else items[1]
            split = items[-1]
            # grab split chunks in separate threads.
            if split > 1:
                maxSplits.acquire()
                try:
                    fileName = url.split('/')[-1]
                    sizeInBytes = int(_fdUtils.getUrlSizeInBytes(url))
                    byteRanges = _fdUtils.getRange(sizeInBytes, split)
                    filePath = os.path.join(saveTo, fileName)
                    downloaders = [
                        threading.Thread(
                            target=_downloadChunk,
                            args=(url, idx, irange, int(irange.split('-')[0]), fileName, sizeInBytes),
                        )
                        for idx, irange in enumerate(byteRanges)
                    ]
                    # start the threads and let them run in parallel
                    for th in downloaders:
                        th.start()
                    # wait for all the chunk threads to finish, which
                    # guarantees that dataDict is complete
                    for th in downloaders:
                        th.join()
                    downloadedSize = 0
                    with open(filePath, 'wb') as fh:
                        for _idx, chunk in sorted(dataDict.iteritems()):
                            downloadedSize += len(chunk)
                            status = r"%10d [%3.2f%%]" % (downloadedSize, downloadedSize * 100. / sizeInBytes)
                            status = status + chr(8) * (len(status) + 1)
                            fh.write(chunk)
                            sys.stdout.write('%s\r' % status)
                            time.sleep(.04)
                            sys.stdout.flush()
                    if downloadedSize == sizeInBytes:
                        _log.info("%s, saved to %s", fileName, saveTo)
                    self.queue.task_done()
                finally:
                    maxSplits.release()
            else:
                while not STOP_REQUEST.isSet():
                    self.setName("primary_%s" % url.split('/')[-1])
                    # the whole file is downloaded as a single chunk, so
                    # there is no need to start a new thread; download
                    # directly here.
                    _grabAndWriteToDisk(None, split, url, saveTo, 0, self.queue)
        finally:
            threadLimiter.release()

def main(appName):
    args = _fdUtils.getParser()
    urls_saveTo = {}
    # spawn a pool of threads and pass them the queue instance;
    # each url will be downloaded concurrently
    for i in xrange(len(args.urls)):
        t = ThreadedFetch(queue)
        t.daemon = True
        t.start()
    split = 3
    try:
        for url in args.urls:
            urls_saveTo[url] = args.saveTo
        # populate the queue with data
        for url, saveTo in urls_saveTo.iteritems():
            queue.put((url, saveTo, split))
        # wait on the queue until everything has been processed
        queue.join()
        _log.info('Finished all downloads.')
    except (KeyboardInterrupt, SystemExit):
        _log.critical('Received keyboard interrupt, quitting threads.')
asked Jul 20, 2014 at 8:39

1 Answer

The join seems right to me.

Tip 1: Instead of keeping the file in memory in dataDict, you can write each chunk directly to the file at its own offset:

f = open('workfile', 'r+')
f.seek(5) # Go to the 6th byte in the file
f.write('0123456789abcdef')
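
Applied to this downloader, each worker thread can stream its byte range straight into the target file at the range's starting offset, so chunks never accumulate in dataDict. Here is a minimal sketch of that idea in the question's Python 2 style; writeChunkAt is a hypothetical helper, and url, filePath, sizeInBytes, and byteRanges are assumed to come from the surrounding run method:

import threading

import requests

def writeChunkAt(url, irange, filePath):
    # illustrative helper: stream one byte range and write it at its
    # own offset, so no chunk is ever held whole in memory
    first = int(irange.split('-')[0])
    resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
    with open(filePath, 'r+b') as fh:
        fh.seek(first)
        for chunk in resp.iter_content(8192):
            fh.write(chunk)

# pre-allocate the file so every 'r+b' handle can seek anywhere in it
with open(filePath, 'wb') as fh:
    fh.truncate(sizeInBytes)

# the byte ranges are disjoint, so each thread can write through its
# own file handle without taking a lock
downloaders = [threading.Thread(target=writeChunkAt, args=(url, irange, filePath))
               for irange in byteRanges]
for th in downloaders:
    th.start()
for th in downloaders:
    th.join()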

Tip 2: In _grabAndWriteToDisk, take a close look at the following lines.

for fileBuffer in resp.iter_content(block_sz):
    if not fileBuffer:
        break
    with open(filePath, mode) as fd:
        downloadedFileSize += len(fileBuffer)

You are reopening the file for every chunk, which adds needless open/close overhead to each iteration.
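
A minimal sketch of the loop reworked to open the file once, using the same names as the question's code; with the handle kept open, mode no longer needs to flip to append after the first write:

resp = requests.get(url, headers={'Range': 'bytes=%s' % irange}, stream=True)
with open(filePath, mode) as fd:
    for fileBuffer in resp.iter_content(block_sz):
        if not fileBuffer:
            break
        downloadedFileSize += len(fileBuffer)
        fd.write(fileBuffer)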

answered Jun 2, 2015 at 7:09
