1
\$\begingroup\$

I spent the last couple of days reading/researching and learning sqlite and python multithreading.

I'm unwinding shortened urls stored in a sql table with three fields...
id | url_starting | url_unwound

The code chunks the database results, sends the chunk to a thread pool of the same chunk size, unwinds the urls and writes the results back to the database, then iterates to the next chunk.

I'm happy with the results but am looking for feedback as this is my first attempt.

General thoughts are greatly appreciated.
Also, any ideas on making the future_to_url = section more readable would be great.

import time
import os
import sqlite3
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import Request, urlopen
class MyDatabase(object):
 def __init__(self, db):
 self.__conn = sqlite3.connect(db)
 self.__conn.row_factory = sqlite3.Row # https://stackoverflow.com/a/41920171/25197
 self.__conn.execute('pragma foreign_keys = on')
 self.__conn.commit()
 self.__cursor = self.__conn.cursor()
 def __enter__(self):
 return self
 def __del__(self):
 self.__conn.close()
 def __exit__(self, exc_type, exc_value, traceback):
 self.__conn.close()
 def query(self, arg):
 self.__cursor.execute(arg)
 self.__conn.commit()
 return self.__cursor
 def updateUnwoundUrl(self, url_starting, url_unwound):
 self.__cursor.execute(
 'UPDATE url SET url_unwound = ? WHERE url_starting = ?', (url_unwound, url_starting))
 self.__conn.commit()
 return self.__cursor
def unwindUrl(starting_url, timeout):
 try:
 hdr = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
 response = urlopen(Request(starting_url, headers=hdr), timeout=timeout)
 unwound_url = response.geturl()
 # For any http error just return error. Deal with specifics later.
 except Exception:
 unwound_url = 'error'
 return unwound_url
def chunk_iter(iterable, chunk_size):
 it = iter(iterable)
 while True:
 chunk = tuple(next(it) for _ in range(chunk_size))
 if not chunk:
 break
 yield chunk
def unwindUrls(workers=100, request_timeout=10):
 relative_path = os.path.dirname(__file__)
 my_db_path = os.path.join(relative_path, 'database', 'my.db')
 with MyDatabase(my_db_path) as my_db:
 wound_urls = my_db.query(
 'SELECT url_starting, id FROM url WHERE url_unwound IS NULL ORDER BY id ASC').fetchall()
 chunk_count = len(wound_urls) / workers
 for i, wound_urls_chunk in enumerate(chunk_iter(wound_urls, workers), start=1):
 print('starting chunk {0} of {1} at {2}'.format(
 i, chunk_count, datetime.now().strftime("%I:%M:%S %p")))
 with ThreadPoolExecutor(max_workers=workers) as executor:
 future_to_url = {executor.submit(
 unwindUrl, row['url_starting'], request_timeout): row['url_starting'] for row in wound_urls_chunk}
 for future in as_completed(future_to_url):
 try:
 url_starting = future_to_url[future]
 url_unwound = future.result()
 my_db.updateUnwoundUrl(url_starting, url_unwound)
 # print('starting: {0} unwound: {1}'.format(url_starting, url_unwound))
 except Exception as exc:
 print('!! {0}'.format(exc))
if __name__ == '__main__':
 startTime = time.time()
 unwindUrls()
 print('==--- unwind_urls took {0} seconds. ---=='.format(round(time.time() - startTime)))
200_success
145k22 gold badges190 silver badges478 bronze badges
asked Mar 5, 2018 at 3:46
\$\endgroup\$

2 Answers 2

4
\$\begingroup\$

__

There is liitle need for the __. A single _ should suffice to make people clear it's a private attribute. If they really want to use it, __ won't stop the access

Linelength

Especially the lines with queries are too long. Can't you make those queries multiline?

From:

