2
\$\begingroup\$
def getProxiesLstFromDB():
 global g_proxies
 # insert to status page links table
 proxies_list = g_db["proxies_list"]
 proxies_list_records = proxies_list.find()
 print_to_log("get proxies list from db")
 for proxies_list_record in proxies_list_records:
 if proxies_list_record is not None:
 if len(proxies_list_record["user"]) == 0:
 proxies_list_record["user"] = "user"
 if len(proxies_list_record["password"]) == 0:
 proxies_list_record["password"] = "password"
 g_proxies.append(proxies_list_record)
def get_ProxyInfo(proxy_no):
 global g_proxies
 if len(g_proxies) == 0:
 return ""
 proxy_no = proxy_no % len(g_proxies)
 proxy_info = "%s:%s@%s:%s" % (g_proxies[proxy_no]["user"], g_proxies[proxy_no] ["password"], g_proxies[proxy_no]["ProxyIP"], g_proxies[proxy_no]["ProxyPort"])
 return proxy_info
def readHtml_using_proxy(one_url, proxy_no):
 global g_proxies
 proxy = None
 page_text = ""
 try:
 proxy_info = "%s:%s@%s:%s" % (g_proxies[proxy_no]["user"], g_proxies[proxy_no]["password"], g_proxies[proxy_no]["ProxyIP"], g_proxies[proxy_no]["ProxyPort"])
 proxy = urllib2.ProxyHandler({"http": proxy_info,
 "https":proxy_info})
 except:
 print_to_log("Proxy setting error. proxy_no: %d" % proxy_no)
 return page_text
 try:
 opener = urllib2.build_opener(proxy)
 req = urllib2.Request(one_url)
 r = opener.open(req)
 html = r.read()
 converter = html2text.HTML2Text()
 converter.ignore_links = True
 page_text = ""
 page_text = html.decode("utf8", "ignore")
 page_text = converter.handle(page_text)
 except:
 print_to_log("Unable to read or decode page html from %s" % one_url)
 return page_text
