I am currently working with the Stack Exchange Data Dump, more precisely with the dumped Posts.xml data set from Stack Overflow.
What am I trying to achieve? I want to read the whole data set and import each row (a post on Stack Overflow) as a document into a MongoDB database.
What am I doing right now?
I am using the iterparse() function from lxml to iterate over each row without building a DOM. Every row contains attributes which hold the actual data. Since every attribute value is a string, I need to parse some attributes into integers, dates and lists. This is done by the attrib_to_dict() function. The resulting dictionary is simply inserted into the database collection.
What is the problem?
Parsing the attributes is quite slow; the whole process took about two hours on my machine. By using the multiprocessing module I was able to speed up the process substantially. Iterating over the whole data set without doing anything is quite fast.
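To illustrate that baseline, a minimal sketch of iterating over the rows without any processing (not the exact code I used for the measurements below) looks like this:

    # baseline sketch: iterate only, no parsing, no inserts
    from lxml import etree as et
    from tqdm import tqdm

    with tqdm() as pbar:
        for event, elem in et.iterparse("Posts.xml", tag="row"):
            elem.clear()    # free the element, otherwise memory usage grows
            pbar.update(1)  # tqdm reports the items/s rate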
    # main.py
    from lxml import etree as et
    from tqdm import tqdm
    import multiprocessing as mp
    import pymongo

    from constants import POSTS_SIZE
    from posts import attrib_to_dict

    client = pymongo.MongoClient("mongodb://localhost:27017/")
    # database
    stackoverflow = client["stackoverflow"]
    # collection
    posts = stackoverflow["posts"]

    def work(elem):
        try:
            # turn the string back into an element, pass attributes to the parsing function
            posts.insert_one(attrib_to_dict(et.fromstring(elem).attrib))
        except pymongo.errors.DuplicateKeyError:
            # skip element
            pass

    if __name__ == "__main__":
        pool = mp.Pool(4)

        # progress bar
        pbar = tqdm(total=POSTS_SIZE)

        def update(*args):
            # add one to total processed elements
            pbar.update(1)

        try:
            for event, elem in et.iterparse("Posts.xml", tag="row"):
                # pass the element as a string to the worker
                # passing the attribute object directly did not seem to work
                pool.apply_async(work, args=(et.tostring(elem),), callback=update)
                elem.clear()
            pool.close()
        except KeyboardInterrupt:
            pool.terminate()
        finally:
            pbar.close()
            pool.join()
    # posts.py
    from datetime import datetime as dt

    def attrib_to_dict(attrib):
        result = {}
        result["_id"] = int(attrib.get("Id"))
        result["PostTypeId"] = int(attrib.get("PostTypeId"))

        # nullable attributes
        acceptedAnswerId = attrib.get("AcceptedAnswerId")
        if acceptedAnswerId: result["AcceptedAnswerId"] = int(acceptedAnswerId)

        result["CreationDate"] = dt.fromisoformat(attrib.get("CreationDate"))

        # about 10 more conversions ...

        tags = attrib.get("Tags")
        # "<python><mongodb>" -> ["python", "mongodb"]
        if tags: result["Tags"] = [tag[:-1] for tag in tags.split("<")[1:]]

        return result
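To make the conversion concrete, here is what attrib_to_dict produces for a made-up row (only the conversions shown above; real rows carry more attributes):

    # illustrative usage with made-up attribute values
    sample = {
        "Id": "4",
        "PostTypeId": "1",
        "AcceptedAnswerId": "7",
        "CreationDate": "2008-07-31T21:42:52.667",
        "Tags": "<python><mongodb>",
    }
    print(attrib_to_dict(sample))
    # {'_id': 4, 'PostTypeId': 1, 'AcceptedAnswerId': 7,
    #  'CreationDate': datetime.datetime(2008, 7, 31, 21, 42, 52, 667000),
    #  'Tags': ['python', 'mongodb']}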
Some performance metrics:
no inserts, no parsing, passing None to worker: 13427.88 items/s
no inserts, no parsing, passing et.tostring(elem) to worker: 10177.07 items/s
no inserts, parsing, passing et.tostring(elem) to worker: 9637.41 items/s
inserts, parsing, passing et.tostring(elem) to worker: 7185.15 items/s
Graipher commented (May 18, 2020): Hey, welcome to Code Review! Here we like to have as much of the code as possible. You might get better reviews if you also include the skipped conversions; there might be some structure there that helps simplify it. You might also want to mention the size of the XML file (14.6 GB in 7z format). Have you tried profiling your code to see which part exactly is the slowest? Maybe it is inserting rows one at a time into the DB and not the parsing?
djozefiak replied (May 19, 2020): Thanks for your comment and interest! The skipped conversions are structurally built like the ones posted, just with other attribute names, which is why I skipped them. I included all types of conversions that I used. I added some performance metrics to show the processing speed with and without parsing, etc. It seems that the insertion into the database slows down the process more than the parsing itself.
1 Answer
Since inserting into the DB takes a non-negligible amount of time, you should try to use insert_many instead of insert_one. If you used a single thread this would be easy: just chunk your file and insert a whole chunk. Since you are using multiprocessing, this is a bit more complicated, but it should still be doable.
(untested code)
    # imports and DB setup as in the question
    from itertools import islice
    import pymongo
    import multiprocessing as mp
    from tqdm import tqdm
    from lxml import etree as et

    from constants import POSTS_SIZE
    from posts import attrib_to_dict

    client = pymongo.MongoClient("mongodb://localhost:27017/")
    posts = client["stackoverflow"]["posts"]

    def chunks(iterable, n):
        it = iter(iterable)
        while (chunk := tuple(islice(it, n))):  # Python 3.8+
            yield chunk

    def work(chunk):
        try:
            posts.insert_many([attrib_to_dict(elem.attrib) for _, elem in chunk],
                              ordered=False)
        except pymongo.errors.BulkWriteError:
            # skip duplicate elements
            pass

    if __name__ == "__main__":
        pool = mp.Pool(4)

        # progress bar
        pbar = tqdm(total=POSTS_SIZE)

        n = 100

        try:
            for chunk in chunks(et.iterparse("Posts.xml", tag="row"), n):
                # note: as observed in the question, lxml elements may not pickle
                # cleanly; serializing them (e.g. with et.tostring) before sending
                # them to the worker may still be necessary
                pool.apply_async(work, args=(chunk,),
                                 callback=lambda _, size=len(chunk): pbar.update(size))
            pool.close()
        except KeyboardInterrupt:
            pool.terminate()
        finally:
            pbar.close()
            pool.join()
Here I used insert_many with ordered=False, catching the resulting BulkWriteError, to ignore duplicate keys.
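As a small illustration of that pattern (a sketch with made-up documents, reusing the posts collection from above): with ordered=False, colliding documents are rejected individually while the rest of the batch is still written.

    # ordered=False lets the remaining documents be written even when
    # some of the batch collide with existing _id values
    try:
        posts.insert_many(
            [{"_id": 1, "PostTypeId": 1}, {"_id": 2, "PostTypeId": 2}],
            ordered=False,
        )
    except pymongo.errors.BulkWriteError:
        # duplicates in the batch are skipped, non-duplicates are still inserted
        pass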
djozefiak commented (May 27, 2020): Thanks for your help! I'm struggling with the multiprocessing implementation because I'm not able to clear the current element correctly with elem.clear() to free memory. I used your chunk-based approach with a single thread instead, which already resulted in a processing speed of 15596.80 items/s. Maybe updating the progress bar for every item also slowed the process down; I am updating it for every chunk now instead.
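A single-threaded variant of the chunked approach described in that comment might look roughly like this (a sketch, reusing chunks, attrib_to_dict and the posts collection from above; the chunk size of 100 is an assumption):

    # single-threaded, chunked inserts: no pickling of lxml elements needed,
    # and each element can be cleared right after its chunk has been converted
    n = 100
    with tqdm(total=POSTS_SIZE) as pbar:
        for chunk in chunks(et.iterparse("Posts.xml", tag="row"), n):
            docs = [attrib_to_dict(elem.attrib) for _, elem in chunk]
            for _, elem in chunk:
                elem.clear()  # free memory before moving on
            try:
                posts.insert_many(docs, ordered=False)
            except pymongo.errors.BulkWriteError:
                pass  # skip duplicates
            pbar.update(len(chunk))  # update the progress bar once per chunk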