spider - Ejecutar una araña Scrapy en una tarea de apio
scrapy fetch (4)
Esto ya no funciona , la API de scrapy ha cambiado.
Ahora la documentación muestra una forma de " Ejecutar Scrapy desde un script " pero obtengo el error ReactorNotRestartable
.
Mi tarea:
from celery import Task
from twisted.internet import reactor
from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy.utils.project import get_project_settings
from .spiders import MySpider
class MyTask(Task):
def run(self, *args, **kwargs):
spider = MySpider
settings = get_project_settings()
crawler = Crawler(settings)
crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
crawler.configure()
crawler.crawl(spider)
crawler.start()
log.start()
reactor.run()
El reactor Twisted no se puede reiniciar, por lo que una vez que una araña termina de funcionar y el crawler
detiene el reactor implícitamente, ese trabajador es inútil.
Como se publicó en las respuestas a esa otra pregunta, todo lo que tiene que hacer es matar al trabajador que ejecutó su araña y reemplazarla por una nueva, lo que evita que el reactor se inicie y se detenga más de una vez. Para hacer esto, solo establece:
CELERYD_MAX_TASKS_PER_CHILD = 1
La desventaja es que no está realmente utilizando el reactor Twisted en todo su potencial y desperdiciando recursos ejecutando múltiples reactores, ya que un reactor puede ejecutar múltiples arañas a la vez en un solo proceso. Un mejor enfoque es ejecutar un reactor por trabajador (o incluso un reactor en todo el mundo) y no permita que el crawler
toque.
Estoy trabajando en esto para un proyecto muy similar, así que actualizaré esta publicación si hago algún progreso.
El reactor retorcido no se puede reiniciar. Una solución para esto es permitir que la tarea de apio cree un nuevo proceso secundario para cada rastreo que desee ejecutar, como se propone en la siguiente publicación:
Ejecutar arañas Scrapy en una tarea de apio
Esto evita el "reactor no puede ser un problema reiniciable" al utilizar el paquete de multiprocesamiento. Pero el problema con esto es que la solución ahora es obsoleta con la última versión de apio debido al hecho de que en su lugar se encontrará con otro problema donde un proceso de daemon no puede generar subprocesos. Entonces, para que la solución funcione, debes bajar a la versión de apio.
Sí, y la API de scrapy ha cambiado. Pero con modificaciones menores (importar Crawler en lugar de CrawlerProcess). Puede hacer que la solución funcione al bajar en la versión de apio.
El problema de apio se puede encontrar aquí: apio problema # 1709
Aquí está mi script de rastreo actualizado que funciona con versiones de apio más nuevas al utilizar billar en lugar de multiprocesamiento:
from scrapy.crawler import Crawler
from scrapy.conf import settings
from myspider import MySpider
from scrapy import log, project
from twisted.internet import reactor
from billiard import Process
from scrapy.utils.project import get_project_settings
class UrlCrawlerScript(Process):
def __init__(self, spider):
Process.__init__(self)
settings = get_project_settings()
self.crawler = Crawler(settings)
self.crawler.configure()
self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
self.spider = spider
def run(self):
self.crawler.crawl(self.spider)
self.crawler.start()
reactor.run()
def run_spider(url):
spider = MySpider(url)
crawler = UrlCrawlerScript(spider)
crawler.start()
crawler.join()
Editar: Al leer el número de apio # 1709 , sugieren usar billar en lugar de multiproceso para que se levante la limitación del subproceso. En otras palabras, deberíamos probar el billiard y ver si funciona.
Editar 2: ¡ Sí, al usar billiard , mi script funciona con la última versión de apio! Ver mi script actualizado
Para evitar el error de ReactorNotRestartable al ejecutar Scrapy en Celery Tasks Queue, he usado threads. El mismo enfoque utilizado para ejecutar el reactor Twisted varias veces en una aplicación. Scrapy también usó Twisted, por lo que podemos hacer lo mismo.
Aquí está el código:
from threading import Thread
from scrapy.crawler import CrawlerProcess
import scrapy
class MySpider(scrapy.Spider):
name = ''my_spider''
class MyCrawler:
spider_settings = {}
def run_crawler(self):
process = CrawlerProcess(self.spider_settings)
process.crawl(MySpider)
Thread(target=process.start).start()
No se olvide de aumentar CELERYD_CONCURRENCY para el apio.
CELERYD_CONCURRENCY = 10
funciona bien para mí
Esto no está bloqueando el proceso en ejecución, pero de todos modos la mejor práctica de scrapy es procesar datos en devoluciones de llamadas. Solo hazlo de esta manera:
for crawler in process.crawlers:
crawler.spider.save_result_callback = some_callback
crawler.spider.save_result_callback_params = some_callback_params
Thread(target=process.start).start()
Yo diría que este enfoque es muy ineficiente si tiene que realizar muchas tareas. Porque el apio está enhebrado: ejecuta cada tarea dentro de su propio hilo. Digamos que con RabbitMQ como intermediario puede pasar> 10K q / s. ¡Con Aplery esto podría causar sobrecargas de 10K! Aconsejaría no usar apio aquí. ¡En lugar de eso, acceda al intermediario directamente!