Crawler-related notes

An asynchronous crawler based on Tornado

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import logging
import traceback
from datetime import timedelta

from tornado import httpclient, gen, ioloop, queues
from bs4 import BeautifulSoup


def logged(class_):
    logging.basicConfig(level=logging.INFO)
    class_.logger = logging.getLogger(class_.__name__)
    return class_


@logged
class AsyncSpider(object):
    """A simple class of asynchronous spider."""

    def __init__(self, urls, concurrency=10, results=None, **kwargs):
        super(AsyncSpider, self).__init__(**kwargs)
        self.concurrency = concurrency
        self._q = queues.Queue()
        self._fetching = set()
        self._fetched = set()
        self.results = [] if results is None else results
        for url in urls:
            self._q.put(url)
        httpclient.AsyncHTTPClient.configure(
            "tornado.curl_httpclient.CurlAsyncHTTPClient"
        )

    def fetch(self, url, **kwargs):
        fetch = getattr(httpclient.AsyncHTTPClient(), 'fetch')
        http_request = httpclient.HTTPRequest(url, **kwargs)
        return fetch(http_request, raise_error=False)

    def handle_html(self, url, html):
        """Handle an html page."""
        print(url)

    def handle_response(self, url, response):
        """Handle the http response: process the html directly for a 200,
        handle other status codes as your needs dictate."""
        if response.code == 200:
            self.handle_html(url, response.body)
        elif response.code == 599:    # retry
            self._fetching.remove(url)
            self._q.put(url)

    @gen.coroutine
    def get_page(self, url):
        # yield gen.sleep(10)    # sleep when needed
        try:
            response = yield self.fetch(url)
            self.logger.debug('######fetched %s' % url)
        except Exception as e:
            self.logger.debug('Exception: %s %s' % (e, url))
            raise gen.Return(e)
        raise gen.Return(response)    # py3 can just return response

    @gen.coroutine
    def _run(self):

        @gen.coroutine
        def fetch_url():
            current_url = yield self._q.get()
            try:
                if current_url in self._fetching:
                    return
                self.logger.debug('fetching****** %s' % current_url)
                self._fetching.add(current_url)
                response = yield self.get_page(current_url)
                self.handle_response(current_url, response)    # handle response
                self._fetched.add(current_url)
            finally:
                self._q.task_done()

        @gen.coroutine
        def worker():
            while True:
                yield fetch_url()

        # Start workers, then wait for the work queue to be empty.
        for _ in range(self.concurrency):
            worker()
        yield self._q.join(timeout=timedelta(seconds=300000))
        try:
            assert self._fetching == self._fetched
        except AssertionError:    # some http errors are not handled
            print(self._fetching - self._fetched)
            print(self._fetched - self._fetching)

    def run(self):
        io_loop = ioloop.IOLoop.current()
        io_loop.run_sync(self._run)


class MySpider(AsyncSpider):

    def fetch(self, url, **kwargs):
        """Override the parent fetch method to add cookies, headers, timeout, etc."""
        cookies_str = "PHPSESSID=j1tt66a829idnms56ppb70jri4; pspt=%7B%22id%22%3A%2233153%22%2C%22pswd%22%3A%228835d2c1351d221b4ab016fbf9e8253f%22%2C%22_code%22%3A%22f779dcd011f4e2581c716d1e1b945861%22%7D; key=%E9%87%8D%E5%BA%86%E5%95%84%E6%9C%A8%E9%B8%9F%E7%BD%91%E7%BB%9C%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8; think_language=zh-cn; SERVERID=a66d7d08fa1c8b2e37dbdc6ffff82d9e|1444973193|1444967835; CNZZDATA1254842228=1433864393-1442810831-%7C1444972138"
        headers = {
            'User-Agent': 'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
            'cookie': cookies_str
        }
        return super(MySpider, self).fetch(
            url, headers=headers,
            # proxy_host="127.0.0.1", proxy_port=8787,    # for proxy
        )

    def handle_html(self, url, html):
        print(url)
        # print(BeautifulSoup(html, 'lxml').find('title'))


def main():
    st = time.time()
    urls = []
    n = 1000
    for page in range(1, n):
        urls.append('http://www.jb51.net/article/%s.htm' % page)
    s = MySpider(urls, 10)
    s.run()
    print(time.time() - st)
    print(60.0 / (time.time() - st) * 1000, 'per minute')
    print(60.0 / (time.time() - st) * 1000 / 60.0, 'per second')


if __name__ == '__main__':
    main()
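
A minimal usage sketch of the results list (not part of the original code): it assumes pycurl and lxml are installed, subclasses AsyncSpider to collect parsed titles instead of just printing URLs, and uses an illustrative httpbin URL.

class TitleSpider(AsyncSpider):

    def handle_html(self, url, html):
        # collect (url, title) pairs instead of just printing the url
        title = BeautifulSoup(html, 'lxml').find('title')
        self.results.append((url, title.text if title else ''))


def demo():
    spider = TitleSpider(['http://httpbin.org/html'], concurrency=2)
    spider.run()
    for url, title in spider.results:
        print(url, title)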

Utility functions that come in handy when writing crawlers

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Chrome lets you right-click a request and "Copy as cURL", then replay the
request from the command line with curl. The helpers below turn that curl
string into the argument format the requests library expects,
http://stackoverflow.com/questions/23118249/whats-the-difference-between-request-payload-vs-form-data-as-seen-in-chrome
"""
import re
from functools import wraps
import traceback
import requests


def encode_to_dict(encoded_str):
    """Split a url-encoded string into a dict.

    >>> encode_to_dict('name=foo')
    {'name': 'foo'}
    >>> encode_to_dict('name=foo&val=bar')
    {'name': 'foo', 'val': 'bar'}
    """
    pair_list = encoded_str.split('&')
    d = {}
    for pair in pair_list:
        if pair:
            key = pair.split('=')[0]
            val = pair.split('=')[1]
            d[key] = val
    return d


def parse_curl_str(s):
    """convert chrome curl string to url, headers_dict and data"""
    pat = re.compile("'(.*?)'")
    str_list = [i.strip() for i in re.split(pat, s)]    # split the curl command string
    url = ''
    headers_dict = {}
    data = ''
    for i in range(0, len(str_list)-1, 2):
        arg = str_list[i]
        string = str_list[i+1]
        if arg.startswith('curl'):
            url = string
        elif arg.startswith('-H'):
            header_key = string.split(':', 1)[0].strip()
            header_val = string.split(':', 1)[1].strip()
            headers_dict[header_key] = header_val
        elif arg.startswith('--data'):
            data = string
    return url, headers_dict, data


def retry(retries=3):
    """Retry a failed request, or use the more powerful retrying library:
    pip install retrying
    https://github.com/rholder/retrying
    :param retries: int, number of retry attempts.
    """
    def _retry(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            index = 0
            while index < retries:
                index += 1
                try:
                    response = func(*args, **kwargs)
                    if response.status_code == 404:
                        print(404)
                        break
                    elif response.status_code != 200:
                        print(response.status_code)
                        continue
                    else:
                        break
                except Exception:
                    traceback.print_exc()
                    response = None
            return response
        return _wrapper
    return _retry


_get = requests.get


@retry(5)
def get(*args, **kwds):
    if 'timeout' not in kwds:
        kwds['timeout'] = 10
    if 'headers' not in kwds:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36',
        }
        kwds['headers'] = headers
    return _get(*args, **kwds)


requests.get = get


def retry_get_html(*args, **kwargs):
    try:
        return get(*args, **kwargs).content
    except AttributeError:
        return ''


def lazy_property(fn):
    attr_name = '_lazy_' + fn.__name__

    @property
    def _lazy_property(self):
        if not hasattr(self, attr_name):
            setattr(self, attr_name, fn(self))
        return getattr(self, attr_name)
    return _lazy_property


def my_ip():
    # url = 'https://api.ipify.org?format=json'
    url = 'http://httpbin.org/ip'
    return requests.get(url).text


def form_data_to_dict(s):
    """form_data_to_dict: s is the form-data string copied from Chrome's
    devtools; note that it *must* be passed as a raw string r"".
    :param s: form-data string
    """
    arg_list = [line.strip() for line in s.split('\n')]
    d = {}
    for i in arg_list:
        if i:
            k = i.split(':', 1)[0].strip()
            v = ''.join(i.split(':', 1)[1:]).strip()
            d[k] = v
    return d


if __name__ == '__main__':
    import sys
    from pprint import pprint
    try:
        curl_str = sys.argv[1]    # wrap the curl string in triple quotes and pass it as the argument
        url, headers_dict, data = parse_curl_str(curl_str)
        print(url)
        pprint(headers_dict)
        print(data)
    except IndexError:
        exit(0)
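
A quick usage sketch of parse_curl_str together with encode_to_dict, assuming the helpers above are in the same module; the curl string below is a made-up example, not from the original notes.

curl_str = r"""curl 'http://httpbin.org/post' -H 'Accept: application/json' -H 'X-Token: abc123' --data 'name=foo&val=bar'"""
url, headers, data = parse_curl_str(curl_str)
print(url)                     # http://httpbin.org/post
print(headers)                 # {'Accept': 'application/json', 'X-Token': 'abc123'}
print(encode_to_dict(data))    # {'name': 'foo', 'val': 'bar'}
# the parsed pieces map directly onto requests:
# requests.post(url, headers=headers, data=encode_to_dict(data))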

How to use a proxy

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# requests proxy demo
import requests

# install lantern first; this is lantern's local proxy address
proxies = {
    "http": "http://127.0.0.1:8787",
    "https": "http://127.0.0.1:8787",
}
url = 'http://httpbin.org/ip'
r = requests.get(url, proxies=proxies)
print(r.text)

# requests supports socks proxies since version 2.10.0
# pip install -U requests[socks]
proxies = {'http': "socks5://myproxy:9191"}
requests.get('http://example.org', proxies=proxies)

# tornado proxy demo
# sudo apt-get install libcurl-dev librtmp-dev
# pip install tornado pycurl
from tornado import httpclient, ioloop

config = {
    'proxy_host': 'YOUR_PROXY_HOSTNAME_OR_IP_ADDRESS',
    'proxy_port': 3128
}
httpclient.AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient")


def handle_request(response):
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    ioloop.IOLoop.instance().stop()


http_client = httpclient.AsyncHTTPClient()
http_client.fetch("http://twitter.com/",
                  handle_request, **config)
ioloop.IOLoop.instance().start()
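
A small sanity-check sketch (an addition, assuming the lantern proxy above is running locally): fetch httpbin.org/ip with and without the proxies dict and compare the reported addresses to confirm traffic really goes through the proxy.

import requests

proxies = {
    "http": "http://127.0.0.1:8787",
    "https": "http://127.0.0.1:8787",
}
url = 'http://httpbin.org/ip'
direct_ip = requests.get(url).text.strip()
proxied_ip = requests.get(url, proxies=proxies).text.strip()
print('direct :', direct_ip)
print('proxied:', proxied_ip)
print('proxy in use:', direct_ip != proxied_ip)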

Using a thread pool

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import concurrent.futures
import traceback

import bs4
import requests


class ThreadPoolCrawler(object):

    def __init__(self, urls, concurrency=10, **kwargs):
        self.urls = urls
        self.concurrency = concurrency
        self.results = []

    def handle_response(self, url, response):
        pass

    def get(self, *args, **kwargs):
        return requests.get(*args, **kwargs)

    def run(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
            future_to_url = {
                executor.submit(self.get, url): url for url in self.urls
            }
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    response = future.result()
                except Exception:
                    traceback.print_exc()
                else:
                    self.handle_response(url, response)


class TestCrawler(ThreadPoolCrawler):

    def handle_response(self, url, response):
        soup = bs4.BeautifulSoup(response.text, 'lxml')
        title = soup.find('title')
        self.results.append({url: title})


def main():
    import time
    urls = ['http://localhost:8000'] * 300
    for nums in [2, 5, 10, 15, 20, 50, 70, 100]:
        beg = time.time()
        s = TestCrawler(urls, nums)
        s.run()
        print(nums, time.time() - beg)


if __name__ == '__main__':
    main()
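
A hedged sketch of the same pattern against real pages (MyCrawler, the URLs and the timeout/header values are illustrative, not part of the original notes): per-request options are added by overriding get, mirroring how MySpider overrides fetch above.

class MyCrawler(ThreadPoolCrawler):

    def get(self, *args, **kwargs):
        # add a timeout and a User-Agent to every request
        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('headers', {'User-Agent': 'Mozilla/5.0'})
        return requests.get(*args, **kwargs)

    def handle_response(self, url, response):
        if response.status_code == 200:
            self.results.append({url: len(response.text)})


crawler = MyCrawler(['http://httpbin.org/html'] * 5, concurrency=5)
crawler.run()
print(crawler.results)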

Using tor for proxy IPs

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# http://ningning.today/2016/03/07/python/python-requests-tor-crawler/
import os
import requests
import requesocks

# url = 'https://api.ipify.org?format=json'
url = 'http://httpbin.org/ip'


def getip_requests(url):
    print("(+) Sending request with plain requests...")
    r = requests.get(url)
    print("(+) IP is: " + r.text.replace("\n", ""))


def getip_requesocks(url):
    print("(+) Sending request with requesocks...")
    session = requesocks.session()
    session.proxies = {'http': 'socks5://127.0.0.1:9050',
                       'https': 'socks5://127.0.0.1:9050'}
    r = session.get(url)
    print("(+) IP is: " + r.text.replace("\n", ""))


def tor_requests():
    proxies = {
        'http': 'socks5://127.0.0.1:9050',
        'https': 'socks5://127.0.0.1:9050',
    }
    r = requests.get(url, proxies=proxies)
    print(r.text)


def main():
    print("Running tests...")
    getip_requests(url)
    getip_requesocks(url)
    # tell tor to switch to a new circuit (and thus a new exit IP)
    os.system("""(echo authenticate '"yourpassword"'; echo signal newnym; echo quit) | nc localhost 9051""")
    getip_requesocks(url)


if __name__ == "__main__":
    main()
    # tor_requests()
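
As an alternative to piping commands into nc, the stem controller library can request a new tor circuit; a minimal sketch (an addition, assuming stem is installed, the ControlPort is 9051, and a control password is configured):

from stem import Signal
from stem.control import Controller


def renew_tor_identity(password='yourpassword'):
    # ask the local tor ControlPort for a fresh circuit (new exit IP)
    with Controller.from_port(port=9051) as controller:
        controller.authenticate(password=password)
        controller.signal(Signal.NEWNYM)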