Item Pipeline

After an item has been scraped by a spider, it is sent to the Item Pipeline which processes it through several components that are executed sequentially.

Each item pipeline component (sometimes referred to as just an “Item Pipeline”) is a Python class that implements a simple method. They receive an item and perform an action over it, also deciding if the item should continue through the pipeline or be dropped and no longer processed.

Typical uses of item pipelines are:

  • cleansing HTML data
  • validating scraped data (checking that the items contain certain fields)
  • checking for duplicates (and dropping them)
  • storing the scraped item in a database

Writing your own item pipeline

Each item pipeline component is a Python class that must implement the following method:

process_item(self, item, spider)

This method is called for every item pipeline component. process_item() must either: return a dict with data, return an Item (or any descendant class) object, return a Twisted Deferred, or raise a DropItem exception. Dropped items are no longer processed by further pipeline components.

Parameters:
  • item (Item object or a dict) – the item scraped
  • spider (Spider object) – the spider which scraped the item

Additionally, they may also implement the following methods:

open_spider(self, spider)

This method is called when the spider is opened.

Parameters: spider (Spider object) – the spider which was opened

close_spider(self, spider)

This method is called when the spider is closed.

Parameters: spider (Spider object) – the spider which was closed

from_crawler(cls, crawler)

If present, this classmethod is called to create a pipeline instance from a Crawler. It must return a new instance of the pipeline. The Crawler object provides access to all Scrapy core components like settings and signals; it is a way for the pipeline to access them and hook its functionality into Scrapy.

Parameters: crawler (Crawler object) – the crawler that uses this pipeline
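
For reference, here is how these methods fit together in a single component. This is only an illustrative sketch; the class name (ExamplePipeline) and the setting it reads (EXAMPLE_SETTING) are hypothetical:

class ExamplePipeline(object):

    def __init__(self, example_setting):
        # Value injected by from_crawler() below (hypothetical setting).
        self.example_setting = example_setting

    @classmethod
    def from_crawler(cls, crawler):
        # Build the pipeline instance from the crawler's settings.
        return cls(example_setting=crawler.settings.get('EXAMPLE_SETTING'))

    def open_spider(self, spider):
        # Acquire resources (files, connections, ...) when the spider opens.
        pass

    def close_spider(self, spider):
        # Release those resources when the spider closes.
        pass

    def process_item(self, item, spider):
        # Return the item to pass it on, or raise scrapy.exceptions.DropItem to discard it.
        return item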

Item pipeline examples

Price validation and dropping items with no prices

Let’s take a look at the following hypothetical pipeline that adjusts the price attribute for those items that do not include VAT (price_excludes_vat attribute), and drops those items which don’t contain a price:

from scrapy.exceptions import DropItem

class PricePipeline(object):

    vat_factor = 1.15

    def process_item(self, item, spider):
        if item['price']:
            if item['price_excludes_vat']:
                item['price'] = item['price'] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)

Write items to a JSON file

The following pipeline stores all scraped items (from all spiders) into a single items.jl file, containing one item per line serialized in JSON format:

import json

class JsonWriterPipeline(object):

    def __init__(self):
        self.file = open('items.jl', 'w')

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

The purpose of JsonWriterPipeline is just to introduce how to write item pipelines. If you really want to store all scraped items into a JSON file you should use the Feed exports.
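
As a sketch of that alternative, recent Scrapy versions let you configure feed exports entirely from settings; the exact keys depend on your Scrapy version, so check the Feed exports documentation before relying on this:

# settings.py -- feed export configuration (assumes a Scrapy version with the FEEDS setting)
FEEDS = {
    'items.jl': {
        'format': 'jsonlines',  # one JSON object per line, like items.jl above
    },
}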

Write items to MongoDB

In this example we'll write items to MongoDB using pymongo. The MongoDB address and database name are specified in Scrapy settings; the MongoDB collection is named after the item class.

The main point of this example is to show how to use the from_crawler() method and how to clean up resources properly.

Note

The previous example (JsonWriterPipeline) doesn’t clean up resources properly. Fixing it is left as an exercise for the reader.

import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item
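
For this pipeline to receive its connection details, the settings it reads must be defined. A minimal sketch (the URI is just a placeholder for your own deployment, and 'myproject' is a hypothetical project name):

# settings.py -- values read by MongoPipeline.from_crawler()
MONGO_URI = 'mongodb://localhost:27017'  # placeholder; point at your own MongoDB instance
MONGO_DATABASE = 'items'                 # optional; MongoPipeline falls back to 'items'

ITEM_PIPELINES = {
    'myproject.pipelines.MongoPipeline': 300,
}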

Take screenshot of item

This example demonstrates how to return a Deferred from the process_item() method. It uses Splash to render a screenshot of the item URL. The pipeline makes a request to a locally running instance of Splash. After the request is downloaded and the Deferred callback fires, it saves the screenshot to a file and adds the filename to the item.

import scrapy
import hashlib
from urllib.parse import quote


class ScreenshotPipeline(object):
    """Pipeline that uses Splash to render screenshot of
    every Scrapy item."""

    SPLASH_URL = "http://localhost:8050/render.png?url={}"

    def process_item(self, item, spider):
        encoded_item_url = quote(item["url"])
        screenshot_url = self.SPLASH_URL.format(encoded_item_url)
        request = scrapy.Request(screenshot_url)
        dfd = spider.crawler.engine.download(request, spider)
        dfd.addBoth(self.return_item, item)
        return dfd

    def return_item(self, response, item):
        if response.status != 200:
            # Error happened, return item.
            return item

        # Save screenshot to file, filename will be hash of url.
        url = item["url"]
        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
        filename = "{}.png".format(url_hash)
        with open(filename, "wb") as f:
            f.write(response.body)

        # Store filename in item.
        item["screenshot_filename"] = filename
        return item

Duplicates filter

A filter that looks for duplicate items, and drops those items that were already processed. Let’s say that our items have a unique id, but our spider returns multiple items with the same id:

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

Activating an Item Pipeline component

To activate an Item Pipeline component you must add its class to the ITEM_PIPELINES setting, like in the following example:

ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}

The integer values you assign to classes in this setting determine the order in which they run: items go through the pipelines from lower valued to higher valued classes. It’s customary to define these numbers in the 0-1000 range.
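
If you only want a pipeline to run for a particular spider, one option is to set ITEM_PIPELINES on the spider itself via the standard custom_settings attribute. This is only a sketch; the spider name and pipeline path below reuse the hypothetical names from the examples above:

import scrapy

class PricesSpider(scrapy.Spider):
    name = 'prices'
    # Per-spider settings override the project-wide ITEM_PIPELINES.
    custom_settings = {
        'ITEM_PIPELINES': {
            'myproject.pipelines.PricePipeline': 300,
        },
    }

    def parse(self, response):
        pass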