def updateUnwoundUrl(self, url_starting, url_unwound):
 self.__cursor.execute(
 'UPDATE url SET url_unwound = ? WHERE url_starting = ?', (url_unwound, url_starting))
 self.__conn.commit()
 return self.__cursor

to:

def updateUnwoundUrl(self, url_starting, url_unwound):
 self.__cursor.execute(
 '''UPDATE url 
 SET url_unwound = ? 
 WHERE url_starting = ?
 '''
 , (url_unwound, url_starting)
 )
 self.__conn.commit()
 return self.__cursor

Hoist the IO

If ever you want to change the db format to something else, you will need to change the internals of the code and how the my_db_path is made.

To change this, I would follow the tips from this talk.

Database path

def unwindUrls(workers=100, request_timeout=10):
 relative_path = os.path.dirname(__file__)
 my_db_path = os.path.join(relative_path, 'database', 'my.db')

First, I would use my_db_path to handle filepaths.

Secondly I would add relative_path as an an argument, with Path('.') as default argument, this way you can specify the path.

def unwindUrls(workers=100, request_timeout=10):
 relative_path = os.path.dirname(__file__)
 my_db_path = os.path.join(relative_path, 'database', 'my.db')
 with MyDatabase(my_db_path) as my_db:
...
if __name__ == '__main__':
 unwindUrls()

To:

def unwindUrls(my_db, workers=100, request_timeout=10):
...
if __name__ == '__main__':
 my_db_path = pathlib.Path('.') / 'database' / 'my.db'
 with MyDatabase(my_db_path) as my_db:
 unwindUrls(my_db)

chunk_iter

def chunk_iter(iterable, chunk_size):
 it = iter(iterable)
 while True:
 chunk = tuple(next(it) for _ in range(chunk_size))
 if not chunk:
 break
 yield chunk

raises a Warning in Python 3.6

C:\miniconda3\envs\test_env\lib\site-packages\ipykernel_launcher.py:4: DeprecationWarning: generator 'chunk_iter..' raised StopIteration after removing the cwd from sys.path.

Better to use itertools.islice

def chunk_iter(iterable, chunk_size):
 it = iter(iterable)
 while True:
 chunk = tuple(islice(it, chunk_size))
 if not chunk:
 return
 yield chunk

Exception

a few time you use except Exception. This is way too broad. You should specify which Exceptions you want to catch

answered Mar 5, 2018 at 10:16
\$\endgroup\$
1
  • \$\begingroup\$ Hi @Maarten. Thanks for the review! I've implemented everything you mentioned. Thanks for pointing out islice and pathlib. I wasn't yet aware of either. \$\endgroup\$ Commented Mar 5, 2018 at 22:19
1
\$\begingroup\$

future_to_url =

To answer my own question on this part...
I ended up renaming some variables and using a loop instead of a comprehension to fill the dictionary. I feel it's more readable in this case.

Instead of this...

with ThreadPoolExecutor(max_workers=workers) as executor:
 future_to_url = {executor.submit(
 unwindUrl, row['url_starting'], request_timeout): row['url_starting'] for row in wound_urls_chunk}
 for future in as_completed(future_to_url):
 try:
 url_starting = future_to_url[future]
 url_unwound = future.result()
 my_db.updateUnwoundUrl(url_starting, url_unwound)
 except Exception as exc:
 print('!! {0}'.format(exc))

This...

with ThreadPoolExecutor(max_workers=workers) as executor:
 thread_pool = {}
 for row in wound_urls_chunk:
 unwindUrl_thread = executor.submit(unwindUrl, row['url_starting'], timeout)
 thread_pool[unwindUrl_thread] = row['url_starting']
 for completed_thread in as_completed(thread_pool):
 try:
 url_starting = thread_pool[completed_thread]
 url_unwound = completed_thread.result()
 my_db.updateUnwoundUrl(url_starting, url_unwound)
 except Exception as exc:
 print('!! {0}'.format(exc))
answered Mar 5, 2018 at 22:36
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.