香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python使用adbapi实现MySQL数据库的异步存储


时间:2022-04-02 10:24 作者:admin610456


之前一直在写有关scrapy爬虫的事情,今天我们看看使用scrapy如何把爬到的数据放在mysql/' target='_blank'>mysql数据库中保存。

有关python/' target='_blank'>python操作MySQL数据库的内容,网上已经有很多内容可以参考了,但都是在同步的操作MySQL数据库。在数据量不大的情况下,这种方法固然可以,但是一旦数据量增长后,MySQL就会出现崩溃的情况,因为网上爬虫的速度要远远高过往数据库中插入数据的速度。为了避免这种情况发生,我们就需要使用异步的方法来存储数据,爬虫与数据存储互不影响。

为了显示方便,我们把程序设计的简单一点,只是爬一页的数据。我们今天选择伯乐在线这个网站来爬取,只爬取第一页的数据。

首先我们还是要启动一个爬虫项目,然后自己建了一个爬虫的文件jobbole.py。我们先来看看这个文件中的代码

# -*- coding: utf-8 -*-import ioimport sysimport scrapyimport reimport datetimefrom scrapy.http import Requestfrom urllib import parsefrom ArticleSpider.items import JobboleArticleItem, ArticleItemLoaderfrom scrapy.loader import ItemLoadersys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8') class JobboleSpider(scrapy.Spider): """docstring for JobboleSpider""" name = "jobbole" allowed_domain = ["blog.jobbole.com"] start_urls = ['http://blog.jobbole.com/all-posts/']  def parse(self, response): """ 1.获取列表页中的文章url """ # 解析列表汇中所有文章url并交给scrapy下载器并进行解析 post_nodes = response.css("#archive .floated-thumb .post-thumb a") for post_node in post_nodes: image_url = post_node.css("img::attr(src)").extract_first("")# 这里取出每篇文章的封面图,并作为meta传入Request post_url = post_node.css("::attr(href)").extract_first("") yield Request(url = parse.urljoin(response.url, post_url), meta = {"front_image_url":image_url}, callback = self.parse_detail)  def parse_detail(self, response): article_item = JobboleArticleItem() # 通过ItemLoader加载Item # 通过add_css后的返回值都是list型,所有我们再items.py要进行处理 item_loader = ArticleItemLoader(item = JobboleArticleItem(), response = response) item_loader.add_css("title", ".entry-header h1::text") item_loader.add_value("url", response.url) # item_loader.add_value("url_object_id", get_md5(response.url)) item_loader.add_value("url_object_id", response.url) item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text") item_loader.add_value("front_image_url", [front_image_url]) item_loader.add_css("praise_nums", ".vote-post-up h10::text") item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text") item_loader.add_css("fav_nums", ".bookmark-btn::text") item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text") item_loader.add_css("content", "div.entry")  article_item = item_loader.load_item() print(article_item["tags"])  yield article_item pass

这里我把代码进行了简化,首先对列表页发出请求,这里只爬取一页数据,然后分析每一页的url,并且交给scrapy对每一个url进行请求,得到每篇文章的详情页,把详情页的相关内容放在MySQL数据库中。
这里使用itemloader来进行页面的解析,这样解析有个最大的好处就是可以把解析规则存放在数据库中,实现对解析规则的动态加载。但是要注意一点是使用itemloader中css方式和xpath方式得到的数据都是list型,因此还需要在items.py中再对相对应的数据进行处理。

接下来我们就来看看items.py是如何处理list数据的。

# -*- coding: utf-8 -*- # Define here the models for your scraped items## See documentation in:# https://doc.scrapy.org/en/latest/topics/items.htmlimport datetimeimport re  import scrapyfrom scrapy.loader import ItemLoaderfrom scrapy.loader.processors import MapCompose, TakeFirst,Joinfrom ArticleSpider.utils.common import get_md5  def convert_date(value): try: create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date() except Exception as e: create_date = datetime.datetime.now().date() return create_date def get_nums(value): match_re = re.match(".*?(\d+).*", value) if match_re: nums = int(match_re.group(1)) else: nums = 0  return nums def remove_comment_tags(value): # 去掉tags中的评论内容 if "评论" in value: # 这里做了修改,如果返回"",则在list中仍然会占位,会变成类似于["程序员",,"解锁"]这样 # return "" return None  else: return value def return_value(value): return  class ArticleItemLoader(ItemLoader): """docstring for AriticleItemLoader""" # 自定义ItemLoader default_output_processor = TakeFirst() class ArticlespiderItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() pass class JobboleArticleItem(scrapy.Item): """docstring for ArticlespiderItem""" title = scrapy.Field() create_date = scrapy.Field( input_processor = MapCompose(convert_date) ) url = scrapy.Field() url_object_id = scrapy.Field( output_processor = MapCompose(get_md5) ) # 这里注意front_image_url还是一个list,在进行sql语句时还需要处理 front_image_url = scrapy.Field( output_processor = MapCompose(return_value) ) front_image_path = scrapy.Field() praise_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) comment_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) fav_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) # tags要做另行处理,因为tags我们需要的就是list tags = scrapy.Field( input_processor = MapCompose(remove_comment_tags), output_processor = Join(",") ) content = scrapy.Field()