def scrape_one_url(db, one_url, domain_url, domain_url_idx, threadname):
 scrape_results = []
 threadnamelen = 13
 threadname = "(%s)" % threadname
 if len(threadname) < threadnamelen:
 idx = 0
 lenOfthreadname = len(threadname)
 while idx < threadnamelen - lenOfthreadname:
 idx += 1
 threadname = threadname + " "
 one_url = urljoin(domain_url, one_url)
 if "google.com" in one_url:
 #skip for google.com
 return
 if is_debug:
 print_to_log( "%s>>> processing domain_idx: %s, url: %s" % (threadname, domain_url_idx, one_url) )
 else:
 print_to_log( "%s>>> processing %s" % (threadname, one_url) )
 #get page content from url
 br = mechanize.Browser()
 br.set_handle_equiv(True)
 br.set_handle_redirect(True)
 br.set_handle_referer(True)
 br.set_handle_robots(False)
 try:
 response = br.open(one_url)
 page_type = response.info()["Content-Type"]
 if is_debug:
 print response.info()["Content-Type"]
 if "image" in page_type:
 return
 page_html = response.read()
 except:
 return
 converter = html2text.HTML2Text()
 converter.ignore_links = True
 try:
 page_text = page_html.decode("utf8", "ignore")
 page_text = converter.handle(page_text)
 page_text = re.sub("[^a-zA-Z0-9_!]+[ ,.\?!]", "", page_text)
 except:
 return
 #print page_text
 ###########################
 Site_Cate1 = ""
 Site_Cate2 = ""
 Site_Cate3 = ""
 Google_Page_Rank = "0"
 Twitter_share_cnt = ""
 Facebook_share_cnt = ""
 MAX_LIMIT_CNT_TO_INSERT = 100
 #1. Site Categorization
 xml = """<?xml version="1.0" encoding="utf-8" ?>
 <uclassify xmlns="http://api.uclassify.com/1/RequestSchema" version="1.01">
 <texts>
 <textBase64 id="TextId">%s</textBase64>
 </texts>
 <readCalls readApiKey="secret">
 <classify id="Clas" username="uClas" classifierName="Topic" textId="TextId"/>
 </readCalls>
 </uclassify>"""
 xml = xml % base64.b64encode(page_text.encode("utf8", "ignore"))
 headers = {'Content-Type': 'text/xml; charset=utf-8'} # set what your server accepts
 response = requests.post('http://api.uclas.com', data=xml, headers=headers).text
 res_xml = BeautifulSoup(response)
 Cate_lst = {}
 try:
 for one_class in res_xml.findAll("class"):
 Cate_lst[one_class.attrs["classname"]] = float(one_class.attrs["p"])
 except:
 pass
 idx = 1
 for key, value in sorted(Cate_lst.iteritems(), key=lambda (k,v): (v,k), reverse=True):
 if idx == 1:
 Site_Cate1 = key
 elif idx == 2:
 Site_Cate2 = key
 elif idx == 3:
 Site_Cate3 = key
 break
 idx += 1
 global g_proxy_counter
 g_queueLock.acquire()
 g_proxy_counter += 1
 g_queueLock.release()
 proxy_info = get_ProxyInfo(g_proxy_counter)
 try:
 if len(proxy_info) > 0:
 Google_Page_Rank = rank_provider.GooglePageRank().get_rank_using_proxy(one_url, proxy_info)
 if Google_Page_Rank is None:
 Google_Page_Rank = "0"
 else:
 Google_Page_Rank = str(Google_Page_Rank)
 except:
 Google_Page_Rank = "0"
 if is_debug:
 print_to_log( "%sGoogle Page Rank: %s" % (threadname, Google_Page_Rank) )
 #8. Social Share Counts
 one_url_tw = "http://cdn.api.twitter.com/1/urls/count.json?url=%s" % one_url
 try:
 response_tw = br.open(one_url_tw)
 page_html_tw = response_tw.read()
 j = json.loads(page_html_tw)
 Twitter_share_cnt = j['count']
 except:
 pass
 one_url_fb = "http://graph.facebook.com/?id=%s" % one_url
 try:
 response_fb = br.open(one_url_fb)
 page_html_fb = response_fb.read()
 j = json.loads(page_html_fb)
 Facebook_share_cnt = j['shares']
 except:
 pass
 #5. Total Backlinks
 t_soup = BeautifulSoup(page_html)
 t_as = t_soup.findAll('a')
 Total_Incoming_links = 0
 Total_Outgoing_links = 0
 links_idx = 0
 records = []
 status_records = []
 for t_a in t_as:
 if not t_a.has_attr('href') or (not t_a['href'].startswith("http") and not t_a['href'].startswith("/")):
 continue
 #/#comment, /#readmore, /#respond
 if ("/#comment" in t_a['href']) or ("/#readmore" in t_a['href']) or ("/#respond" in t_a['href']):
 continue
 domain_url_www = domain_url.replace('https://','https://www.').replace('http://','http://www.')
 if t_a['href'].startswith(domain_url):
 t_a['href'] = t_a['href'][len(domain_url):]
 elif t_a['href'].startswith(domain_url_www):
 t_a['href'] = t_a['href'][len(domain_url_www):]
 InOrOutLink = ""
 if t_a['href'].startswith("http"):
 Total_Outgoing_links += 1
 InOrOutLink = "Outgoing"
 else:
 Total_Incoming_links += 1
 InOrOutLink = "Incoming"
 #################################
 # insert to status page links table
 status_page_links = db["status_page_links"]
 status_page_links_record = status_page_links.find_one({"DomainURL": domain_url, "DomainURLIDX": domain_url_idx, "Link": t_a['href']})
 if status_page_links_record is None:
 status_record = {
 "DomainURL": domain_url,
 "DomainURLIDX": domain_url_idx,
 "Link": t_a['href'],
 "Status": 0, #0: not processed, 1: processed
 "date": datetime.datetime.utcnow()}
 status_page_links_id = status_page_links.insert(status_record)
 scrape_results.append(status_record)
 links_idx += 1
 ###################################
 # insert to db
 record = {
 "DomainURL": domain_url,
 "DomainURLIDX": domain_url_idx,
 "PageURL": one_url,
 "Link": t_a['href'],
 "InOrOutLink": InOrOutLink,
 "date": datetime.datetime.utcnow()}
 records.append(record)
 if links_idx % MAX_LIMIT_CNT_TO_INSERT == 0:
 page_link_info = db["page_link_info"]
 page_link_info_ids = page_link_info.insert(records)
 records = []
 if is_debug:
 print_to_log( "%sinserted %s links" % (threadname, links_idx) )
 ####################################
 #calculate total count of incoming / outcoming links
 if links_idx % MAX_LIMIT_CNT_TO_INSERT > 0:
 page_link_info = db["page_link_info"]
 page_link_info_ids = page_link_info.insert(records)
 records = []
 #2. Keyword Relevance, 3. Key word Sentiment Value
 extractor = extract.TermExtractor()
 extractor.filter = extract.DefaultFilter(noLimitStrength=2)
 kwds = sorted( extractor(page_text) )
 total_cntOfWds = len([word for word in page_text.split() if word.isalnum()])
 KeywordsCnt = 0
 xml_sentiment_templ = """<?xml version="1.0" encoding="utf-8" ?>
<uclassify xmlns="http://api.uclassify.com/1/RequestSchema" version="1.01">
<texts>
 <textBase64 id="tweet1">%s</textBase64>
</texts>
<readCalls readApiKey="secret">
<classifyKeywords id="ClassifyKeywords" username="uClassify" classifierName="Sentiment" textId="tweet1"/>
</readCalls>
</uclassify>"""
 records = []
 for kwd in kwds:
 #break # for test
 (one_word, occurences, cntOfWd) = kwd
 if re.search('[a-zA-Z]+',one_word) == None:
 pass
 elif len(one_word) < 3:
 pass
 elif cntOfWd > 3:
 pass
 else:
 Keyword_Density = float(occurences) * cntOfWd / total_cntOfWds
 Keyword_Density = "%.6f" % round(float(Keyword_Density),6)
 try:
 one_word = one_word.decode("utf8", "ignore")
 xml_sentiment = xml_sentiment_templ % base64.b64encode(one_word.encode("utf8", "ignore"))
 except:
 continue
 KS_positive = 0
 KS_negative = 0
 KS_type = "neutral"
 #check if kw exists in db already
 KeywordsCnt += 1
 ###################################
 # insert to db
 record = {
 "DomainURL": domain_url,
 "DomainURLIDX": domain_url_idx,
 "PageURL": one_url,
 "Keyword": one_word,
 "KWSentiment": KS_type,
 "KWSPositive": KS_positive,
 "KWSNegative": KS_negative,
 "KWDensity": Keyword_Density,
 "KWOccurences": occurences,
 "KWCntOfWords": cntOfWd,
 "date": datetime.datetime.utcnow()}
 records.append(record)
 if KeywordsCnt % MAX_LIMIT_CNT_TO_INSERT == 0:
 page_kw_info = db["page_kw_info"]
 page_kw_info_ids = page_kw_info.insert(records)
 records = []
 ####################################
 if KeywordsCnt % MAX_LIMIT_CNT_TO_INSERT > 0:
 page_kw_info = db["page_kw_info"]
 page_kw_info_ids = page_kw_info.insert(records)
 records = []
 ####################################
 #6. Kewyword Density
 # already got
 #7. Domain Age
 #8. Calculate Points
 Points = 0
 try:
 int_Google_Page_Rank = int(Google_Page_Rank)
 except:
 int_Google_Page_Rank = 0
 try:
 int_Total_Incoming_links = int(Total_Incoming_links)
 except:
 int_Total_Incoming_links = 0
 try:
 int_Total_Outgoing_links = int(Total_Outgoing_links)
 except:
 int_Total_Outgoing_links = 0
 try:
 int_Twitter_share_cnt = int(Twitter_share_cnt)
 except:
 int_Twitter_share_cnt = 0
 try:
 int_Facebook_share_cnt = int(Facebook_share_cnt)
 except:
 int_Facebook_share_cnt = 0
 try:
 Points = int_Google_Page_Rank*10 + int_Total_Incoming_links/10 - int_Total_Outgoing_links/5 + ( int_Twitter_share_cnt + int_Facebook_share_cnt )/15
 except:
 Points = 0
 ###################################
 # insert to db
 record = {
 "DomainURL": domain_url,
 "DomainURLIDX": domain_url_idx,
 "PageURL": one_url,
 "SiteCate1": Site_Cate1,
 "SiteCate2": Site_Cate2,
 "SiteCate3": Site_Cate3,
 "GooglePageRank": Google_Page_Rank,
 "FacebookShareCnt": Facebook_share_cnt,
 "TwitterShareCnt": Twitter_share_cnt,
 "TotalBacklinks": Total_Incoming_links + Total_Outgoing_links,
 "TotalIncomingLinksCnt": Total_Incoming_links,
 "TotalOutgoingLinksCnt": Total_Outgoing_links,
 "TotalWordsCnt": total_cntOfWds,
 "TotalKeywordsCnt": KeywordsCnt,
 "Points": Points,
 "date": datetime.datetime.utcnow()}
 page_main_info = db["page_main_info"]
 update_status = page_main_info.update({"PageURL":one_url}, record, upsert=True)
 return scrape_results
