
I am working on the Python copy utility below, which should work on Windows and Linux. I am looking for a more efficient approach that could optimize my I/O, as my target location is network dependent as well. I have included a measurement of the utility's execution time in the code.

#!/usr/bin/python
"""
Pythonic implementation of multi-target copy (Parallel Copy).
"""
import Queue
import threading
import time
import os, os.path
import sys
import shutil, hashlib

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        process_data(self.name, self.queue, self.idFlag)
        if debugFlag:
            print "**** Ending %s" % self.name

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    sha512_hash = hashlib.sha512()
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            statinfo = os.stat(src_filepath)
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            nb_blocks = (statinfo.st_size / block_size) + 1
            cnt_blocks = 0
            l = len(src_filepath.split('\\'))
            target_file_path = os.path.join(target_filename, src_filepath.split('\\')[l - 1])
            while True:
                block = sf.read(block_size)
                sha512_hash.update(block) # Todo a blockwise copy
                if not block: break
                cnt_blocks = cnt_blocks + 1
                with open(target_filename, "a") as tf:
                    tf.write(block)
                    tf.close()
            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
            sf.close()
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

def delete_folder(target_path):
    """
    Deletes a folder, if it already exists
    @param target_path: Relative path of the directory to delete
    """
    if (os.path.exists(target_path) or os.path.isdir(target_path)):
        print "Directory %s already exists.. deleting..." % target_path
        try:
            shutil.rmtree(target_path)
        except OSError:
            os.remove(target_path)

def process_data(threadName, queue, idFlag):
    while not exitFlag:
        if not workQueue.empty():
            (sfile, dfile) = queue.get()
            copy_files_concurrantly(sfile, dfile, idFlag)
        time.sleep(0.5)

def queue_mt(argv):
    """
    Implementation to do multi-target copy (recursive) of directories
    @param argv: Arguments passed at command-line
    """
    desc = "Recursively copies the files to destination directories."
    syntax = "\nUsage:\n c4.py cp -L -R <src-dir> <target-dir>\n c4.py cp -L -R <src-dir> -t <target-dir1> <target-dir2>..."
    options = "\n\n cp\t\t\tCopy operation to perform.\n -L\t\t\tDisplay running logs.(Optional)\n -R\t\t\tRecursively copy source files to target.\n <src-dir>\t\tSpecify source directory to copy.\n <target-dir>\tSpecify target directory to copy."
    win = "\n\n Windows: c4.py cp -R d:\src-dir\*.* e:\dst-dir (OR) c4.py cp -R d:\src-dir\*.* -t d:\dst-dir1 e:\dst-dir2"
    linux = "\n Linux: c4.py cp -R /src-dir/*.* /dst-dir (OR) c4.py cp -R /src-dir/*.* -t /dst-dir1 /dst-dir2"
    cmd_usage = desc + syntax + options + win + linux
    # Displays the command-usage incase of incorrect arguments specified
    if len(argv) < 4:
        print cmd_usage
        sys.exit(2)
    global threadID, workQueue, debugFlag
    threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0
    stime = time.time()
    # Perform single source to single target directory copy
    if ((len(argv) == 4) and (("-R" in argv[1]) or ("-r" in argv[1]))) or ((len(argv) == 5) and (("-R" in argv[2]) or ("-r" in argv[2]))):
        if (len(argv) == 4):
            src_path, dest_path = argv[2], argv[3]
        if (len(argv) == 5) and ("-L" in argv[1]):
            debugFlag = True
            src_path, dest_path = argv[3], argv[4]
        if src_path.endswith('/*') or src_path.endswith('\*'):
            src_path = src_path[:-2]
        if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
            src_path = src_path[:-4]
        # Computing the file-count recursively traversing the directory
        # Excludes the count of number of directories
        fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
        print "File)s) count in source directory: %d" % fcnt
        cnt = fcnt * 1
        workQueue = Queue.Queue(cnt)
        # Fill the Queue
        for root, subfolder, filenames in os.walk(src_path):
            newDir = os.path.join(dest_path, root[1 + len(src_path):])
            if not os.path.exists(newDir):
                os.makedirs(newDir)
            else:
                delete_folder(newDir)
            for filename in filenames:
                sfpath = str(os.path.join(root, filename))
                dfpath = str(os.path.join(newDir, filename))
                workQueue.put((sfpath, dfpath))
                if debugFlag:
                    print "***** Added to Q... %s | %s" % (sfpath, dfpath)
    elif ((len(argv) > 4) and (("-t" in argv[3]) or ("-t" in argv[4]))):
        if ("-L" in argv[1]):
            debugFlag = True
            src_path, st = argv[3], 5
        else:
            src_path, st = argv[2], 4
        if src_path.endswith('/*') or src_path.endswith('\*'):
            src_path = src_path[:-2]
        if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
            src_path = src_path[:-4]
        # Computing the file-count recursively traversing the directory
        # Excludes the count of number of directories
        fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
        if ("-L" in argv[1]):
            dst = (len(argv) - 5)
        else:
            dst = (len(argv) - 4)
        print "File(s) count in source directory:%d | Destination directories count:%s" % (fcnt, dst)
        cnt = fcnt * dst
        workQueue = Queue.Queue(cnt)
        # Fill the Queue
        for root, subfolder, filenames in os.walk(src_path):
            for i in range(st, (len(argv))):
                dest_path = argv[i]
                newDir = os.path.join(dest_path, root[1 + len(src_path):])
                if not os.path.exists(newDir):
                    os.makedirs(newDir)
                else:
                    delete_folder(newDir)
                for filename in filenames:
                    sfpath = str(os.path.join(root, filename))
                    dfpath = str(os.path.join(newDir, filename))
                    workQueue.put((sfpath, dfpath))
                    if debugFlag:
                        print "***** Added to Q... %s | %s" % (sfpath, dfpath)
    print "\nGenerating c4id's for source directory files only...\n"
    # Create new threads
    max_threads = 100
    if cnt > max_threads:
        cnt = max_threads
    for i in range(1, cnt+1):
        s = 'Thread'+str(i)
        threadList.append(s)
    if debugFlag:
        print "***** ThreadsList: %s" % str(threadList)
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue, idFlag=True)
        thread.start()
        threads.append(thread)
        threadID += 1
    # Wait for queue to empty
    while not workQueue.empty():
        pass
    # Notify threads its time to exit
    global exitFlag
    exitFlag = 1
    # Wait for all threads to complete
    for t in threads:
        t.join()
    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

if __name__ == '__main__':
    queue_mt(sys.argv[1:])
asked Apr 12, 2016 at 20:09

2 Answers


Let me begin with copy_files_concurrantly.

The sha512_hash variable is not used, except for being updated. It can go. The same goes for nb_blocks, cnt_blocks, statinfo, target_file_path, and l.

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            while True:
                block = sf.read(block_size)
                if not block: break
                with open(target_filename, "a") as tf:
                    tf.write(block)
                    tf.close()
            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
            sf.close()
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

This way, the code is just a bit more readable, because you're left with only the functional part of the code.

Another thing that's worrisome is using with blocks and manually closing the file. A with block closes the file when the code leaves the with block. Let's get rid of the explicit close.

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf:
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            while True:
                block = sf.read(block_size)
                if not block: break
                with open(target_filename, "a") as tf:
                    tf.write(block)
            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

Another thing to note is that the target file is getting opened and closed for every block. We can do better!

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            while True:
                block = sf.read(block_size)
                if not block: break
                tf.write(block)
            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

One last improvement: use partial and iter:

while True:
    block = sf.read(block_size)
    if not block:
        break
    ...

can be written as

from functools import partial

for block in iter(partial(sf.read, block_size), ''):
    ...

Leaving us with

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            for block in iter(partial(sf.read, block_size), ''):
                tf.write(block)
            print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

Another (very!!!) important suggestion would be to move the print statement up a bit higher.

from functools import partial

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    try:
        with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
            block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
            for block in iter(partial(sf.read, block_size), ''):
                tf.write(block)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

Especially if the file is large and needs to be transferred over the network, it's good to know which file it is working on.

Now, I assume the "a" (append) mode is a bug, as the target file never gets truncated. Assuming we can replace it with "w" instead, we could write:

import shutil

def copy_files_concurrantly(src_filename, target_filename, idFlag):
    """
    """
    src_filepath = os.path.join(os.getcwd(), src_filename)
    print "\nCopying %s (to) %s" % (src_filepath, target_filename)
    try:
        shutil.copy(src_filepath, target_filename)
    except IOError:
        print "Error: cant find or read '%s' file" % (src_filename)

And be done with it.

There's probably a lot that can be cleaned up further in the rest of the code, but I just wanted to give a short overview of how to fix copy_files_concurrantly.

answered Apr 12, 2016 at 20:48
Comment: While you're at it, it should be spelled _concurrently too. (Apr 12, 2016 at 22:11)

One thing I don't quite understand is: Why is process_data outside of your myThread class? I mean, you give it the whole set of attributes as parameters; and there is only one call to it: inside of the run method.

If you did that to use the global exitFlag from queue_mt, well, it's a bad idea. Globals are considered bad practice and there are far better alternatives to synchronize information between threads. You can use events, for instance:

def queue_mt(argv):
    ...
    # Create new threads
    cnt = min(100, cnt)  ## No need to do a compare-and-set explicitly here
    threadList = ['Thread{}'.format(i) for i in range(1, cnt+1)]  ## List comprehension is better than loop + append
    if debugFlag:
        print "***** ThreadsList:", threadList
    exit = threading.Event()
    for threadID, tName in enumerate(threadList):  ## Let python do the indexing for you
        thread = myThread(threadID, tName, workQueue, exit, idFlag=True)
        thread.start()
        threads.append(thread)
    # Wait for queue to empty
    while not workQueue.empty():
        pass
    # Notify threads its time to exit
    exit.set()
    # Wait for all threads to complete
    for t in threads:
        t.join()
    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

Some small tips here before going on:

  • cnt = max_thread if cnt > max_thread else cnt is clearer (to me) than what you wrote. cnt = min(cnt, max_thread) is better. You may want to define MAX_THREAD = 100 as a constant at the top of your file, though.
  • threadList = [] + loop + threadList.append(...) is bad practice: use a list comprehension instead.
  • string concatenation is a poor way of building strings; use format and you'll have more control over the formatting.
  • you almost never need to maintain an explicit index in Python. Use enumerate while iterating over a collection if you need one. (A tiny sketch of these last two tips follows this list.)
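
For instance, here is a tiny before-and-after sketch of those last two tips (the names list and the loop body are made up purely for illustration):

names = ['alpha', 'beta', 'gamma']

# Manual index + string concatenation (the pattern to avoid)
i = 0
for name in names:
    print 'Thread' + str(i) + ': ' + name
    i += 1

# enumerate + str.format (the preferred pattern)
for i, name in enumerate(names):
    print 'Thread{}: {}'.format(i, name)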

So we added an Event as the fourth parameter of myThread; let's handle that. And in the meantime, let's give a meaningful name to that class:

class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, event, idFlag):
        super(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.stop = event
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        while not self.stop.is_set():
            if not workQueue.empty():
                (sfile, dfile) = self.queue.get()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
            time.sleep(0.5)
        if debugFlag:
            print "**** Ending %s" % self.name

This got me thinking... Where does this global workQueue come from? Oh there is a global workQueue in queue_mt. And you build your thread passing workQueue as the third parameter. So basically, workQueue is self.queue; why not use that in the first place?

Also, looking at its usage, you populate the queue before creating the threads. They are merely consumers and should stop as soon as the queue is empty, but you do not need any synchronization mechanism for that: you can know when the queue is empty directly in the thread. Use get_nowait and handle the exception to terminate gracefully:

class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        super(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        try:
            while True:
                (sfile, dfile) = self.queue.get_nowait()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
        except Queue.Empty:
            # Continue gracefully when every file has been handled
            pass
        if debugFlag:
            print "**** Ending %s" % self.name

So, it turns out we didn't need that Event after all. I also removed the time.sleep(0.5); I don't understand why you put that there in the first place if performance matters.


Now let's get back to queue_mt for the usage. Since each thread now stops itself when the queue is empty, there is no need to monitor that in this function. We can also improve the way threads are created and reduce the amount of useless data stored.

MAX_THREADS = 100

def queue_mt(argv):
    ...
    # Create new threads
    threadList = [
        ThreadedFileCopy(i, 'Thread{}'.format(i+1), workQueue, True)
        for i in range(min(MAX_THREADS, cnt))
    ]
    if debugFlag:
        print "***** ThreadsList:", threadList
    for thread in threadList:
        thread.start()
    # Wait for all threads to complete
    for thread in threadList:
        thread.join()
    if debugFlag:
        print "\nUtility Exec time: %s sec" %(time.time() - stime)

This way you don't need to store both the names and the threads, since the latter contains the former. It also lets you build the list of threads faster. Oh, and while I'm at it, stop declaring your variables way earlier than where you will need them (such as threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0). Python is a dynamic language; if you happen to need a variable at some point, just perform an assignment at that point.

However, there is one problem now: the call print "***** ThreadsList:", threadList gives an unhelpful representation of the threads in the list. When building the string representation of the list, Python had no difficulty converting a list of strings to a single string. But now it has to convert a list of threads to a string, and it doesn't really know how to convert a thread to a string. Let's help it a little by adding the __repr__ method to our thread class:

class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag):
        super(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag

    def __repr__(self):
        return self.name

    def run(self):
        if debugFlag:
            print "**** Starting %s" % self.name
        try:
            while True:
                (sfile, dfile) = self.queue.get_nowait()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
        except Queue.Empty:
            # Continue gracefully when every file has been handled
            pass
        if debugFlag:
            print "**** Ending %s" % self.name

And there it is, the same output as before.


One last note: we removed the need for the global workQueue, and we removed the need for threadID altogether; try to avoid using a global for debugFlag too. You can pass it as a parameter to the thread and call it a day.
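
A minimal sketch of that last point, passing the flag in at construction time instead of reading a global (the debug parameter name is mine, not something from the original code):

import threading
import Queue

class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag, debug=False):
        super(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag
        self.debug = debug  # passed in once, no global debugFlag needed

    def run(self):
        if self.debug:
            print "**** Starting %s" % self.name
        try:
            while True:
                (sfile, dfile) = self.queue.get_nowait()
                copy_files_concurrantly(sfile, dfile, self.idFlag)
        except Queue.Empty:
            # Queue is drained, terminate gracefully
            pass
        if self.debug:
            print "**** Ending %s" % self.name

The caller would then build each worker as ThreadedFileCopy(i, 'Thread{}'.format(i+1), workQueue, True, debug=debugFlag).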

I could also have told you about how managing command-line parsing and performing the logic in the same function is pretty bad: you should parse the command line beforehand and then call the function performing the logic with the right set of parameters. I could also have told you about argparse or getopt to generate the usage message and parse command-line options... but I'll leave that to someone else.
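
For a rough idea of what that could look like, here is a minimal argparse sketch; the option names loosely mirror the original -L/-R/-t flags, but this is only an illustration, not a drop-in replacement for the existing argument handling:

import argparse

def parse_args(argv):
    # Illustrative only: loosely mirrors the original flags
    parser = argparse.ArgumentParser(
        description="Recursively copies the files to destination directories.")
    parser.add_argument("src", help="source directory to copy")
    parser.add_argument("-t", "--targets", nargs="+", required=True,
                        help="one or more target directories")
    parser.add_argument("-L", "--logs", action="store_true",
                        help="display running logs")
    parser.add_argument("-R", "--recursive", action="store_true",
                        help="recursively copy source files to target")
    return parser.parse_args(argv)

args = parse_args(['/src-dir', '-R', '-t', '/dst-dir1', '/dst-dir2'])
print args.src, args.targets, args.logs, args.recursive

argparse then generates the usage message for you (try passing -h), which would replace the hand-built cmd_usage string.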

answered Apr 13, 2016 at 11:09