首先我们看到定义了一个类ArticleItemloader,在这个类中只有一句话,就是对于每个items都默认采用list中的第一个元素,这样我们就可以把每个items中的第一个元素取出来。但是要注意,有些items我们是必须要用list型的,比如我们给ImagePipeline的数据就要求必须是list型,这样我们就需要对front_image_url单独进行处理。这里我们做了一个小技巧,对front_image_url什么都不错,因为我们传过来的front_image_url就是list型
在items的Field中有两个参数,一个是input_processor,另一个是output_processor,这两个参数可以帮助我们对items的list中的每个元素进行处理,比如有些需要用md5进行加密,有些需要用正则表达式进行筛选或者排序等等。

在进行mysql的pipeline之前,我们需要设计数据库,下面是我自己设计的数据库的字段,仅供参考

这里我把url_object_id作为该表的主键,由于它不会重复,所以适合做主键。

下面我们来看看数据库的pipeline。

# -*- coding: utf-8 -*- # Define your item pipelines here## Don't forget to add your pipeline to the ITEM_PIPELINES setting# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.htmlimport codecsimport jsonfrom twisted.enterprise import adbapiimport MySQLdbimport MySQLdb.cursors  class MysqlTwistedPipeline(object): """docstring for MysqlTwistedPipeline""" #采用异步的机制写入mysql def __init__(self, dbpool): self.dbpool = dbpool  @classmethod def from_settings(cls, settings): dbparms = dict( host = settings["MYSQL_HOST"], db = settings["MYSQL_DBNAME"], user = settings["MYSQL_USER"], passwd = settings["MYSQL_PASSWORD"], charset='utf8', cursorclass=MySQLdb.cursors.DictCursor, use_unicode=True, ) dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)  return cls(dbpool)  def process_item(self, item, spider): #使用twisted将mysql插入变成异步执行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error, item, spider) #处理异常 return item  def handle_error(self, failure, item, spider): # 处理异步插入的异常 print (failure)  def do_insert(self, cursor, item): #执行具体的插入 #根据不同的item 构建不同的sql语句并插入到mysql中 # insert_sql, params = item.get_insert_sql() # print (insert_sql, params) # cursor.execute(insert_sql, params) insert_sql = """ insert into jobbole_article(title, url, create_date, fav_nums, url_object_id) VALUES (%s, %s, %s, %s, %s) """ # 可以只使用execute,而不需要再使用commit函数 cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"], item["url_object_id"]))

在这里我们只是演示一下,我们只向数据库中插入5个字段的数据,分别是title,url,create_date,fav_nums,url_object_id。

当然你也可以再加入其它的字段。

首先我们看看from_settings这个函数,它可以从settings.py文件中取出我们想想要的数据,这里我们把数据库的host,dbname,username和password都放在settings.py中。实际的插入语句还是在process_item中进行,我们自己定义了一个函数do_insert,然后把它传给dbpool中用于插入真正的数据。

最后我们来看看settings.py中的代码,这里就很简单了。

MYSQL_HOST = "localhost"MYSQL_DBNAME = "article_wilson"MYSQL_USER = "root"MYSQL_PASSWORD = "root"

其实这里是和pipeline中的代码是想对应的,别忘了把在settings.py中把pipeline打开。

ITEM_PIPELINES = { # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300, # 'ArticleSpider.pipelines.JsonWithEncodingPipeline': 1  # # 'scrapy.pipelines.images.ImagePipeline': 1, # 'ArticleSpider.pipelines.JsonExporterPipleline': 1 # 'ArticleSpider.pipelines.ArticleImagePipeline': 2 # 'ArticleSpider.pipelines.MysqlPipeline': 1 'ArticleSpider.pipelines.MysqlTwistedPipeline': 1}

好了,现在我们可以跑一程序吧。

scrapy crawl jobbole

下面是运行结果的截图

好了,以上就是今天的全部内容了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)