def scrape_one_domain(domain_url, domain_url_idx, threadname):
 global PAGES_CRAWLING_THREADS
 one_url = "/"
 client = MongoClient('secret', 2422)
 db = client['site_analysis']
 # insert to status page links table
 status_page_links = db["status_page_links"]
 status_page_links_record = status_page_links.find_one({"DomainURL": domain_url, "DomainURLIDX": domain_url_idx, "Link": one_url})
 if status_page_links_record is None:
 status_record = {
 "DomainURL": domain_url,
 "DomainURLIDX": domain_url_idx,
 "Link": one_url,
 "Status": 0, #0: not processed, 1: processed
 "date": datetime.datetime.utcnow()}
 status_page_links_id = status_page_links.insert(status_record)
 #process not processed links in separate threads
def process_link_worker(queue, db, domain_url, domain_url_idx, threadname, threadsubname, threads_signals):
 global PAGES_CRAWLING_THREADS
 while True:
 try:
 status_page_links_record = queue.get_nowait()
 except Queue.Empty:
 threads_signals.add(threadsubname)
 if len(threads_signals) >= PAGES_CRAWLING_THREADS * 10: # if no job left
 return
 else:
 time.sleep(1)
 continue
 threads_signals.discard(threadsubname)
 one_url = status_page_links_record["Link"]
 scrape_results = scrape_one_url(db, one_url, domain_url, domain_url_idx, threadname)
 for scrape_result in scrape_results:
 queue.put(scrape_result)
 db["status_page_links"].update({'_id': ObjectId(status_page_links_record["_id"])},
 {'$set': {"Status": 1, "date": datetime.datetime.utcnow()}})
