Here is a simple script that I am using to ping 50 sites at a time and check whether they are up. If a site is down, the downtime and the error are saved in MongoDB.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from urllib.request import urlopen, Request
from threading import Thread
from time import sleep, time
import datetime
import queue
import pymongo

''' A simple script to ping multiple sites at a time
and capture the down sites
'''

__author__ = "Aamir khan"
__version__ = 1.1

_MAX_CONNECTIONS = 50

counter = 0
downsites = []
now = datetime.datetime.utcnow  # time stamp

# DO NOT TURN ON BOTH AT THE SAME TIME
_DEBUG = False
_MONITOR = True


def getcollection(db='websites', colname='website_urls'):
    return pymongo.MongoClient().get_database(db).get_collection(colname)

# to save downsites in db
ds = getcollection(colname="downsites")

# fetch urls from db
if _DEBUG:
    print("Fetching Urls")
    urls = getcollection().find()
    print("%d Urls Fetched" % urls.count())
    print("pulling urls to the queue")
    q = queue.Queue(urls.count())
    for url in urls:
        url = url['url']
        q.put(url)
    print("pulled urls to the queue")
    print("The size of the Queue is %d" % q.qsize())
else:
    urls = getcollection().find()
    q = queue.Queue(urls.count())
    for url in urls:
        url = url['url']
        q.put(url)
del urls


def inc_counter():
    global counter
    # lock.acquire()
    counter += 1
    # lock.release()


def monitor():
    total = q.qsize()
    if total > 0:
        while counter < total:
            print("%d Request sent" % counter)
            sleep(1)
        print("Total {}/{} Request Sent".format(counter, total))
        assert counter == total


def ping(uri):
    req = Request(uri, headers={
        "User-Agent": ("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:51.0)"
                       " Gecko/20100101 Firefox/51.0")
    })
    req.get_method = lambda: 'HEAD'
    try:
        with urlopen(req) as r:
            res = r.getcode(), uri, now()
    except Exception as e:
        res = str(e), uri, now()
    finally:
        if _DEBUG:
            err, uri, last_check = res
            print("Requesting = ", uri, "Request Method = ", req.get_method(),
                  "\nstatus = ", err, "time", last_check)
            print("-----" * 10)
        if _MONITOR:
            inc_counter()
            sleep(1)
        sleep(0.5)  # sleep a while to release the workload from the cpu
    return res


def process(url):
    err, uri, last_check = ping(url)
    if err != 200:
        ds.insert_one({"Error": err.strip('<>'),
                       "url": uri, "last_checked": last_check})


def worker():
    while True:
        url = q.get()
        if url is None:
            break
        process(url)
        q.task_done()


if __name__ == '__main__':
    workers = []
    if _MONITOR:
        Thread(target=monitor).start()  # start monitoring requests/sec
        start_time = time()

    for i in range(_MAX_CONNECTIONS):
        t = Thread(target=worker)
        t.start()
        workers.append(t)

    # block until all tasks are done
    q.join()

    # poison kill
    for i in range(_MAX_CONNECTIONS):
        q.put(None)

    # wait for all the threads to join
    for w in workers:
        w.join()

    if _MONITOR:
        print("Time taken %f (sec)" % (time() - start_time))
Questions:
- Can I make use of better threading techniques?
- Can I eliminate the duplication of code while _DEBUG is on (lines 35 to 54)?
- I would love/prefer to see a functional version of this program.
- How can I improve the performance? (My target is to ping 1,000,000 sites in under an hour.)
1 Answer
The question is tagged python-3.x; this answer assumes 3.2 or higher.
Threading
You can dramatically simplify the code by using a framework for concurrency and queueing. Specifically, instead of spawning a fixed number of concurrent threads and managing a queue yourself, your code will be simpler using an Executor subclass from concurrent.futures (see https://docs.python.org/3/library/concurrent.futures.html).
Remove the worker function and the queue management logic (including the poison kill), and create something like this in your if __name__ == '__main__' block:
import concurrent.futures as futures

urls = [url['url'] for url in getcollection().find()]
with futures.ThreadPoolExecutor(max_workers=32) as executor:
    executor.map(process, urls)
Note the with statement implicitly calls executor.shutdown(wait=True), so the block is essentially synchronous. And once the iterable is drained, the executor manages cleanup.
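If you want the results back in the calling thread (for example, to batch the MongoDB writes instead of inserting from inside each worker), map() also returns the value produced for each item, in input order. A minimal sketch, reusing ping(), getcollection(), and the ds collection from the question:

import concurrent.futures as futures

urls = [doc['url'] for doc in getcollection().find()]

with futures.ThreadPoolExecutor(max_workers=32) as executor:
    # map() yields each ping() result in the same order as the input urls
    results = executor.map(ping, urls)
    down = [{"Error": str(err).strip('<>'), "url": uri, "last_checked": checked}
            for err, uri, checked in results
            if err != 200]

if down:
    ds.insert_many(down)  # one bulk write instead of one insert_one per failure

The bulk insert_many also keeps the database traffic out of the worker threads entirely.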
Functional
This pattern is inherently more functional, though I would not say it's "pure" functional. The map() method applies your function process to each of the items in the associated iterable, spreading the load across available threads (or processes) in the pool.
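If you do want to try processes instead of threads, the shape of the code stays the same; the sketch below only swaps the pool type. It assumes ping() is defined at module level (so it can be pickled for the worker processes) and keeps the MongoDB writes in the parent process, since a MongoClient should not be shared with forked workers.

import concurrent.futures as futures

if __name__ == '__main__':
    urls = [doc['url'] for doc in getcollection().find()]

    # ProcessPoolExecutor defaults to one worker per CPU core
    with futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(ping, urls))

    # write from the parent so the workers never touch the MongoClient
    for err, uri, checked in results:
        if err != 200:
            ds.insert_one({"Error": str(err).strip('<>'),
                           "url": uri, "last_checked": checked})

For this I/O-bound script, though, threads are usually the better fit.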
DEBUG duplication
To address the _DEBUG duplication issue (or generally any selective print() output), in similar situations I use a logger:
import logging
import sys

log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler(sys.stdout))
if _DEBUG:
    log.setLevel(logging.DEBUG)
else:
    log.setLevel(logging.INFO)
Then, that section becomes:
log.debug("Fetching Urls")
urls = getcollection().find()
log.debug("%d Urls Fetched" % urls.count())
log.debug("pulling urls to the queue")
q = queue.Queue(urls.count())
for url in urls:
    url = url['url']
    q.put(url)
log.debug("pulled urls to the queue")
log.debug("The size of the Queue is %d" % q.qsize())
With no leading if _DEBUG:. Note, though, that if you go with the executor, the queue goes away as well. Play with the levels that work for your needs.
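Putting the two suggestions together, the _DEBUG branch, the queue, and the worker plumbing all collapse into a few lines. A rough sketch, assuming the log object configured above plus the process() function and getcollection() helper from the question:

import concurrent.futures as futures

log.debug("Fetching Urls")
urls = [doc['url'] for doc in getcollection().find()]
log.debug("%d Urls Fetched", len(urls))

with futures.ThreadPoolExecutor(max_workers=32) as executor:
    executor.map(process, urls)

log.debug("Checked %d urls", len(urls))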
Performance
Honestly, there is not a lot you'll be able to do here beyond what you are already doing: your main bottleneck is going to be I/O. For 1,000,000 sites in 1 hour, you need to average close to 280 requests per second. Play around with your concurrency (there is a tipping point where too many threads will actually slow things down); on an 8-core machine with 64 workers, I was only hitting about 100 sites in 1.4 seconds.
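One way to find that tipping point empirically is to time a fixed sample of URLs at a few pool sizes and keep whichever gives the best throughput. A rough sketch (single run per size, so treat the numbers as indicative only), assuming the ping() function and a urls list loaded as shown earlier:

import concurrent.futures as futures
from time import time

def throughput(worker_count, sample_urls):
    # time one complete pass over the sample with a pool of the given size
    start = time()
    with futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
        list(executor.map(ping, sample_urls))  # consume the iterator to wait for every request
    return len(sample_urls) / (time() - start)

sample = urls[:500]
for workers in (32, 64, 128, 256):
    print("%3d workers -> %.1f requests/sec" % (workers, throughput(workers, sample)))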
Additional notes:
- Note that many sites won't return 200 for HEAD requests, so you might consider a list of "acceptable" codes, which will likely include 301 and 302 (a sketch follows this list).
- You don't need the sleep() calls in the worker function.
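A minimal way to apply the first note is a small variation on the question's process(); the ACCEPTABLE set here is only illustrative, so adjust it to the codes you consider healthy:

ACCEPTABLE = {200, 301, 302}  # illustrative; tune to taste

def process(url):
    err, uri, last_check = ping(url)
    if err not in ACCEPTABLE:  # err is an int status on success, an error string otherwise
        ds.insert_one({"Error": str(err).strip('<>'),
                       "url": uri, "last_checked": last_check})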
Comments:
- That was quite helpful, and I was considering 301 and 302, but then I tested and found that urllib was returning 200 even when the original response was a 301/302. – Mak, Apr 2, 2017 at 14:47
- I was thinking of using multiprocessing to spawn 2 or 3 processes, in each of which I will start 50 threads. Any suggestions on this? – Mak, Apr 2, 2017 at 18:02
- Yes, that's a reasonable idea. The problem with threads in Python is that they are actually still serially executed because of the Global Interpreter Lock. Multiple processes will, however, execute concurrently because multiple interpreters are involved. The optimal approach will probably be one process per CPU core, then play with the number of threads in each process. The trickiest part of the solution is going to be the two tiers of delegation and IPC to divvy out the URLs - first to the processes, then to the threads. – bimsapi, Apr 4, 2017 at 13:50
- Yeah, agreed! The GIL is a big stone; I may consider using another implementation such as Jython, but for now I am going with this. Thanks again! – Mak, Apr 6, 2017 at 11:29
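For completeness, here is a rough sketch of the two-tier idea discussed in the comments: one process per core, each fanning its share of URLs out to a local thread pool. The chunk size and thread count are illustrative, and it assumes ping(), getcollection(), and ds are importable at module level:

import concurrent.futures as futures

THREADS_PER_PROCESS = 50  # illustrative; tune alongside the process count

def check_chunk(chunk):
    # runs inside a worker process: fan the chunk out to a local thread pool
    with futures.ThreadPoolExecutor(max_workers=THREADS_PER_PROCESS) as pool:
        return list(pool.map(ping, chunk))

if __name__ == '__main__':
    urls = [doc['url'] for doc in getcollection().find()]
    chunks = [urls[i:i + 1000] for i in range(0, len(urls), 1000)]

    results = []
    with futures.ProcessPoolExecutor() as procs:  # defaults to one worker per core
        for chunk_result in procs.map(check_chunk, chunks):
            results.extend(chunk_result)

    # write from the parent process so the workers never share a MongoClient
    for err, uri, checked in results:
        if err != 200:
            ds.insert_one({"Error": str(err).strip('<>'),
                           "url": uri, "last_checked": checked})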