I spent the last couple of days reading/researching and learning sqlite and python multithreading.
I'm unwinding shortened urls stored in a sql table with three fields...
id | url_starting | url_unwound
The code chunks the database results, sends the chunk to a thread pool of the same chunk size, unwinds the urls and writes the results back to the database, then iterates to the next chunk.
I'm happy with the results but am looking for feedback as this is my first attempt.
General thoughts are greatly appreciated.
Also, any ideas on making the future_to_url =
section more readable would be great.
import time
import os
import sqlite3
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import Request, urlopen
class MyDatabase(object):
def __init__(self, db):
self.__conn = sqlite3.connect(db)
self.__conn.row_factory = sqlite3.Row # https://stackoverflow.com/a/41920171/25197
self.__conn.execute('pragma foreign_keys = on')
self.__conn.commit()
self.__cursor = self.__conn.cursor()
def __enter__(self):
return self
def __del__(self):
self.__conn.close()
def __exit__(self, exc_type, exc_value, traceback):
self.__conn.close()
def query(self, arg):
self.__cursor.execute(arg)
self.__conn.commit()
return self.__cursor
def updateUnwoundUrl(self, url_starting, url_unwound):
self.__cursor.execute(
'UPDATE url SET url_unwound = ? WHERE url_starting = ?', (url_unwound, url_starting))
self.__conn.commit()
return self.__cursor
def unwindUrl(starting_url, timeout):
try:
hdr = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
response = urlopen(Request(starting_url, headers=hdr), timeout=timeout)
unwound_url = response.geturl()
# For any http error just return error. Deal with specifics later.
except Exception:
unwound_url = 'error'
return unwound_url
def chunk_iter(iterable, chunk_size):
it = iter(iterable)
while True:
chunk = tuple(next(it) for _ in range(chunk_size))
if not chunk:
break
yield chunk
def unwindUrls(workers=100, request_timeout=10):
relative_path = os.path.dirname(__file__)
my_db_path = os.path.join(relative_path, 'database', 'my.db')
with MyDatabase(my_db_path) as my_db:
wound_urls = my_db.query(
'SELECT url_starting, id FROM url WHERE url_unwound IS NULL ORDER BY id ASC').fetchall()
chunk_count = len(wound_urls) / workers
for i, wound_urls_chunk in enumerate(chunk_iter(wound_urls, workers), start=1):
print('starting chunk {0} of {1} at {2}'.format(
i, chunk_count, datetime.now().strftime("%I:%M:%S %p")))
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_url = {executor.submit(
unwindUrl, row['url_starting'], request_timeout): row['url_starting'] for row in wound_urls_chunk}
for future in as_completed(future_to_url):
try:
url_starting = future_to_url[future]
url_unwound = future.result()
my_db.updateUnwoundUrl(url_starting, url_unwound)
# print('starting: {0} unwound: {1}'.format(url_starting, url_unwound))
except Exception as exc:
print('!! {0}'.format(exc))
if __name__ == '__main__':
startTime = time.time()
unwindUrls()
print('==--- unwind_urls took {0} seconds. ---=='.format(round(time.time() - startTime)))
2 Answers 2
__
There is liitle need for the __
. A single _
should suffice to make people clear it's a private attribute. If they really want to use it, __
won't stop the access
Linelength
Especially the lines with queries are too long. Can't you make those queries multiline?
From:
def updateUnwoundUrl(self, url_starting, url_unwound):
self.__cursor.execute(
'UPDATE url SET url_unwound = ? WHERE url_starting = ?', (url_unwound, url_starting))
self.__conn.commit()
return self.__cursor
to:
def updateUnwoundUrl(self, url_starting, url_unwound):
self.__cursor.execute(
'''UPDATE url
SET url_unwound = ?
WHERE url_starting = ?
'''
, (url_unwound, url_starting)
)
self.__conn.commit()
return self.__cursor
Hoist the IO
If ever you want to change the db format to something else, you will need to change the internals of the code and how the my_db_path
is made.
To change this, I would follow the tips from this talk.
Database path
def unwindUrls(workers=100, request_timeout=10):
relative_path = os.path.dirname(__file__)
my_db_path = os.path.join(relative_path, 'database', 'my.db')
First, I would use my_db_path
to handle filepaths.
Secondly I would add relative_path
as an an argument, with Path('.')
as default argument, this way you can specify the path.
def unwindUrls(workers=100, request_timeout=10):
relative_path = os.path.dirname(__file__)
my_db_path = os.path.join(relative_path, 'database', 'my.db')
with MyDatabase(my_db_path) as my_db:
...
if __name__ == '__main__':
unwindUrls()
To:
def unwindUrls(my_db, workers=100, request_timeout=10):
...
if __name__ == '__main__':
my_db_path = pathlib.Path('.') / 'database' / 'my.db'
with MyDatabase(my_db_path) as my_db:
unwindUrls(my_db)
chunk_iter
def chunk_iter(iterable, chunk_size):
it = iter(iterable)
while True:
chunk = tuple(next(it) for _ in range(chunk_size))
if not chunk:
break
yield chunk
raises a Warning
in Python 3.6
C:\miniconda3\envs\test_env\lib\site-packages\ipykernel_launcher.py:4: DeprecationWarning: generator 'chunk_iter..' raised StopIteration after removing the cwd from sys.path.
Better to use itertools.islice
def chunk_iter(iterable, chunk_size):
it = iter(iterable)
while True:
chunk = tuple(islice(it, chunk_size))
if not chunk:
return
yield chunk
Exception
a few time you use except Exception
. This is way too broad. You should specify which Exceptions you want to catch
-
\$\begingroup\$ Hi @Maarten. Thanks for the review! I've implemented everything you mentioned. Thanks for pointing out
islice
andpathlib
. I wasn't yet aware of either. \$\endgroup\$GollyJer– GollyJer2018年03月05日 22:19:56 +00:00Commented Mar 5, 2018 at 22:19
future_to_url =
To answer my own question on this part...
I ended up renaming some variables and using a loop instead of a comprehension to fill the dictionary.
I feel it's more readable in this case.
Instead of this...
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_url = {executor.submit(
unwindUrl, row['url_starting'], request_timeout): row['url_starting'] for row in wound_urls_chunk}
for future in as_completed(future_to_url):
try:
url_starting = future_to_url[future]
url_unwound = future.result()
my_db.updateUnwoundUrl(url_starting, url_unwound)
except Exception as exc:
print('!! {0}'.format(exc))
This...
with ThreadPoolExecutor(max_workers=workers) as executor:
thread_pool = {}
for row in wound_urls_chunk:
unwindUrl_thread = executor.submit(unwindUrl, row['url_starting'], timeout)
thread_pool[unwindUrl_thread] = row['url_starting']
for completed_thread in as_completed(thread_pool):
try:
url_starting = thread_pool[completed_thread]
url_unwound = completed_thread.result()
my_db.updateUnwoundUrl(url_starting, url_unwound)
except Exception as exc:
print('!! {0}'.format(exc))
Explore related questions
See similar questions with these tags.