status_page_links = db["status_page_links"]
status_page_links_records = list(status_page_links.find({"DomainURL": domain_url, "DomainURLIDX": domain_url_idx, "Status": 0}))
if status_page_links_records:
 links_queue = Queue.Queue()
 for status_page_links_record in status_page_links_records:
 links_queue.put(status_page_links_record)
 link_processing_threads = []
 threads_signals = set()
 for i in range(PAGES_CRAWLING_THREADS): #start 2 threads
 thread = threading.Thread(target=process_link_worker, args=(links_queue,db,domain_url,domain_url_idx,
 threadname,'%s_%s' % (threadname, i+1),
 threads_signals))
 thread.start()
 link_processing_threads.append(thread)
 #wait for threads return
 for thread in link_processing_threads:
 thread.join()
exitFlag = 0
class myThread (threading.Thread):
 def __init__(self, threadID, name, q):
 threading.Thread.__init__(self)
 self.threadID = threadID
 self.name = name
 self.q = q
 def run(self):
 global g_queueLock
 global g_workQueue
 global g_workingEntry
 global g_db
 print_to_log( "Starting " + self.name )
 while not exitFlag:
 g_queueLock.acquire()
 if not g_workQueue.empty():
 data = g_workQueue.get()
 domain_url = data[0]
 domain_url_idx = data[1]
 g_workingEntry.append(domain_url)
 g_queueLock.release()
 scrape_one_domain(domain_url, domain_url_idx, self.name)
 #updated domain status in db
 g_queueLock.acquire()
 g_workingEntry.remove(domain_url)
 status_domain = g_db["status_domain"]
 status_domain.update({"DomainURL": domain_url.strip(), "DomainURLIDX": domain_url_idx,"Status": 0},
 {'$set': {"Status": 1, "date": datetime.datetime.utcnow()}})
 g_queueLock.release()
 else:
 g_queueLock.release()
 time.sleep(1)
 print_to_log( "Exiting " + self.name )
