pubsub lpush hkeys exist commands python redis

python - lpush - redis instructions



¿Es posible el bloqueo de redis pubsub? (10)

Aquí hay una solución sin bloqueo sin hilos:

fd = ps.connection._sock.fileno(); rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block if rlist: for rfd in rlist: if fd == rfd: message = ps.get_message()

ps.get_message() es suficiente por sí solo, pero uso este método para poder esperar en varios fds en lugar de solo la conexión redis.

Quiero usar redis ''pubsub para transmitir algunos mensajes, pero no quiero que me bloqueen con listen , como el código a continuación:

import redis rc = redis.Redis() ps = rc.pubsub() ps.subscribe([''foo'', ''bar'']) rc.publish(''foo'', ''hello world'') for item in ps.listen(): if item[''type''] == ''message'': print item[''channel''] print item[''data'']

El último for sección se bloqueará. Solo quiero comprobar si un canal dado tiene datos, ¿cómo puedo lograr esto? ¿Hay un check como método?


El enfoque más eficiente sería basado en greenlet en lugar de en hilos. Como un marco de concurrencia basado en greenlet, gevent ya está bastante establecido en el mundo Python. Una integración perfecta con redis-py sería, por lo tanto, maravillosa. Eso es exactamente lo que se está discutiendo en este tema en github:

https://github.com/andymccurdy/redis-py/issues/310


El pub / sub de Redis envía mensajes a los clientes suscritos (escuchando) en un canal. Si no está escuchando, perderá el mensaje (de ahí la llamada de bloqueo). Si desea que no esté bloqueado, le recomiendo usar una cola (redis también es bastante bueno). Si tiene que usar pub / sub, puede usar como sugerencia gevent para tener una escucha asíncrona y bloqueadora, enviar mensajes a una cola y usar un consumidor separado para procesar mensajes de esa cola de manera no bloqueable.


Este es un ejemplo de trabajo para enlazar el oyente de bloqueo.

import sys import cmd import redis import threading def monitor(): r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0) channel = sys.argv[1] p = r.pubsub() p.subscribe(channel) print ''monitoring channel'', channel for m in p.listen(): print m[''data''] class my_cmd(cmd.Cmd): """Simple command processor example.""" def do_start(self, line): my_thread.start() def do_EOF(self, line): return True if __name__ == ''__main__'': if len(sys.argv) == 1: print "missing argument! please provide the channel name." else: my_thread = threading.Thread(target=monitor) my_thread.setDaemon(True) my_cmd().cmdloop()


La nueva versión de redispy es compatible con pubsub asíncrono, consulte https://github.com/andymccurdy/redis-py para obtener más detalles. Aquí hay un ejemplo de la documentación en sí:

while True: message = p.get_message() if message: # do something with the message time.sleep(0.001) # be nice to the system :)


La respuesta aceptada está obsoleta, ya que redis-py recomienda utilizar el get_message() no bloqueante. Pero también proporciona una manera de usar hilos fácilmente.

https://pypi.python.org/pypi/redis

Hay tres estrategias diferentes para leer los mensajes.

Detrás de escena, get_message () usa el módulo ''select'' del sistema para sondear rápidamente el socket de la conexión. Si hay datos disponibles para leer, get_message () los leerá, formateará el mensaje y lo devolverá o lo pasará a un controlador de mensajes. Si no hay datos para leer, get_message () devolverá inmediatamente Ninguno. Esto hace que sea trivial integrarlo en un bucle de evento existente dentro de su aplicación.

while True: message = p.get_message() if message: # do something with the message time.sleep(0.001) # be nice to the system :)

Las versiones anteriores de redis-py solo leen mensajes con pubsub.listen (). listen () es un generador que bloquea hasta que un mensaje esté disponible. Si su aplicación no necesita hacer nada más que recibir y actuar sobre los mensajes recibidos de redis, listen () es una manera fácil de comenzar a correr.

for message in p.listen(): # do something with the message

La tercera opción ejecuta un bucle de evento en un hilo separado. pubsub.run_in_thread () crea un nuevo hilo e inicia el bucle de eventos. El objeto de subproceso se devuelve al llamador de run_in_thread (). La persona que llama puede usar el método thread.stop () para cerrar el bucle y el hilo de eventos. Detrás de escena, esto es simplemente un envoltorio alrededor de get_message () que se ejecuta en un hilo separado, esencialmente creando un pequeño bucle de eventos no bloqueante para ti. run_in_thread () toma un argumento opcional sleep_time. Si se especifica, el bucle de eventos llamará a time.sleep () con el valor en cada iteración del bucle.

Nota: ya que estamos ejecutando en un hilo separado, no hay manera de manejar los mensajes que no se manejan automáticamente con los manejadores de mensajes registrados. Por lo tanto, redis-py le impide llamar a run_in_thread () si está suscrito a patrones o canales que no tienen adjuntos controladores de mensajes.

p.subscribe(**{''my-channel'': my_handler}) thread = p.run_in_thread(sleep_time=0.001) # the event loop is now running in the background processing messages # when it''s time to shut it down... thread.stop()

Entonces, para responder a su pregunta, simplemente consulte get_message cuando desee saber si ha llegado un mensaje.


No creo que sea posible. Un canal no tiene ningún "datos actuales", se suscribe a un canal y comienza a recibir mensajes que están siendo enviados por otros clientes en el canal, por lo tanto, es una API de bloqueo. Además, si observa la documentación de los Comandos de Redis para publicación / sub, se lo aclararía.


Para alcanzar un código de no bloqueo, debe hacer otro tipo de código de paradigma. No es difícil, usar un nuevo hilo para escuchar todos los cambios y dejar el hilo principal para hacer otras cosas.

Además, necesitará algún mecanismo para intercambiar datos entre el hilo principal y el hilo del suscriptor de redis.


Puede usar el parche gevent, gevent monkey para crear una aplicación redis pubsub sin bloqueo.


Si está pensando en un procesamiento asincrónico no bloqueante, probablemente esté usando (o debería usar) framework / servidor asíncrono.

ACTUALIZACIÓN: Han pasado 5 años desde la respuesta original, mientras tanto, Python obtuvo soporte nativo asíncrono de IO . Ahora hay AIORedis, un cliente asíncrono de Redis IO .