Запустите Scrapy spider в Задаче Сельдерея

Это больше не работает , API Scrapy изменился.

Теперь в документации есть способ « Запустить Scrapy из сценария », но я получаю ошибку ReactorNotRestartable .

Мое задание:

 from celery import Task from twisted.internet import reactor from scrapy.crawler import Crawler from scrapy import log, signals from scrapy.utils.project import get_project_settings from .spiders import MySpider class MyTask(Task): def run(self, *args, **kwargs): spider = MySpider settings = get_project_settings() crawler = Crawler(settings) crawler.signals.connect(reactor.stop, signal=signals.spider_closed) crawler.configure() crawler.crawl(spider) crawler.start() log.start() reactor.run() 

Закрученный реактор нельзя перезапустить. Работа для этого состоит в том, чтобы позволить задаче сельдерея развить новый дочерний процесс для каждого обхода, который вы хотите выполнить, как предлагается в следующем сообщении:

Запуск пауков Scrapy в задаче Сельдерея

Это связано с тем, что «реактор не может быть перезапущен», используя пакет многопроцессорности. Но проблема в том, что обходной путь теперь устарел с последней версией сельдерея из-за того, что вместо этого вы столкнетесь с другой проблемой, когда процесс демона не может порождать subprocessы. Поэтому для того, чтобы обходной путь работал, вам нужно спуститься в версию сельдерея.

Да, и API-интерфейс scrapy изменился. Но с небольшими изменениями (импорт Crawler вместо CrawlerProcess). Вы можете заставить обходной путь работать, спустившись в версии по сельдерею.

Вопрос о сельдере можно найти здесь: Проблема сельдерея № 1709

Вот мой обновленный crawl-скрипт, который работает с новыми версиями сельдерея, используя бильярд вместо многопроцессорности:

 from scrapy.crawler import Crawler from scrapy.conf import settings from myspider import MySpider from scrapy import log, project from twisted.internet import reactor from billiard import Process from scrapy.utils.project import get_project_settings class UrlCrawlerScript(Process): def __init__(self, spider): Process.__init__(self) settings = get_project_settings() self.crawler = Crawler(settings) self.crawler.configure() self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed) self.spider = spider def run(self): self.crawler.crawl(self.spider) self.crawler.start() reactor.run() def run_spider(url): spider = MySpider(url) crawler = UrlCrawlerScript(spider) crawler.start() crawler.join() 

Изменить: читая проблему сельдерея № 1709, они предлагают использовать бильярд вместо многопроцессорности, чтобы ограничение subprocessа было отменено. Другими словами, мы должны попробовать бильярд и посмотреть, работает ли он!

Редактировать 2: Да, используя бильярд , мой скрипт работает с новейшей сборкой сельдерея! См. Мой обновленный скрипт.

Реактор Twisted не может быть перезапущен, поэтому, как только один паук заканчивается, и crawler механизм неявно останавливает реактор, этот рабочий бесполезен.

Как указано в ответах на этот другой вопрос, все, что вам нужно сделать, это убить рабочего, который запустил ваш паук, и заменить его новым, что предотвращает запуск и остановку реактора более одного раза. Для этого просто установите:

 CELERYD_MAX_TASKS_PER_CHILD = 1 

Недостатком является то, что вы на самом деле не используете реактор Twisted до полного потенциала и тратите ресурсы на несколько реакторов, так как один реактор может запускать сразу несколько пауков за один процесс. Лучший подход – запустить один реактор на одного рабочего (или даже один реактор в глобальном масштабе) и не позволять crawler прикосновению его.

Я работаю над этим для очень похожего проекта, поэтому я буду обновлять этот пост, если я сделаю какой-то прогресс.

Чтобы избежать ошибки ReactorNotRestartable при запуске Scrapy в Queens Tasks Queue, я использовал streamи. Тот же подход использовался для запуска реактора Twisted несколько раз в одном приложении. Scrapy также использовал Twisted, поэтому мы можем сделать то же самое.

Вот код:

 from threading import Thread from scrapy.crawler import CrawlerProcess import scrapy class MySpider(scrapy.Spider): name = 'my_spider' class MyCrawler: spider_settings = {} def run_crawler(self): process = CrawlerProcess(self.spider_settings) process.crawl(MySpider) Thread(target=process.start).start() 

Не забудьте увеличить CELERYD_CONCURRENCY для сельдерея.

 CELERYD_CONCURRENCY = 10 

отлично работает для меня.

Это не препятствует процессу работы, но в любом случае наилучшая практика в области скрипирования – обработка данных в обратных вызовах. Просто сделайте так:

 for crawler in process.crawlers: crawler.spider.save_result_callback = some_callback crawler.spider.save_result_callback_params = some_callback_params Thread(target=process.start).start() 

Я бы сказал, что этот подход очень неэффективен, если у вас есть много задач для обработки. Поскольку сельдерей имеет резьбу – выполняет каждую задачу в своей собственной нити. Скажем, с RabbitMQ в качестве брокера вы можете пройти> 10K q / s. С сельдереем это потенциально может привести к накладным расходам 10K! Я бы посоветовал не использовать сельдерей здесь. Вместо этого обращайтесь к брокеру напрямую!

Давайте будем гением компьютера.