Item Pipeline

当Item在Spider中被收集之后,它将会被传递到Item Pipeline,一些组件会按照一定的顺序执行对Item的处理。

每个item pipeline组件(有时称之为“Item Pipeline”)是实现了简单方法的Python类。他们接收到Item并通过它执行一些行为,同时也决定此Item是否继续通过pipeline,或是被丢弃而不再进行处理。

以下是item pipeline的一些典型应用:

  • 清理HTML数据
  • 验证爬取的数据(检查item包含某些字段)
  • 查重(并丢弃)
  • 将爬取结果保存到数据库中

编写你自己的item pipeline

每个item pipiline组件是一个独立的Python类,同时必须实现以下方法:

processitem(_self, item, spider)

每个item pipeline组件都需要调用该方法,这个方法必须返回一个具有数据的dict,或是 Item (或任何继承类)对象,或是抛出 DropItem 异常,被丢弃的item将不会被之后的pipeline组件所处理。

参数:
- item (Item 对象或者一个dict) – 被爬取的item
- spider (Spider 对象) – 爬取该item的spider


此外,他们也可以实现以下方法:
openspider(_self, spider)

当spider被开启时,这个方法被调用。

参数:spider (Spider 对象) – 被开启的spider

closespider(_self, spider)

当spider被关闭时,这个方法被调用

参数:spider (Spider 对象) – 被关闭的spider

fromcrawler(_cls, crawler)

If present, this classmethod is called to create a pipeline instancefrom a Crawler. It must return a new instanceof the pipeline. Crawler object provides access to all Scrapy corecomponents like settings and signals; it is a way for pipeline toaccess them and hook its functionality into Scrapy.

参数:crawler (Crawler object) – crawler that uses this pipeline

Item pipeline 样例

验证价格,同时丢弃没有价格的item

让我们来看一下以下这个假设的pipeline,它为那些不含税(price_excludes_vat 属性)的item调整了 price 属性,同时丢弃了那些没有价格的item:

  1. from scrapy.exceptions import DropItem
  2.  
  3. class PricePipeline(object):
  4.  
  5. vat_factor = 1.15
  6.  
  7. def process_item(self, item, spider):
  8. if item['price']:
  9. if item['price_excludes_vat']:
  10. item['price'] = item['price'] * self.vat_factor
  11. return item
  12. else:
  13. raise DropItem("Missing price in %s" % item)

将item写入JSON文件

以下pipeline将所有(从所有spider中)爬取到的item,存储到一个独立地 items.jl 文件,每行包含一个序列化为JSON格式的item:

  1. import json
  2.  
  3. class JsonWriterPipeline(object):
  4.  
  5. def __init__(self):
  6. self.file = open('items.jl', 'wb')
  7.  
  8. def process_item(self, item, spider):
  9. line = json.dumps(dict(item)) + "\n"
  10. self.file.write(line)
  11. return item

注解

JsonWriterPipeline的目的只是为了介绍怎样编写item pipeline,如果你想要将所有爬取的item都保存到同一个JSON文件,你需要使用 Feed exports

Write items to MongoDB

In this example we’ll write items to MongoDB using pymongo.MongoDB address and database name are specified in Scrapy settings;MongoDB collection is named after item class.

The main point of this example is to show how to use from_crawler()method and how to clean up the resources properly.

注解

Previous example (JsonWriterPipeline) doesn’t clean up resources properly.Fixing it is left as an exercise for the reader.

  1. import pymongo
  2.  
  3. class MongoPipeline(object):
  4.  
  5. collection_name = 'scrapy_items'
  6.  
  7. def __init__(self, mongo_uri, mongo_db):
  8. self.mongo_uri = mongo_uri
  9. self.mongo_db = mongo_db
  10.  
  11. @classmethod
  12. def from_crawler(cls, crawler):
  13. return cls(
  14. mongo_uri=crawler.settings.get('MONGO_URI'),
  15. mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
  16. )
  17.  
  18. def open_spider(self, spider):
  19. self.client = pymongo.MongoClient(self.mongo_uri)
  20. self.db = self.client[self.mongo_db]
  21.  
  22. def close_spider(self, spider):
  23. self.client.close()
  24.  
  25. def process_item(self, item, spider):
  26. self.db[self.collection_name].insert(dict(item))
  27. return item

去重

一个用于去重的过滤器,丢弃那些已经被处理过的item。让我们假设我们的item有一个唯一的id,但是我们spider返回的多个item中包含有相同的id:

  1. from scrapy.exceptions import DropItem
  2.  
  3. class DuplicatesPipeline(object):
  4.  
  5. def __init__(self):
  6. self.ids_seen = set()
  7.  
  8. def process_item(self, item, spider):
  9. if item['id'] in self.ids_seen:
  10. raise DropItem("Duplicate item found: %s" % item)
  11. else:
  12. self.ids_seen.add(item['id'])
  13. return item

启用一个Item Pipeline组件

为了启用一个Item Pipeline组件,你必须将它的类添加到 ITEM_PIPELINES 配置,就像下面这个例子:

  1. ITEM_PIPELINES = {
  2. 'myproject.pipelines.PricePipeline': 300,
  3. 'myproject.pipelines.JsonWriterPipeline': 800,
  4. }

分配给每个类的整型值,确定了他们运行的顺序,item按数字从低到高的顺序,通过pipeline,通常将这些数字定义在0-1000范围内。