def main():
 global g_threads
 global g_workQueue
 global g_db
 global g_workingEntry
 global g_queueLock
 if len(sys.argv) < 2:
 print_to_log("================Crawler newly started==============")
 print_to_log("you did not give any arguments.")
 threadCnt = 10
 else:
 print_to_log("================Crawler newly started==============")
 threadCnt = int(sys.argv[1])
 getProxiesLstFromDB()
 print_to_log("Threads Count: %d" % threadCnt)
 # Create new threads
 threadID = 1
 while threadID <= threadCnt:
 threadName = "Thread %d" % threadID
 thread = myThread(threadID, threadName, g_workQueue)
 thread.start()
 g_threads.append(thread)
 threadID += 1
 is_first = True
 while True:
 # insert to status page links table
 status_domain = g_db["status_domain"]
 status_domain_records_cnt = status_domain.find({"Status": 0}).count()
 if (is_first and status_domain_records_cnt == 0) or (is_first == False):
 #read domains
 domains_to_crawl = g_db["domains_to_crawl"]
 domains_to_crawl_records = domains_to_crawl.find()
 for domains_to_crawl_record in domains_to_crawl_records:
 max_status_domain_record = status_domain.find_one(sort=[("DomainURLIDX", -1)])
 max_DomainURLIDX = 0
 if max_status_domain_record is not None:
 max_DomainURLIDX = max_status_domain_record["DomainURLIDX"]
 status_domain_record = {
 "DomainURL": domains_to_crawl_record["DomainURL"],
 "DomainURLIDX": max_DomainURLIDX+1,
 "Status": 0, #0: not processed, 1: processed
 "date": datetime.datetime.utcnow()}
 try:
 index = g_workingEntry.index(domains_to_crawl_record["DomainURL"])
 except:
 update_status = status_domain.update({"DomainURL": domains_to_crawl_record["DomainURL"].strip()}, status_domain_record, upsert=True)
 print_to_log("domains_to_crawl table content: %s" % domains_to_crawl_record["DomainURL"] )
 status_domain_records = status_domain.find({"Status": 0}).sort("DomainURLIDX", 1)
 # Fill the queue
 g_queueLock.acquire()
 while not g_workQueue.empty():
 data = g_workQueue.get()
 for record in status_domain_records:
 try:
 index = g_workingEntry.index(record["DomainURL"])
 except:
 #not exist the url in working list
 g_workQueue.put([record["DomainURL"], record["DomainURLIDX"]])
 g_queueLock.release()
 # Wait for queue to empty
 while not g_workQueue.empty():
 pass
 is_first = False
 time.sleep(1)
 # Notify threads it's time to exit
 exitFlag = 1
 # Wait for all threads to complete
 for t in g_threads:
 t.join()
 print_to_log( "Exiting Main Thread" )
200_success
145k22 gold badges190 silver badges478 bronze badges
asked Jan 28, 2014 at 20:41
\$\endgroup\$
3
  • 6
    \$\begingroup\$ Could you give some explanation/background to this? Why it was written maybe. What times you are seeing for the amount of data you're processing. etc. etc. \$\endgroup\$ Commented Jan 29, 2014 at 0:21
  • 1
    \$\begingroup\$ I agree with @JamesKhoury. You really need to profile this code to show us what is slow. A simple way on Linux is to use 'time python module.py' so that you know how much time you spend waiting for the API to answer (eg. if the 'cpu' percentage is low). Then you can use Python's profiler to highlight the slow parts. \$\endgroup\$ Commented Feb 14, 2014 at 12:34
  • \$\begingroup\$ Certainly need more detail to point anyone wanting to help in the right direction. That is alot of code there with very little explanation. \$\endgroup\$ Commented Apr 23, 2014 at 3:55

1 Answer 1

2
\$\begingroup\$

This is just one bit that jumped out at me:

 page_text = page_html.decode("utf8", "ignore")
 page_text = converter.handle(page_text)
 page_text = re.sub("[^a-zA-Z0-9_!]+[ ,.\?!]", "", page_text)

It seems you are converting as utf8, ignoring errors, and then forcing it into ascii with a regex. Why not just decode as ascii and ignore or replace the errors.

There are certainly many other optimizations in a chunk of code that size. You really are doing alot in that chunk of code and there are so many ways to improve on it.

Another observation is the page_html.split() to glean the words out of the document. You have used BeautifulSoup else where in the doc why not use that to get the words. Preferably all in one global parse pass.

Further, you then turn around and submit those words each to an API. It doesn't look like there is any thought that the same word might return the same result each time and as such the results could be stored in some other way so as to avoid needing to request them for each word on each page, which surely will lead to many many redundant queries against the classification.

There is really too much to cover in a single response here.

answered Apr 23, 2014 at 3:54
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.