Item Pipeline
After an item has been scraped by a spider, it is sent to the Item Pipeline which processes it through several components that are executed sequentially.
Each item pipeline component (sometimes referred to as just “Item Pipeline”) is a Python class that implements a simple method. It receives an item and performs an action on it, also deciding whether the item should continue through the pipeline or be dropped and no longer processed.
Typical uses of item pipelines are:
- cleansing HTML data
- validating scraped data (checking that the items contain certain fields)
- checking for duplicates (and dropping them)
- storing the scraped item in a database
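This is a minimal sketch of the first use in the list, cleansing HTML data. It uses a hypothetical HtmlCleanupPipeline; the class name and the regular expression are assumptions and not part of Scrapy. The sketch strips tags from every string field with a naive regex and decodes HTML entities. For real-world markup, a dedicated helper such as w3lib.html.remove_tags is more robust. Plain dicts stand in for items so the snippet runs on its own.

```python
import html
import re

# Naive pattern for anything that looks like a tag, e.g. "<b>" or "</p>" (assumption).
TAG_RE = re.compile(r"<[^>]+>")


class HtmlCleanupPipeline(object):
    """Hypothetical pipeline that strips HTML tags from every string field."""

    def process_item(self, item, spider):
        for key, value in list(item.items()):
            if isinstance(value, str):
                # Remove tags, decode entities such as &amp;, then trim whitespace.
                item[key] = html.unescape(TAG_RE.sub("", value)).strip()
        return item


cleaned = HtmlCleanupPipeline().process_item(
    {"name": " <b>Tea &amp; Biscuits</b> ", "price": 3}, spider=None
)
print(cleaned)  # {'name': 'Tea & Biscuits', 'price': 3}
```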
Writing your own item pipeline
Each item pipeline component is a Python class that must implement the following method:
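- process_item(self, item, spider)
This method is called for every item pipeline component. process_item() must either return a dict with data, return an Item (or any descendant class) object, return a Twisted Deferred, or raise a DropItem exception. Dropped items are no longer processed by further pipeline components.
Parameters: item (Item object or a dict) – the scraped item; spider (Spider object) – the spider which scraped the item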
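The contract is easier to see in a simplified model of how components are chained. run_components() is a hypothetical helper written for this sketch and is not Scrapy's internal implementation. It calls each component's process_item() in order, passes the returned item to the next component, and stops as soon as DropItem is raised. It ignores Deferred return values, which Scrapy itself also accepts. The try/except import only exists so the snippet runs without Scrapy installed.

```python
try:
    from scrapy.exceptions import DropItem
except ImportError:  # stand-in so the sketch runs without Scrapy installed
    class DropItem(Exception):
        pass


class RequirePrice(object):
    def process_item(self, item, spider):
        if not item.get("price"):
            raise DropItem("Missing price in %s" % item)
        return item


class AddCurrency(object):
    def process_item(self, item, spider):
        item["currency"] = "EUR"  # hypothetical enrichment step
        return item


def run_components(item, components, spider=None):
    """Hypothetical helper: pass item through components in order.

    Returns the final item, or None if a component raised DropItem.
    """
    for component in components:
        try:
            item = component.process_item(item, spider)
        except DropItem:
            # A dropped item is never seen by later components.
            return None
    return item


chain = [RequirePrice(), AddCurrency()]
print(run_components({"price": 10}, chain))  # {'price': 10, 'currency': 'EUR'}
print(run_components({"name": "x"}, chain))  # None
```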
Additionally, they may implement the following methods:
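- open_spider(self, spider)
This method is called when the spider is opened.
Parameters: spider (Spider object) – the spider which was opened
- close_spider(self, spider)
This method is called when the spider is closed.
Parameters: spider (Spider object) – the spider which was closed
- from_crawler(cls, crawler)
If present, this classmethod is called to create a pipeline instance from a Crawler. It must return a new instance of the pipeline. The Crawler object provides access to all Scrapy core components, such as settings and signals. It is a way for the pipeline to access them and hook its functionality into Scrapy.
Parameters: crawler (Crawler object) – the crawler that uses this pipeline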
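This is a minimal sketch of how the open_spider() and close_spider() hooks fit around process_item(). It assumes a hypothetical ItemCounterPipeline that keeps per-run state: open_spider() initializes a counter, process_item() updates it, and close_spider() reports it. The small driver at the bottom only imitates the call order for illustration. In a real project Scrapy invokes the hooks, and here a SimpleNamespace stands in for the Spider object.

```python
from types import SimpleNamespace


class ItemCounterPipeline(object):
    """Hypothetical pipeline that counts the items seen during one crawl."""

    def open_spider(self, spider):
        # Called once when the spider is opened: set up per-run state.
        self.count = 0

    def process_item(self, item, spider):
        self.count += 1
        return item

    def close_spider(self, spider):
        # Called once when the spider is closed: report or release resources.
        print("%s: %d items" % (spider.name, self.count))


# Stand-in spider and driver that imitate the order in which Scrapy calls the hooks.
spider = SimpleNamespace(name="demo")
pipeline = ItemCounterPipeline()
pipeline.open_spider(spider)
for item in ({"id": 1}, {"id": 2}, {"id": 3}):
    pipeline.process_item(item, spider)
pipeline.close_spider(spider)  # prints "demo: 3 items"
```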
Item pipeline example
Price validation and dropping items with no prices
Let’s take a look at the following hypothetical pipeline. It adjusts the price attribute for those items that do not include VAT (price_excludes_vat attribute) and drops those items which don’t contain a price:
from scrapy.exceptions import DropItem


class PricePipeline(object):

    vat_factor = 1.15

    def process_item(self, item, spider):
        # .get() drops an item that lacks these fields instead of raising KeyError
        if item.get('price'):
            if item.get('price_excludes_vat'):
                item['price'] = item['price'] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)
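process_item() accepts plain dicts, so a pipeline like this can be exercised directly without running a crawl. The sketch below repeats PricePipeline (using .get() for missing fields) and feeds it hand-made items. The stand-in DropItem is only there so the snippet runs without Scrapy. The truthiness test also drops an item whose price is 0. Float multiplication makes 100 * 1.15 slightly inexact, which is why the output is passed through round().

```python
try:
    from scrapy.exceptions import DropItem
except ImportError:  # stand-in so the sketch runs without Scrapy installed
    class DropItem(Exception):
        pass


class PricePipeline(object):
    vat_factor = 1.15

    def process_item(self, item, spider):
        if item.get("price"):
            if item.get("price_excludes_vat"):
                item["price"] = item["price"] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)


pipeline = PricePipeline()
net = pipeline.process_item({"price": 100, "price_excludes_vat": True}, spider=None)
gross = pipeline.process_item({"price": 100, "price_excludes_vat": False}, spider=None)
print(round(net["price"], 2), gross["price"])  # 115.0 100

try:
    pipeline.process_item({"name": "no price"}, spider=None)
except DropItem as exc:
    print("dropped:", exc)
```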
Write items to a JSON file
The following pipeline stores all scraped items (from all spiders) into a single items.jl file, containing one item per line serialized in JSON format:
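import json


class JsonWriterPipeline(object):

    def __init__(self):
        # Text mode: json.dumps() returns str, which a 'wb' file would reject.
        self.file = open('items.jl', 'w')

    def process_item(self, item, spider):
        # One JSON object per line (JSON Lines format).
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item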
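The .jl extension denotes JSON Lines. Each line is a complete JSON document, so items can be appended one at a time and the file can be read back line by line. The round-trip sketch below uses only the standard library, with made-up sample items written to a temporary directory.

```python
import json
import os
import tempfile

# Made-up sample items for illustration.
items = [{"name": "apple", "price": 1.5}, {"name": "pear", "price": 2}]
path = os.path.join(tempfile.mkdtemp(), "items.jl")

# Write: one JSON document per line, as JsonWriterPipeline does.
with open(path, "w") as f:
    for item in items:
        f.write(json.dumps(dict(item)) + "\n")

# Read back: every non-empty line parses on its own.
with open(path) as f:
    loaded = [json.loads(line) for line in f if line.strip()]

print(loaded == items)  # True
```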
Note
The purpose of JsonWriterPipeline is just to introduce how to write item pipelines. If you really want to store all scraped items into a JSON file, you should use the Feed exports.
Write items to MongoDB
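In this example we’ll write items to MongoDB using pymongo. The MongoDB address and database name are specified in Scrapy settings. The MongoDB collection is named by the collection_name class attribute ('scrapy_items').
The main point of this example is to show how to use the from_crawler() method and how to clean up resources properly.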
Note
The previous example (JsonWriterPipeline) doesn’t clean up resources properly. Fixing it is left as an exercise for the reader.
import pymongo


class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        # Read connection details from the project settings.
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        # Release the connection when the spider finishes.
        self.client.close()

    def process_item(self, item, spider):
        # insert_one() replaces Collection.insert(), which was removed in PyMongo 4.
        self.db[self.collection_name].insert_one(dict(item))
        return item
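from_crawler() can be checked in isolation. In the sketch below, SettingsAwarePipeline is a hypothetical, database-free copy of the constructor logic above. A SimpleNamespace whose settings attribute is a plain dict stands in for the Crawler. This works because both dict and Scrapy's Settings provide get(name, default). In a project, the values would come from settings.py, for example MONGO_URI = 'mongodb://localhost:27017' (an example value).

```python
from types import SimpleNamespace


class SettingsAwarePipeline(object):
    """Hypothetical pipeline showing only the from_crawler() pattern."""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        # Scrapy calls this classmethod, if defined, to build the pipeline.
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
        )


# Stand-in crawler: a plain dict plays the role of crawler.settings.
fake_crawler = SimpleNamespace(settings={"MONGO_URI": "mongodb://localhost:27017"})
pipeline = SettingsAwarePipeline.from_crawler(fake_crawler)
print(pipeline.mongo_uri, pipeline.mongo_db)  # mongodb://localhost:27017 items
```

MONGO_DATABASE is not set in the stand-in settings, so the 'items' default applies.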
Take screenshot of item
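This example demonstrates how to return a Deferred from the process_item() method. It uses Splash to render a screenshot of the item URL. The pipeline makes a request to a locally running instance of Splash. After the request is downloaded and the Deferred callback fires, it saves the screenshot to a file and adds the filename to the item.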
import hashlib
from urllib.parse import quote

import scrapy
from twisted.python.failure import Failure


class ScreenshotPipeline(object):
    """Pipeline that uses Splash to render screenshot of
    every Scrapy item."""

    SPLASH_URL = "http://localhost:8050/render.png?url={}"

    def process_item(self, item, spider):
        encoded_item_url = quote(item["url"])
        screenshot_url = self.SPLASH_URL.format(encoded_item_url)
        request = scrapy.Request(screenshot_url)
        dfd = spider.crawler.engine.download(request, spider)
        # addBoth: return_item receives a Response on success, a Failure on error.
        dfd.addBoth(self.return_item, item)
        return dfd

    def return_item(self, response, item):
        if isinstance(response, Failure) or response.status != 200:
            # Error happened, return item unchanged.
            return item

        # Save screenshot to file, filename will be hash of url.
        url = item["url"]
        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
        filename = "{}.png".format(url_hash)
        with open(filename, "wb") as f:
            f.write(response.body)

        # Store filename in item.
        item["screenshot_filename"] = filename
        return item
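The quote() call in process_item() matters because the item URL is embedded as a query parameter of the Splash URL. The sketch below compares how that query string would be parsed with and without quoting. The item URL is an example value. Without quoting, the & inside the item URL splits off a stray b parameter and truncates url.

```python
from urllib.parse import parse_qs, quote, urlsplit

SPLASH_URL = "http://localhost:8050/render.png?url={}"
item_url = "http://example.com/p?a=1&b=2"  # example value

quoted = SPLASH_URL.format(quote(item_url))
unquoted = SPLASH_URL.format(item_url)

# Parse the query string of each render URL, as the receiving server would.
print(parse_qs(urlsplit(quoted).query))    # {'url': ['http://example.com/p?a=1&b=2']}
print(parse_qs(urlsplit(unquoted).query))  # {'url': ['http://example.com/p?a=1'], 'b': ['2']}
```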
Duplicates filter
A filter that looks for duplicate items and drops those items that were already processed. Let’s say that our items have a unique id, but our spider returns multiple items with the same id:
from scrapy.exceptions import DropItem


class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item
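Feeding the filter a sequence with repeated ids shows it in action. The stand-in DropItem and the sample ids are assumptions made so the sketch runs on its own. ids_seen lives in memory on the pipeline instance, so duplicates are detected only within a single crawl, and the set grows with every unique id.

```python
try:
    from scrapy.exceptions import DropItem
except ImportError:  # stand-in so the sketch runs without Scrapy installed
    class DropItem(Exception):
        pass


class DuplicatesPipeline(object):
    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item["id"] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        self.ids_seen.add(item["id"])
        return item


pipeline = DuplicatesPipeline()
kept = []
for item in [{"id": 1}, {"id": 2}, {"id": 1}, {"id": 3}, {"id": 2}]:
    try:
        kept.append(pipeline.process_item(item, spider=None))
    except DropItem:
        pass  # duplicate: not passed on to later components

print([item["id"] for item in kept])  # [1, 2, 3]
```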
Activating an Item Pipeline component
To activate an Item Pipeline component you must add its class to the ITEM_PIPELINES setting, like in the following example:
ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}
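The integer values you assign to classes in this setting determine the order in which they run: items go through from lower valued to higher valued classes. It’s customary to define these numbers in the 0-1000 range.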
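The ordering rule amounts to sorting the setting by value. This sketch only sorts the dotted paths from the example setting, with the dict deliberately written in a different order. It does not import anything, since Scrapy builds the actual component list itself. With these values, PricePipeline adjusts prices before JsonWriterPipeline writes them, and items without a price never reach the file.

```python
ITEM_PIPELINES = {
    "myproject.pipelines.JsonWriterPipeline": 800,
    "myproject.pipelines.PricePipeline": 300,
}

# Items flow through components from the lowest value to the highest.
order = [path for path, _ in sorted(ITEM_PIPELINES.items(), key=lambda kv: kv[1])]
print(order)
# ['myproject.pipelines.PricePipeline', 'myproject.pipelines.JsonWriterPipeline']
```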