I am working on the Python copy utility below, which should work on both Windows and Linux, but I am looking for a more efficient approach that could optimize my I/O, as my target location is network-dependent as well. I have included a measurement of the utility's execution time in the code.
#!/usr/bin/python
"""
Pythonic implementation of multi-target copy (Parallel Copy).
"""
import Queue
import threading
import time
import os, os.path
import sys
import shutil, hashlib
exitFlag = 0
class myThread(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
process_data(self.name, self.queue, self.idFlag)
if debugFlag:
print "**** Ending %s" % self.name
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
sha512_hash = hashlib.sha512()
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
statinfo = os.stat(src_filepath)
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
nb_blocks = (statinfo.st_size / block_size) + 1
cnt_blocks = 0
l = len(src_filepath.split('\\'))
target_file_path = os.path.join(target_filename, src_filepath.split('\\')[l - 1])
while True:
block = sf.read(block_size)
sha512_hash.update(block) # Todo a blockwise copy
if not block: break
cnt_blocks = cnt_blocks + 1
with open(target_filename, "a") as tf:
tf.write(block)
tf.close()
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
sf.close()
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
def delete_folder(target_path):
"""
Deletes a folder, if it already exists
@param target_path: Relative path of the directory to delete
"""
if (os.path.exists(target_path) or os.path.isdir(target_path)):
print "Directory %s already exists.. deleting..." % target_path
try:
shutil.rmtree(target_path)
except OSError:
os.remove(target_path)
def process_data(threadName, queue, idFlag):
while not exitFlag:
if not workQueue.empty():
(sfile, dfile) = queue.get()
copy_files_concurrantly(sfile, dfile, idFlag)
time.sleep(0.5)
def queue_mt(argv):
"""
Implementation to do multi-target copy (recursive) of directories
@param argv: Arguments passed at command-line
"""
desc = "Recursively copies the files to destination directories."
syntax = "\nUsage:\n c4.py cp -L -R <src-dir> <target-dir>\n c4.py cp -L -R <src-dir> -t <target-dir1> <target-dir2>..."
options = "\n\n cp\t\t\tCopy operation to perform.\n -L\t\t\tDisplay running logs.(Optional)\n -R\t\t\tRecursively copy source files to target.\n <src-dir>\t\tSpecify source directory to copy.\n <target-dir>\tSpecify target directory to copy."
win = "\n\n Windows: c4.py cp -R d:\src-dir\*.* e:\dst-dir (OR) c4.py cp -R d:\src-dir\*.* -t d:\dst-dir1 e:\dst-dir2"
linux = "\n Linux: c4.py cp -R /src-dir/*.* /dst-dir (OR) c4.py cp -R /src-dir/*.* -t /dst-dir1 /dst-dir2"
cmd_usage = desc + syntax + options + win + linux
# Display the command usage in case incorrect arguments are specified
if len(argv) < 4:
print cmd_usage
sys.exit(2)
global threadID, workQueue, debugFlag
threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0
stime = time.time()
# Perform single source to single target directory copy
if ((len(argv) == 4) and (("-R" in argv[1]) or ("-r" in argv[1]))) or ((len(argv) == 5) and (("-R" in argv[2]) or ("-r" in argv[2]))):
if (len(argv) == 4):
src_path, dest_path = argv[2], argv[3]
if (len(argv) == 5) and ("-L" in argv[1]):
debugFlag = True
src_path, dest_path = argv[3], argv[4]
if src_path.endswith('/*') or src_path.endswith('\*'):
src_path = src_path[:-2]
if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
src_path = src_path[:-4]
# Computing the file-count recursively traversing the directory
# Excludes the count of number of directories
fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
print "File)s) count in source directory: %d" % fcnt
cnt = fcnt * 1
workQueue = Queue.Queue(cnt)
# Fill the Queue
for root, subfolder, filenames in os.walk(src_path):
newDir = os.path.join(dest_path, root[1 + len(src_path):])
if not os.path.exists(newDir):
os.makedirs(newDir)
else:
delete_folder(newDir)
for filename in filenames:
sfpath = str(os.path.join(root, filename))
dfpath = str(os.path.join(newDir, filename))
workQueue.put((sfpath, dfpath))
if debugFlag:
print "***** Added to Q... %s | %s" % (sfpath, dfpath)
elif ((len(argv) > 4) and (("-t" in argv[3]) or ("-t" in argv[4]))):
if ("-L" in argv[1]):
debugFlag = True
src_path, st = argv[3], 5
else:
src_path, st = argv[2], 4
if src_path.endswith('/*') or src_path.endswith('\*'):
src_path = src_path[:-2]
if src_path.endswith('/*.*') or src_path.endswith('\*.*'):
src_path = src_path[:-4]
# Computing the file-count recursively traversing the directory
# Excludes the count of number of directories
fcnt = sum([len(f) for r, d, f in os.walk(src_path)])
if ("-L" in argv[1]):
dst = (len(argv) - 5)
else:
dst = (len(argv) - 4)
print "File(s) count in source directory:%d | Destination directories count:%s" % (fcnt, dst)
cnt = fcnt * dst
workQueue = Queue.Queue(cnt)
# Fill the Queue
for root, subfolder, filenames in os.walk(src_path):
for i in range(st, (len(argv))):
dest_path = argv[i]
newDir = os.path.join(dest_path, root[1 + len(src_path):])
if not os.path.exists(newDir):
os.makedirs(newDir)
else:
delete_folder(newDir)
for filename in filenames:
sfpath = str(os.path.join(root, filename))
dfpath = str(os.path.join(newDir, filename))
workQueue.put((sfpath, dfpath))
if debugFlag:
print "***** Added to Q... %s | %s" % (sfpath, dfpath)
print "\nGenerating c4id's for source directory files only...\n"
# Create new threads
max_threads = 100
if cnt > max_threads:
cnt = max_threads
for i in range(1, cnt+1):
s = 'Thread'+str(i)
threadList.append(s)
if debugFlag:
print "***** ThreadsList: %s" % str(threadList)
for tName in threadList:
thread = myThread(threadID, tName, workQueue, idFlag=True)
thread.start()
threads.append(thread)
threadID += 1
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads it's time to exit
global exitFlag
exitFlag = 1
# Wait for all threads to complete
for t in threads:
t.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)
if __name__ == '__main__':
queue_mt(sys.argv[1:])
2 Answers
Let me begin with copy_files_concurrantly. The sha512_hash variable is not used, except for being updated, so it can go. The same goes for nb_blocks, cnt_blocks, statinfo, target_file_path, and l.
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
with open(target_filename, "a") as tf:
tf.write(block)
tf.close()
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
sf.close()
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
This way, the code is just a bit more readable, because you're left with only the functional part of the code.
Another thing that's worrisome is using with blocks and manually closing the files. A with block closes the file when the code leaves the with block, so let's get rid of the explicit close calls.
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
with open(target_filename, "a") as tf:
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
Another thing to note is that the target file is getting opened and closed for every block. We can do better!
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
while True:
block = sf.read(block_size)
if not block: break
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
One last improvement: use partial and iter (a standalone illustration of this idiom follows the rewritten function below):
while True:
block = sf.read(block_size)
if not block:
break
...
can be written as
from functools import partial
for block in iter(partial(sf.read, block_size), ''):
...
Leaving us with
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
for block in iter(partial(sf.read, block_size), ''):
tf.write(block)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
Another (very!) important suggestion would be to move the print statement up a bit higher.
from functools import partial
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
try:
with open(src_filepath, "r") as sf, open(target_filename, "a") as tf:
block_size = 100 * (2 ** 20) # Magic number: 100 * 1MB blocks
for block in iter(partial(sf.read, block_size), ''):
tf.write(block)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
Especially if the file is large and needs to be transferred over the network, it's good to know which file the utility is currently working on.
Now, I assume the "a" mode is actually a bug, since the target file never gets truncated. Assuming we can replace it with "w" instead, we could write:
import shutil
def copy_files_concurrantly(src_filename, target_filename, idFlag):
"""
"""
src_filepath = os.path.join(os.getcwd(), src_filename)
print "\nCopying %s (to) %s" % (src_filepath, target_filename)
try:
shutil.copy(src_filepath, target_filename)
except IOError:
print "Error: cant find or read '%s' file" % (src_filename)
And be done with it.
There's probably a lot that can be cleaned up further in the rest of the code, but I just wanted to give a short overview of how to fix copy_files_concurrantly.
Comment (ferada, Apr 12, 2016): While you're at it, it should be spelled concurrently too.
One thing I don't quite understand: why is process_data outside of your myThread class? You give it the whole set of attributes as parameters, and there is only one call to it: inside the run method.
If you did that to be able to use the global exitFlag in queue_mt, well, it's a bad idea. Globals are considered bad practice and there are better ways to synchronize information between threads. You can use Events, for instance:
def queue_mt(argv):
...
# Create new threads
cnt = min(100, cnt) ## No need to do a compare-and-set explicitly here
threadList = ['Thread{}'.format(i) for i in range(1, cnt+1)] ## List comprehension is better than loop + append
if debugFlag:
print "***** ThreadsList:", threadList
exit = threading.Event()
for threadID, tName in enumerate(threadList): ## Let python do the indexing for you
thread = myThread(threadID, tName, workQueue, exit, idFlag=True)
thread.start()
threads.append(thread)
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads it's time to exit
exit.set()
# Wait for all threads to complete
for t in threads:
t.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)
Small tips in here before going on (a short sketch applying them follows the list):
- cnt = max_thread if cnt > max_thread else cnt is clearer (to me) than what you wrote, and cnt = min(cnt, max_thread) is better still. You may want to define MAX_THREAD = 100 as a constant at the top of your file, though.
- threadList = [] + loop + threadList.append(...) is bad practice: use a list comprehension instead.
- String concatenation is a poor way of building strings; use format and you'll have more control over the formatting.
- You almost never need to maintain an explicit index in Python. Use enumerate while iterating over a collection if you need one.
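To make those tips concrete, here is a small standalone sketch (the names are made up for the example) that applies all four:

MAX_THREADS = 100  # named constant instead of a magic number inside the function

def build_thread_names(cnt):
    cnt = min(cnt, MAX_THREADS)  # min() instead of an explicit compare-and-set
    # list comprehension + str.format instead of a loop with append and '+'
    return ['Thread{}'.format(i) for i in range(1, cnt + 1)]

# enumerate() instead of a manually maintained threadID counter
for threadID, name in enumerate(build_thread_names(5)):
    print threadID, name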
So we added an Event as the fourth parameter of myThread; let's handle that. And in the meantime, give a meaningful name to that class:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, event, idFlag):
super(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.stop = event
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
while not self.stop.is_set():
if not workQueue.empty():
(sfile, dfile) = self.queue.get()
copy_files_concurrantly(sfile, dfile, self.idFlag)
time.sleep(0.5)
if debugFlag:
print "**** Ending %s" % self.name
This got me thinking... Where does this global workQueue come from? Oh, there is a global workQueue in queue_mt, and you build your thread passing workQueue as the third parameter. So basically, workQueue is self.queue; why not use that in the first place?
Also, looking at its usage, you populate the queue before creating the threads. The threads are merely consumers and should stop as soon as the queue is empty, but you do not need any synchronization mechanism for that: you can detect that the queue is empty directly in the thread. Use get_nowait and handle the Empty exception to terminate gracefully:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
super(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
try:
while True:
(sfile, dfile) = self.queue.get_nowait()
copy_files_concurrantly(sfile, dfile, self.idFlag)
except Queue.Empty:
# Continue gracefully when every file has been handled
pass
if debugFlag:
print "**** Ending %s" % self.name
So, it turns out we didn't need that Event after all. I also removed the time.sleep(0.5); I don't understand why you put it there in the first place if performance matters.
Now let's get back to queue_mt for the usage. Since each thread now stops itself when the queue is empty, there is no need to monitor that in this function. We can also improve the way threads are created and reduce the amount of useless data stored.
MAX_THREADS = 100
def queue_mt(argv):
...
# Create new threads
threadList = [
ThreadedFileCopy(i, 'Thread{}'.format(i+1), workQueue, True)
for i in range(min(MAX_THREADS, cnt))
]
if debugFlag:
print "***** ThreadsList:", threadList
for thread in threadList:
thread.start()
# Wait for all threads to complete
for thread in threadList:
thread.join()
if debugFlag:
print "\nUtility Exec time: %s sec" %(time.time() - stime)
This way you don't need to store both the names and the threads, since the latter contain the former. It also lets you build the list of threads faster. Oh, and while I'm at it, stop declaring your variables way earlier than where you need them (such as threads, threadList, threadID, debugFlag, cnt = [], [], 1, False, 0). Python is a dynamic language; if you happen to need a variable at some point, just perform an assignment at that point.
However, there is one problem now: the call to print "***** ThreadsList:", threadList gives an unhelpful representation of the threads in the list. When building the string representation of the list, Python had no difficulty converting a list of strings to a single string, but now it has to convert a list of threads to a string, and it doesn't really know how to convert a thread to a string. Let's help it a little by adding a __repr__ method to our thread object:
class ThreadedFileCopy(threading.Thread):
def __init__(self, threadID, name, queue, idFlag):
super(ThreadedFileCopy, self).__init__()
self.threadID = threadID
self.name = name
self.queue = queue
self.idFlag = idFlag
def __repr__(self):
return self.name
def run(self):
if debugFlag:
print "**** Starting %s" % self.name
try:
while True:
(sfile, dfile) = self.queue.get_nowait()
copy_files_concurrantly(sfile, dfile, self.idFlag)
except Queue.Empty:
# Continue gracefully when every file has been handled
pass
if debugFlag:
print "**** Ending %s" % self.name
And there it is, same output as before.
One last note: we removed the need for the global workQueue and we removed the need for threadID altogether; try to avoid using a global for debugFlag too. You can pass it as a parameter to the thread and call it a day.
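For instance, a rough sketch of what that could look like (this only extends the ThreadedFileCopy constructor shown above; the debug attribute name is my own choice, not from the original code):

import threading

class ThreadedFileCopy(threading.Thread):
    def __init__(self, threadID, name, queue, idFlag, debug=False):
        super(ThreadedFileCopy, self).__init__()
        self.threadID = threadID
        self.name = name
        self.queue = queue
        self.idFlag = idFlag
        self.debug = debug  # injected at construction time, no global needed

    def run(self):
        if self.debug:
            print "**** Starting %s" % self.name
        # ... same queue-consuming loop as shown above ...
        if self.debug:
            print "**** Ending %s" % self.name

# in queue_mt:
# ThreadedFileCopy(i, 'Thread{}'.format(i + 1), workQueue, True, debug=debugFlag)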
I could also have told you that managing command-line parsing and performing the logic in the same function is pretty bad: you should parse the command line beforehand and then call the function performing the logic with the right set of parameters. I could also have told you about argparse or getopt to generate the usage message and parse command-line options... but I'll leave that to someone else.
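Just as a pointer in that direction, here is a rough argparse sketch; the flag names and argument structure are my guesses at the intended CLI, not a drop-in replacement for the original parsing:

import argparse

def parse_args(argv):
    parser = argparse.ArgumentParser(
        prog="c4.py",
        description="Recursively copies the files to destination directories.")
    parser.add_argument("operation", choices=["cp"],
                        help="Copy operation to perform.")
    parser.add_argument("-L", dest="logs", action="store_true",
                        help="Display running logs.")
    parser.add_argument("-R", dest="recursive", action="store_true",
                        help="Recursively copy source files to target.")
    parser.add_argument("src_dir", help="Source directory to copy.")
    parser.add_argument("dest_dir", nargs="?",
                        help="Single target directory (when -t is not used).")
    parser.add_argument("-t", "--targets", nargs="+",
                        help="One or more target directories.")
    return parser.parse_args(argv)

# e.g. parse_args(["cp", "-L", "-R", "/src-dir", "-t", "/dst-dir1", "/dst-dir2"])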