To support the company's recent business needs we have to analyze online public opinion and obtain reasonably accurate information, so we need a general-purpose crawler that can crawl Tieba, WeChat, Baidu, arbitrary domains, specified Tieba forums, specified keywords, and so on. That is why I wrote this crawler.
1. Development dependencies: scrapy_redis + Chrome + Selenium (PhantomJS also works)
2. Environment setup (see my previous article for details)
3. Architecture and workflow
1. Collect the keywords, domains, Tieba forum names and so on that need to be crawled, create an initial task table in MySQL, and insert this data into MySQL. See the figure below:
(Figure: the MySQL task table and its fields, described below.)
You can configure the fields to match your own requirements.
2. While starting up, the crawler fetches pending tasks from MySQL. crawled=0 marks a task that has not been crawled yet; once the crawler starts successfully it sets crawled to 1, meaning the task is being crawled. When the task finishes and the crawler calls close_spider, it updates crawled to 2, meaning the task is done. A small sketch of creating this table and driving the crawled states follows this list.
st_status: static/dynamic switch; 0 means static, 1 means dynamic (render with Selenium).
crawltasktime: the time the task was issued.
engine: 1 means the WeChat (Sogou) engine, 2 means the Baidu engine, 3 means Tieba, 0 means crawling a domain.
keyword: the keyword to crawl.
domain: the domain to crawl.
depth: crawl depth limit.
width: crawl breadth limit.
accesspoint: CMWAP/CMNET crawl mode; multiple modes are separated by ","; the default is CMNET.
totalpages: limit on the number of pages crawled.
CrawlFrequency: how often the task is crawled; 0 means crawl once, 1 means crawl in an endless loop.
cycletime: crawl time limit, in seconds.
repeattimes: how many times a failed request is retried.
interval: the interval between repeated probing runs.
company: the company the task belongs to.
3. Store the crawled text and image data in MongoDB or MySQL; a storage sketch also follows this list.
4. Analyze the data and extract the relevant information.
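For reference, here is a minimal sketch of how the task table and the crawled state machine above could be driven from Python 2 with MySQLdb. The connection parameters, the table name crawl_task and the column types are assumptions for illustration; only the column names come from the field list above.

# -*- coding: utf-8 -*-
# Minimal sketch: create the task table and walk one task through crawled 0 -> 1 -> 2.
# Connection parameters and the table name "crawl_task" are assumptions.
import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='123456',
                       db='spider', charset='utf8')
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS crawl_task (
    id INT AUTO_INCREMENT PRIMARY KEY,
    crawled TINYINT DEFAULT 0,          -- 0: pending, 1: crawling, 2: finished
    st_status TINYINT DEFAULT 0,        -- 0: static, 1: dynamic (selenium + chrome)
    crawltasktime DATETIME,
    engine TINYINT,
    keyword VARCHAR(128),
    domain VARCHAR(256),
    depth INT, width INT,
    accesspoint VARCHAR(64) DEFAULT 'CMNET',
    totalpages INT,
    CrawlFrequency TINYINT DEFAULT 0,   -- 0: crawl once, 1: loop forever
    cycletime INT, repeattimes INT, `interval` INT,
    company VARCHAR(64)
)""")

# Take one pending task and mark it as being crawled.
cur.execute("SELECT id, engine, keyword, domain, st_status FROM crawl_task WHERE crawled=0 LIMIT 1")
task = cur.fetchone()
if task:
    cur.execute("UPDATE crawl_task SET crawled=1 WHERE id=%s", (task[0],))
    conn.commit()

# ... launch the spider for this task; when close_spider runs, the task is finished:
# cur.execute("UPDATE crawl_task SET crawled=2 WHERE id=%s", (task[0],))
# conn.commit()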
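Step 3 can be as simple as pushing each item dict into MongoDB. A minimal sketch with pymongo, where the host, the database name yuqing and the collection name pages are assumptions, and the item fields mirror what parse_text() builds further below:

# -*- coding: utf-8 -*-
# Minimal sketch: store one crawled item in MongoDB with pymongo.
import time
import pymongo

client = pymongo.MongoClient('127.0.0.1', 27017)
collection = client['yuqing']['pages']

item = {
    'title': u'Example page title',
    'url': 'http://example.com/page',
    'crawl_time': time.strftime('%Y%m%d%H%M%S'),
    'type': 'html',
    'content': u'plain text extracted by soup_text()',
}
collection.insert_one(item)   # or write the text/images to MySQL instead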
4. scheduler code walkthrough
Because every task runs on the same crawler, we do not want the crawler to sit there waiting when it has no requests left. So we modify the scrapy_redis source so that, when there are no requests, the crawler waits about 15 seconds and then shuts itself down, which avoids wasting memory and saves resources. First find scrapy_redis's scheduler.py file. The code in this file is mainly responsible for collecting fingerprints and handling requests, making sure every request is consumed according to the rules configured in settings.
/usr/local/lib/python2.7/dist-packages/scrapy_redis/scheduler.py is the path to the source file. Without further ado, here is the code.
# coding=utf-8
import redis
import importlib
import six
import datetime
import time
from scrapy.utils.misc import load_object
import os

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    # ++++
    lostGetRequest = 0

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider
        print 'spider.name is %s' % (spider.redis_key)

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                # key=self.queue_key % {'spider': spider.name},
                key=self.queue_key % {'spider': spider.host},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                # key=self.dupefilter_key % {'spider': spider.name},
                key=self.dupefilter_key % {'spider': spider.host},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.lostGetRequest = 0
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        if request is None:
            self.lostGetRequest += 1
            print "request is None, lostGetRequest = {%s}, time = {%s}" % (self.lostGetRequest, datetime.datetime.now())
            if self.lostGetRequest >= 15:
                print "request is None, close spider."
                self.spider.crawler.engine.close_spider(self.spider, 'queue is empty')
        return request

    def has_pending_requests(self):
        return len(self) > 0
The code is straightforward. When the spider is initialized, a matching scheduler object is initialized at the same time; by reading the settings, the scheduler sets up its scheduling queue and its dupefilter. While the crawler runs, the engine keeps coordinating with the scheduler: the scheduler takes each request handed over by the engine, enqueues it, records its fingerprint, and dispatches it to the downloader according to the crawl policy configured in settings. Our main change is the class variable lostGetRequest, which acts as a counter. Whenever next_request returns a non-empty request we reset it to 0; whenever the request is None we increment it, and after 15 consecutive empty polls (roughly 15 seconds) we call close_spider to shut the crawler down. This code also changes the two default scrapy_redis keys, the dupefilter key and the requests key, which are now built from spider.host instead of spider.name.
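To wire this scheduler into the project, the scrapy_redis options listed in the docstring above go into settings.py. A minimal sketch, assuming Redis runs locally; the concrete values are illustrative, and REDIS_HOST / REDIS_PORT are the same names the spider below imports from redistest.settings:

# settings.py -- minimal sketch of the scrapy_redis scheduler configuration.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

SCHEDULER_PERSIST = True           # keep the queue and fingerprints in Redis on close
SCHEDULER_FLUSH_ON_START = False   # do not clear the queue when the spider starts
SCHEDULER_IDLE_BEFORE_CLOSE = 1    # passed to queue.pop() as the pop timeout;
                                   # with the patch above, 15 empty polls close the spider

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379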
5. spider code walkthrough
# -*- coding: utf-8 -*-
import scrapy
from scrapy.http import Request, HtmlResponse
from bs4 import BeautifulSoup
import time
from urllib import urlencode
import re
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import redis
from redistest.settings import REDIS_PORT, REDIS_HOST
import hashlib
import random
import os
import urllib


def soup_text(body):
    try:
        soup = BeautifulSoup(body, 'lxml')
        for script in soup(["script", "style"]):
            script.extract()
        line = re.compile(r'\s+')
        line = line.sub(r'', soup.body.getText())
        # p2 = re.compile(u'[^\u4e00-\u9fa5]')  # keep only Chinese chars \u4e00-\u9fa5
        # str2 = p2.sub(r'', line)
        outStr = line.strip(',')
    except:
        outStr = ''
    return outStr


def rand5():
    randnum = ""
    for i in range(0, 5):
        randnum += str(random.randint(0, 9))
    return randnum


class BaiduSpider(scrapy.Spider):
    name = 'redistest'
    params = ''
    keyword = ''
    allowed_domains = []
    sousu = ''
    start_urls = []
    datadupefilter = ''
    filepath = ''

    def __init__(self, param=None, *args, **kwargs):
        super(BaiduSpider, self).__init__(*args, **kwargs)
        self.data_conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=8)
        self.yuqing = self.name + ':item'
        self.datadupefilter = 'datadupefilter'
        self.params = eval(param)
        if self.params['keyword'] != '0':
            self.keyword = self.params['keyword'].decode('utf-8')
        print self.params
        self.sousu = self.params['engine']
        self.filepath = "/home/YuQing/scrapy_yuqing/content"
        if not os.path.exists(self.filepath):
            os.mkdir(self.filepath)
        if self.sousu == 1:
            self.start_urls = ['http://weixin.sogou.com']
            self.params['redis_key'] = None
        elif self.sousu == 2:
            self.start_urls = ['https://www.baidu.com']
            self.params['redis_key'] = None
        elif self.sousu == 0:
            self.params['redis_key'] = None
            self.start_urls = ['https://tieba.baidu.com/f?ie=utf-8&kw=%s&fr=search&' % self.params['keyword']]
        else:
            if self.params['redis_key']:
                self.start_urls = [self.params['redis_key']]
            if self.params['crosssitecrawl'] == 0 and self.params['redis_key']:
                proto, rest = urllib.splittype(self.params['redis_key'])
                host, rest = urllib.splithost(rest)
                self.allowed_domains = [host.replace('www.', '')]
        print self.start_urls, self.sousu, self.params['redis_key'], self.allowed_domains

    def make_requests_from_url(self, url):
        if self.params['st_status'] == 1:
            return Request(url, meta={'keyword': self.keyword, 'engine': self.sousu, 'phantomjs': True})
        else:
            return Request(url)

    def parse(self, response):
        # Check the response status of the page
        if int(response.status) >= 200 and int(response.status) < 400:
            if not self.params['redis_key'] and self.sousu:
                a_list = response.xpath('//h3/a/@href').extract()
                for url in a_list:
                    if url.startswith('http://') != True and url.startswith('https://') != True:
                        url = response.urljoin(url)
                    yield scrapy.Request(url=url, meta={'url': response.url}, callback=self.parse_url)

                if response.meta.has_key('page') != True and self.sousu == 2:
                    flag = 1
                    for next_url in response.xpath('//div[@id="page"]/a/@href').extract():
                        if next_url.startswith('http://') != True and next_url.startswith('https://') != True:
                            nextUrl = self.start_urls[0] + next_url
                            regex = 'pn=(\d+)'
                            page_number = re.compile(regex).search(nextUrl).group(1)
                            if page_number and flag:
                                flag = 0
                                # Crawl the first 50 result pages
                                for page in range(10, 500, 10):
                                    next_page = 'pn=' + str(page)
                                    old_page = re.compile(regex).search(nextUrl).group()
                                    nextUrl = nextUrl.replace(old_page, next_page)
                                    yield scrapy.Request(url=nextUrl, meta={'page': page}, callback=self.parse)

                if response.meta.has_key('page') != True and self.sousu == 1:
                    flag = 1
                    for next_url in response.xpath('//div[@class="p-fy"]/a/@href').extract():
                        if next_url.startswith('http://') != True and next_url.startswith('https://') != True:
                            nextUrl = self.start_urls[0] + '/weixin' + next_url
                            regex = 'page=(\d+)'
                            page_number = re.compile(regex).search(nextUrl).group(1)
                            if page_number and flag:
                                flag = 0
                                for page in range(2, 3):
                                    next_page = 'page=' + str(page)
                                    old_page = re.compile(regex).search(nextUrl).group()
                                    nextUrl = nextUrl.replace(old_page, next_page)
                                    yield scrapy.Request(url=nextUrl, meta={'page': page}, callback=self.parse)

            # Support crawling Tieba
            elif not self.params['redis_key'] and not self.sousu:
                for page in range(0, 200, 50):
                    pn = urlencode({'pn': page})
                    next_page = response.url + pn
                    yield scrapy.Request(next_page, callback=self.parse_tieba)

            else:
                self.parse_url(response)
                print response.url
                a_list = response.xpath('//a/@href').extract()
                for linkstr in a_list:
                    if linkstr.startswith('http://') != True and linkstr.startswith('https://') != True:
                        linkstr = response.urljoin(linkstr)
                    if 'about:blank' != linkstr and linkstr.lower().endswith('.rar') != True and linkstr.lower().endswith('.apk') != True and linkstr.startswith('tel') != True and linkstr.lower().endswith('.css') != True and linkstr.lower().endswith('.js') != True:
                        yield scrapy.Request(url=linkstr, meta={'url': response.url}, callback=self.parse)

    def parse_tieba(self, response):
        with open('aa', 'a') as f:
            f.write(response.url + '\n')
        regex = 'href="(.*?)"'
        a_list = re.compile(regex).findall(response.body)
        for url in a_list:
            if len(url) >= 5 and 'javascript' not in url and 'css' not in url and url.startswith('/p'):
                if url.startswith('http:') != True and url.startswith('https:') != True:
                    url = response.urljoin(url)
                yield scrapy.Request(url, meta={'url': response.url}, callback=self.parse_url)

    def parse_dupefilter(self, response):
        try:
            a_list = response.xpath("//a/@href").extract()
            data = soup_text(response.body)
            data = data + str(a_list)
        except Exception, e:
            data = str(e)
        print data
        if data:
            m = hashlib.md5()
            m.update(data)
            data_md5 = m.hexdigest()
            return data_md5
        else:
            return ''

    def parse_text(self, response):
        item = {}
        try:
            father_url = response.meta["url"]
        except:
            father_url = "''"
        try:
            item['title'] = response.xpath('//title/text()').extract_first().replace('\r\n', '').replace('\n', '')
        except:
            item['title'] = "''"
        item['url'] = response.url
        item['domain'] = ''
        item['crawl_time'] = time.strftime('%Y%m%d%H%M%S')
        item['keyword'] = ''
        item['Type_result'] = ''
        item['type'] = 'html'
        item['filename'] = 'yq_' + str(int(time.time())) + '_0' + str(rand5()) + '.txt'
        item['referver'] = father_url
        item['like'] = ''
        item['transpond'] = ''
        item['comment'] = ''
        item['publish_time'] = ''
        return item

    def parse_url(self, response):
        # Use the page content as the fingerprint
        data_md5 = self.parse_dupefilter(response)
        if self.data_conn.sadd(self.datadupefilter, data_md5):
            content = soup_text(response.body)
            print content
            item = self.parse_text(response)
            self.data_conn.lpush(self.yuqing, item)
            yuqing_file = os.path.join(self.filepath, item['filename'])
            with open(yuqing_file, 'w') as b:
                b.write(content)

    def pang_bo(self, response):
        # Filter out Baidu's own pages
        if 'baidu.com' not in response.url:
            item = self.parse_text(response)
            content = soup_text(response.body)
            if len(content) > 3000:
                content = content[:3000]
            body = item['url'] + ',' + item['crawl_time'] + ',' + item['title'].replace(',', '') + ',' + content + '\n'
            filename = time.strftime('%Y%m%d%H') + '.csv'
            with open(filename, 'a') as f:
                f.write(body)
We start the crawler with a command of the form scrapy crawl redistest -a param="{...}", passing it a JSON-style string (the spider parses it with eval). In the spider's __init__ we call the parent initializer of BaiduSpider and then override what we need: we read the data passed in from outside and process it, connecting to the database, selecting the engine, setting the cross-site switch, configuring redis_key, and so on.
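For example, issuing one task could look like the sketch below. The dict keys are the ones __init__ reads; the concrete values and the use of subprocess are assumptions for illustration.

# -*- coding: utf-8 -*-
# Minimal sketch: serialize one task into the -a param argument.
# The spider calls eval(param), so the dict is passed as a Python literal string.
import subprocess

task = {
    'engine': 2,            # Baidu engine (see the engine field above)
    'keyword': '网络舆情',   # keyword to search for; '0' means no keyword
    'st_status': 0,         # 0: static fetch, 1: render with selenium + chrome
    'redis_key': None,      # start URL when crawling a specific domain
    'crosssitecrawl': 0,    # 0: restrict allowed_domains to the start host
}

subprocess.call(['scrapy', 'crawl', 'redistest', '-a', 'param=%s' % repr(task)])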
We also modify make_requests_from_url, the function that turns a URL into a Request object. Based on the static/dynamic switch that was passed in, we attach a meta dict to the Request. Later, in the downloader middleware, process_request reads the parameters carried by the request and decides whether to launch selenium + chrome for the fetch. process_request is a method of the middleware that Scrapy calls with the request and spider objects, so we can read the data we just attached through request.meta, as shown in the figure below:
(Figure: reading request.meta inside the downloader middleware's process_request.)
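A minimal sketch of what such a downloader middleware might look like; the class name, the headless Chrome options and the module layout are assumptions, and the only part taken from the article is that process_request checks the meta flag set by make_requests_from_url.

# -*- coding: utf-8 -*-
# middlewares.py -- minimal sketch: render pages with selenium + chrome when the
# request carries the 'phantomjs' flag set by make_requests_from_url().
from scrapy.http import HtmlResponse
from selenium import webdriver


class SeleniumMiddleware(object):

    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        self.driver = webdriver.Chrome(chrome_options=options)

    def process_request(self, request, spider):
        # Only take over requests flagged as dynamic by the spider.
        if request.meta.get('phantomjs'):
            self.driver.get(request.url)
            body = self.driver.page_source.encode('utf-8')
            # Returning a Response here skips the default downloader.
            return HtmlResponse(url=request.url, body=body,
                                encoding='utf-8', request=request)
        return None   # fall through to the normal downloader

The middleware still has to be registered under DOWNLOADER_MIDDLEWARES in settings.py so that Scrapy actually routes requests through it.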
Author: 可爱的小虫虫
Link: https://www.jianshu.com/p/8a1a1802fa92