www portable python multithreading twisted redis

python - portable - redis version



Twisted: ¿por qué es que pasar una devolución de llamada diferida a un hilo diferido hace que el hilo se bloquee de repente? (3)

Intenté sin éxito usar txredis (la API no bloqueante retorcida para redis) para una cola de mensajes persistente que estoy tratando de configurar con un proyecto de scrapy en el que estoy trabajando. Descubrí que aunque el cliente no estaba bloqueando, se volvió mucho más lento de lo que podría haber sido porque lo que debería haber sido un evento en el bucle del reactor se dividió en miles de pasos.

Entonces, en cambio, intenté hacer uso de redis-py (la api de bloqueo retorcida normal) y envolver la llamada en un hilo diferido. Funciona muy bien, sin embargo, quiero realizar un diferimiento interno cuando realizo una llamada a redis, ya que me gustaría configurar el agrupamiento de conexiones en un intento de agilizar aún más las cosas.

Debajo está mi interpretación de algún código de muestra tomado de los documentos retorcidos para un hilo diferido para ilustrar mi caso de uso:

#!/usr/bin/env python from twisted.internet import reactor,threads from twisted.internet.task import LoopingCall import time def main_loop(): print ''doing stuff in main loop.. do not block me!'' def aBlockingRedisCall(): print ''doing lookup... this may take a while'' time.sleep(10) return ''results from redis'' def result(res): print res def main(): lc = LoopingCall(main_loop) lc.start(2) d = threads.deferToThread(aBlockingRedisCall) d.addCallback(result) reactor.run() if __name__==''__main__'': main()

Y aquí está mi alteración para la agrupación de conexiones que hace que el código en el bloqueo diferido de subprocesos:

#!/usr/bin/env python from twisted.internet import reactor,defer from twisted.internet.task import LoopingCall import time def main_loop(): print ''doing stuff in main loop.. do not block me!'' def aBlockingRedisCall(x): if x<5: #all connections are busy, try later print ''%s is less than 5, get a redis client later'' % x x+=1 d = defer.Deferred() d.addCallback(aBlockingRedisCall) reactor.callLater(1.0,d.callback,x) return d else: print ''got a redis client; doing lookup.. this may take a while'' time.sleep(10) # this is now blocking.. any ideas? d = defer.Deferred() d.addCallback(gotFinalResult) d.callback(x) return d def gotFinalResult(x): return ''final result is %s'' % x def result(res): print res def aBlockingMethod(): print ''going to sleep...'' time.sleep(10) print ''woke up'' def main(): lc = LoopingCall(main_loop) lc.start(2) d = defer.Deferred() d.addCallback(aBlockingRedisCall) d.addCallback(result) reactor.callInThread(d.callback, 1) reactor.run() if __name__==''__main__'': main()

Entonces mi pregunta es, ¿alguien sabe por qué mi alteración hace que el hilo diferido se bloquee y / o alguien puede sugerir una mejor solución?


Bueno, como dicen los retorcidos :

Los diferidos no hacen que el código mágicamente no bloquee

Siempre que esté usando un código de bloqueo, como sleep , debe aplazarlo a un nuevo hilo.

#!/usr/bin/env python from twisted.internet import reactor,defer, threads from twisted.internet.task import LoopingCall import time def main_loop(): print ''doing stuff in main loop.. do not block me!'' def aBlockingRedisCall(x): if x<5: #all connections are busy, try later print ''%s is less than 5, get a redis client later'' % x x+=1 d = defer.Deferred() d.addCallback(aBlockingRedisCall) reactor.callLater(1.0,d.callback,x) return d else: print ''got a redis client; doing lookup.. this may take a while'' def getstuff( x ): time.sleep(3) return "stuff is %s" % x # getstuff is blocking, so you need to push it to a new thread d = threads.deferToThread(getstuff, x) d.addCallback(gotFinalResult) return d def gotFinalResult(x): return ''final result is %s'' % x def result(res): print res def aBlockingMethod(): print ''going to sleep...'' time.sleep(10) print ''woke up'' def main(): lc = LoopingCall(main_loop) lc.start(2) d = defer.Deferred() d.addCallback(aBlockingRedisCall) d.addCallback(result) reactor.callInThread(d.callback, 1) reactor.run() if __name__==''__main__'': main()

En el caso de que la redis api no sea muy compleja, podría ser más natural reescribirla utilizando twisted.web, en lugar de simplemente llamar a la API de bloqueo en muchas hebras.



También hay un cliente actualizado de Redis para twisted que ya admite el nuevo protocolo y las funciones de Redis 2.x. Definitivamente debes intentarlo. Se llama txredisapi.

Para la cola de mensajes persistentes, recomendaría RestMQ. Un sistema de cola de mensajes basado en redis construido sobre cyclone y txredisapi.

http://github.com/gleicon/restmq

Aclamaciones