I wrote a simple sitemap.xml checker using asyncio and aiohttp, following the producer/consumer pattern demonstrated in the documentation. However, I've noticed that as the number of URLs grows, performance seems to degrade. Is there something I'm doing wrong? Can I improve the request speed?
Use case:
When check() is given the URL https://www.google.com/flights/sitemap.xml (~310 links), it takes approximately 00:03:24 to complete. GitHub source code is available if needed.
# -*- coding: utf-8 -*-
from timeit import default_timer as timer
from sys import exit as abort
import time
import sys
import logging
import asyncio

import aiohttp
import defusedxml.ElementTree


class Logger(object):

    FMT = '%(name)s: %(levelname)s: %(message)s'

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(level=logging.INFO)
        stdout = logging.StreamHandler(stream=sys.stdout)
        stderr = logging.StreamHandler(stream=sys.stderr)
        stdout.setLevel(level=logging.INFO)
        stderr.setLevel(level=logging.WARNING)
        stdout.addFilter(lambda record: record.levelno == logging.INFO)
        stdout.setFormatter(
            logging.Formatter(
                fmt=self.FMT,
                datefmt=None,
                style='%'))
        stderr.setFormatter(
            logging.Formatter(
                fmt=self.FMT,
                datefmt=None,
                style='%'))
        self._logger.addHandler(hdlr=stdout)
        self._logger.addHandler(hdlr=stderr)
    def __del__(self):
        if not self._logger.hasHandlers():
            return
        # iterate over a copy: removeHandler() mutates the handlers list
        for handler in list(self._logger.handlers):
            if isinstance(handler, logging.StreamHandler):
                handler.flush()
                handler.close()
            self._logger.removeHandler(handler)
class Config(object):
    """Base Config."""

    LIMIT = 100
    TIMEOUT = None
    USER_AGENT = 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
    MAXSIZE = 0


class ProdConfig(Config):
    """Prod Config."""

    TIMEOUT = 8
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36'
    MAXSIZE = 500


class Checker(object):
    """Sitemap Checker."""

    def __init__(self):
        self._logger = Logger()
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(
            maxsize=ProdConfig.MAXSIZE, loop=self._loop)

    def check(self, url):
        """Main() entry-point."""
        start = timer()
        self._loop.run_until_complete(self._fetch_links(url))
        elapsed = time.strftime(
            '%H:%M:%S', time.gmtime(timer() - start))
        self._logger._logger.info('time elapsed {}'.format(elapsed))

    async def _fetch_doc(self, client, url):
        """Fetch a sitemap.xml document."""
        self._logger._logger.info('fetching sitemap @ {}'.format(url))
        try:
            async with client.get(
                    url=url,
                    allow_redirects=True,
                    timeout=ProdConfig.TIMEOUT,
                    verify_ssl=True
                    if url.startswith('https') else False) as response:
                response.raise_for_status()
                return await response.text()
        except aiohttp.ClientResponseError as error:
            self._logger._logger.error(
                'sitemap yielded <{}>'.format(
                    error.status))
        except aiohttp.ClientError as error:
            self._logger._logger.error(str(error))
            abort(1)

    async def _producer(self, doc):
        """Parse sitemap.xml and queue discovered links."""
        try:
            root = defusedxml.ElementTree.fromstring(doc)
        except defusedxml.ElementTree.ParseError:
            self._logger._logger.error('failed to parse *.xml document')
            abort(1)
        self._logger._logger.info(
            '*.xml document contains ({}) links'.format(
                len(root)))
        for link in root:
            if link:
                await self._queue.put(''.join(link[0].text.split()))

    async def _consumer(self, client):
        """Process queued links with HEAD requests."""
        while True:
            url = await self._queue.get()
            async with client.head(
                    url=url,
                    allow_redirects=True,
                    timeout=ProdConfig.TIMEOUT,
                    verify_ssl=True if url.startswith('https') else False) as http:
                self._logger._logger.info(
                    '<{}> {} - {}'.format(http.status, http.reason, url))
            self._queue.task_done()

    async def _fetch_links(self, url):
        """Fetch sitemap.xml links."""
        headers = {'User-Agent': ProdConfig.USER_AGENT}
        connector = aiohttp.TCPConnector(
            limit=ProdConfig.LIMIT, loop=self._loop)
        async with aiohttp.ClientSession(
                connector=connector, loop=self._loop, headers=headers) as client:
            doc = await self._fetch_doc(client, url)
            consumer = asyncio.ensure_future(self._consumer(client))
            await self._producer(doc)
            await self._queue.join()
            consumer.cancel()
    def __del__(self):
        if self._loop:
            # is_running is a method; without the call parentheses the
            # check would always be truthy and the loop never closed
            if not self._loop.is_running():
                self._loop.close()

if __name__ == '__main__':
    Checker().check(sys.argv[1])
1 Answer
This question is about elapsed timings. Please update it to include cProfile observations or similar wallclock figures.
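A minimal sketch of how those figures could be gathered, assuming the code above is saved as checker.py so that Checker is importable (the module name and profile filename are my assumptions, not part of the question):

import cProfile
import pstats

from checker import Checker  # hypothetical module name

# profile one full run, then show the 20 most expensive call sites
cProfile.run(
    "Checker().check('https://www.google.com/flights/sitemap.xml')",
    filename='check.prof')
pstats.Stats('check.prof').sort_stats('cumulative').print_stats(20)

That would show where the elapsed time goes; time the event loop spends waiting on the network typically lands in the selector's select() call, so everything outside it approximates local CPU busy time.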
Reported throughput suggests that processing each URL takes roughly 660 ms of wallclock time (310 links in 3:24 is about 204 s, and 204 s / 310 ≈ 660 ms per link). It would be helpful to note how many milliseconds of that is local CPU busy time. It would also be interesting to understand how far the 95th-percentile response time is from the median.
It is unclear whether the Google server on the far end, or the end-to-end network path, could support higher request rates, and the question does not include ab (ApacheBench) or similar benchmark figures.
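For the percentile question, a rough sketch of how one might time each HEAD request; URLS is a placeholder standing in for the links parsed out of sitemap.xml, not something the original code defines:

import asyncio
import time

import aiohttp

# placeholder: in practice these would be the sitemap links
URLS = ['https://www.google.com/flights/sitemap.xml']

async def timed_head(client, url, samples):
    start = time.perf_counter()
    async with client.head(url, allow_redirects=True):
        pass  # only the response headers matter for timing
    samples.append(time.perf_counter() - start)

async def main():
    samples = []
    async with aiohttp.ClientSession() as client:
        await asyncio.gather(
            *(timed_head(client, url, samples) for url in URLS))
    samples.sort()
    median = samples[len(samples) // 2]
    p95 = samples[int(len(samples) * 0.95)]
    print('median {:.0f} ms, p95 {:.0f} ms'.format(
        median * 1e3, p95 * 1e3))

asyncio.get_event_loop().run_until_complete(main())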
style nit:

class Logger(object):

Prefer class Logger: instead. Yes, we know it's inheriting from object. In Python 2 this used to make a difference to the MRO, but ever since the Python 2 sunset it's not very relevant.
The source code looks good, with no obvious flaws.
Ship it.
Regarding async with aiohttp.ClientSession(...) — could you create the session and connector once and refer to them as self.xxx? Would that give you a speed-up, or break the code? I'm out at the moment and unable to try it myself.
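Untested sketch of what I mean — an illustration of the self.xxx idea only, not verified against aiohttp (newer versions warn when a ClientSession is created outside a coroutine):

class Checker(object):
    """Sitemap Checker -- hypothetical variant reusing one session."""

    def __init__(self):
        self._logger = Logger()
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(
            maxsize=ProdConfig.MAXSIZE, loop=self._loop)
        # build the connector and session once, instead of on every
        # _fetch_links() call
        self._connector = aiohttp.TCPConnector(
            limit=ProdConfig.LIMIT, loop=self._loop)
        self._client = aiohttp.ClientSession(
            connector=self._connector, loop=self._loop,
            headers={'User-Agent': ProdConfig.USER_AGENT})

    async def _fetch_links(self, url):
        """Fetch sitemap.xml links using the shared self._client."""
        doc = await self._fetch_doc(self._client, url)
        consumer = asyncio.ensure_future(self._consumer(self._client))
        await self._producer(doc)
        await self._queue.join()
        consumer.cancel()
        await self._client.close()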