I'm writing a script that:
- fetches a list of urls from a db (about 10000 urls)
- downloads all the pages and inserts them into the db
- parses the code
- if (some condition) does other inserts into the db
I have a Xeon quad-core with hyper-threading, so a total of 8 threads available, and I'm under Linux (64 bit).
I'm using cStringIO as a buffer, pycurl to fetch the pages, BeautifulSoup to parse them and MySQLdb to interact with the database.
I tried to simplify the code below (removing all the try/except blocks, the parsing operations, ...).
import cStringIO, threading, MySQLdb.cursors, pycurl
NUM_THREADS = 100
lock_list = threading.Lock()
lock_query = threading.Lock()
db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows] # convert to a list so it's editable
class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """

        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:         # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()

            """ download the page with pycurl and do some check """

            """ WARNING: possible bottleneck if all the pycurl
                connections are waiting for the timeout """

            lock_query.acquire()
            cur.execute("INSERT INTO ...")  # insert the full page into the database
            db.commit()
            lock_query.release()

            """do some parse with BeautifulSoup using the StringIO object"""

            if something is not None:
                lock_query.acquire()
                cur.execute("INSERT INTO ...")  # insert the result of parsing into the database
                db.commit()
                lock_query.release()

# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
I use multithreading so I don't have to wait if some requests fail because of a timeout: that specific thread will wait, but the others are free to continue with the other urls.
Here is a screenshot of CPU usage taken while running nothing but the script: it seems that 5 cores are busy while the others are not. So the questions are:
- should I create as many cursors as the number of threads?
- do I really need to lock the execution of a query? What happens if a thread executes a cur.execute() but not the db.commit(), and another thread comes in and does an execute + commit with another query?
- I read about the Queue class, but I'm not sure if I understood correctly: can I use it instead of doing lock + extract a url + release?
- using multithreading, can I suffer from an I/O (network) bottleneck? With 100 threads my speed doesn't exceed ~500 Kb/s while my connection can go faster. If I move to multiprocessing, will I see some improvement on this side?
- the same question but with MySQL: using my code, could there be a bottleneck on this side? Can all those lock + insert query + release steps be improved in some way?
- if the way to go is multithreading, is 100 a high number of threads? I mean, are too many threads doing I/O requests (or DB queries) useless because of the mutual exclusion of these operations? Or do more threads mean more network speed?
2 Answers
Take a look at the eventlet library. It'll let you write code that fetches all the web pages in parallel without ever explicitly implementing threads or locking.
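For example, a minimal sketch along the lines of the crawler example in the eventlet documentation (the URL list here is just illustrative, not taken from your database):
import eventlet
from eventlet.green import urllib2   # cooperative (non-blocking) version of urllib2

def fetch(url):
    # runs in a green thread; blocking I/O yields control to the other fetches
    return url, urllib2.urlopen(url).read()

urls = ["http://www.example.com/", "http://www.doughellmann.com/PyMOTW/contents.html"]

pool = eventlet.GreenPool(100)        # up to 100 concurrent fetches
for url, body in pool.imap(fetch, urls):
    print url, 'got', len(body), 'bytes'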
import cStringIO, threading, MySQLdb.cursors, pycurl
NUM_THREADS = 100
lock_list = threading.Lock()
lock_query = threading.Lock()
Purist that I am, I wouldn't make these locks globals.
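For instance (just a sketch of one alternative), the shared state could be passed in explicitly instead of being read from module-level globals:
import threading

class MyThread(threading.Thread):
    def __init__(self, rows, lock_list, lock_query):
        threading.Thread.__init__(self)
        self.rows = rows                # shared work list
        self.lock_list = lock_list      # protects the work list
        self.lock_query = lock_query    # protects the shared cursor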
db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows] # convert to a list so it's editable
It would make more sense to do this sort of thing after you've defined your classes. At least that would be Python's convention.
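For example, a sketch of the conventional layout, with the module-level setup moved below the class definition and under a main guard:
import threading, MySQLdb, MySQLdb.cursors

class MyThread(threading.Thread):
    pass   # ... run() as in the question ...

if __name__ == '__main__':
    db = MySQLdb.connect(host="...", user="...", passwd="...", db="...",
                         cursorclass=MySQLdb.cursors.DictCursor)
    cur = db.cursor()
    cur.execute("SELECT ...")
    rows = list(cur.fetchall())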
class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """
That's pretty much the most terrible description of this function I could have come up with. (You seem to be thinking of it as a comment, but by convention this is a docstring, and it should describe the function.)
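For example, a docstring that actually describes what run() does could be something like:
import threading

class MyThread(threading.Thread):
    def run(self):
        """Pop urls from the shared list, download each page and store it in the db."""
        # ... rest of the body as in the question ...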
        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:         # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()
It'd be a lot simpler to use a queue. It'd basically do all of that part for you.
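A minimal sketch of that idea with Python 2's Queue module (rows is the same list fetched from the db in the question):
import Queue

url_queue = Queue.Queue()
for row in rows:
    url_queue.put(row)

# inside MyThread.run(), instead of the acquire/pop/release dance:
while True:
    try:
        row = url_queue.get_nowait()   # the Queue does its own locking
    except Queue.Empty:
        break                          # no more urls to process
    # ... download and process row ...
    url_queue.task_done()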
""" download the page with pycurl and do some check """
""" WARNING: possible bottleneck if all the pycurl
connections are waiting for the timeout """
lock_query.acquire()
cur.execute("INSERT INTO ...") # insert the full page into the database
db.commit()
lock_query.release()
It'd be better to put this data in another queue and have a database thread take care of it. This works, but I think the multi-queue approach would be cleaner.
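A sketch of that multi-queue idea (table and column names are placeholders): the worker threads only enqueue the data, and a single writer thread owns the connection, so no lock_query is needed:
import Queue, threading, MySQLdb

insert_queue = Queue.Queue()

def db_writer():
    db = MySQLdb.connect(host="...", user="...", passwd="...", db="...")
    cur = db.cursor()
    while True:
        job = insert_queue.get()
        if job is None:                # sentinel: shut the writer down
            break
        sql, params = job
        cur.execute(sql, params)
        db.commit()
    db.close()

writer = threading.Thread(target=db_writer)
writer.start()

# in a worker thread, instead of lock_query.acquire()/cur.execute()/db.commit():
# insert_queue.put(("INSERT INTO pages (url, body) VALUES (%s, %s)", (url, page)))

# once all the workers have finished:
# insert_queue.put(None)
# writer.join()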
"""do some parse with BeautifulSoup using the StringIO object"""
if something is not None:
lock_query.acquire()
cur.execute("INSERT INTO ...") # insert the result of parsing into the database
db.commit()
lock_query.release()
Same here. Note that there is no point in Python in trying to split up processing using threads. The GIL means you'll get no advantage.
# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
Q: should I create as many cursors as the number of threads?
A: Maybe. Don't share a DB connection among threads, as the docs say the thread safety level is 1. It might be better to have a queue of db connections: once a thread has popped a connection from the queue, it belongs to that thread alone.
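A sketch of such a pool (the pool size and credentials are placeholders):
import Queue, MySQLdb

POOL_SIZE = 8
conn_pool = Queue.Queue()
for _ in range(POOL_SIZE):
    conn_pool.put(MySQLdb.connect(host="...", user="...", passwd="...", db="..."))

# in a worker thread:
db = conn_pool.get()                 # this connection is ours alone while we hold it
try:
    cur = db.cursor()
    cur.execute("INSERT INTO ...")
    db.commit()
finally:
    conn_pool.put(db)                # hand it back for other threads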
Q: do I really need to lock the execution of a query?
A: No. Trust your DB to take care of its own locking. That's what DBs are for.
Q: I read about the Queue class...
A: Yep, a queue of db connections would be great here.
Q: using multithreading, can I suffer from an I/O (network) bottleneck?
A: Yes, but that's not a point against threads.
Q: using my code, there could be a bottleneck...
A: Though bottlenecks should be verified by testing, not by reading anonymous posts on forums, it's very likely that downloading the files will always be your bottleneck, regardless of the implementation.
Q: if the way to go is multithreading, is 100 a high number of threads?
A: I don't think you should explicitly use threads at all here. Can't you assign a callback to an async http request?
Code sample for an async http request, taken almost as-is from this post:
I still owe you the db part.
import socket
import asyncore
from cStringIO import StringIO
from urlparse import urlparse

def noop(*args):
    pass

class HttpClient(asyncore.dispatcher):
    def __init__(self, url, callback=noop):
        self.url = url
        asyncore.dispatcher.__init__(self)
        self.write_buffer = 'GET %s HTTP/1.0\r\n\r\n' % self.url
        self.read_buffer = StringIO()
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((urlparse(url).netloc, 80))
        self.on_close = callback

    def handle_read(self):
        data = self.recv(8192)
        self.read_buffer.write(data)

    def handle_write(self):
        sent = self.send(self.write_buffer)
        self.write_buffer = self.write_buffer[sent:]

    def handle_close(self):
        self.close()
        self.on_close(self.url, self.read_buffer.getvalue())

def parse(source, response):
    print source, 'got', len(response), 'bytes'

if __name__ == '__main__':
    clients = [HttpClient('http://codereview.stackexchange.com/questions/18618/improve-multithreading-with-network-io-and-database-queries/18642#18642/', parse),
               HttpClient('http://www.doughellmann.com/PyMOTW/contents.html', parse)]
    print ('LOOP STARTING')
    asyncore.loop()
    print ('LOOP DONE')