start - python ppt español
¿Cómo hacer de manera eficiente muchas tareas un poco más tarde en Python? (10)
¿Has mirado el módulo de multiprocessing
? Viene de serie con Python. Es similar al módulo de threading
, pero ejecuta cada tarea en un proceso. Puede usar un objeto Pool()
para configurar un grupo de trabajadores, luego usar el método .map()
para llamar a una función con los diversos argumentos de tarea en cola.
Tengo un proceso que necesita realizar un montón de acciones "más tarde" (después de 10-60 segundos por lo general). El problema es que esas acciones "posteriores" pueden ser muchas (1000), por lo que el uso de un Thread
por tarea no es viable. Sé por la existencia de herramientas como gevent y eventlet , pero uno de los problemas es que el proceso usa zeromq para la comunicación, por lo que necesitaría algo de integración (el eventlet ya lo tiene).
Lo que me pregunto es ¿Cuáles son mis opciones? Por lo tanto, las sugerencias son bienvenidas, en las líneas de las bibliotecas (si ha usado alguna de las mencionadas, comparta sus experiencias), las técnicas ( el soporte "coroutine" de Python , use un hilo que duerme durante un tiempo y verifica una cola), cómo para hacer uso de la encuesta de zeromq o eventloop para hacer el trabajo, o algo más.
Bueno, en mi opinión, podría usar algo llamado "multitarea cooperativa". Es cosa de base retorcida y es realmente genial. Solo mire la presentación PyCon de 2010: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182
Bueno, necesitarás una cola de transporte para hacer esto también ...
Esta respuesta tiene en realidad dos sugerencias: la primera y la otra que descubrí después de la primera.
programar
Sospecho que estás buscando el módulo de programación .
EDITAR : mi simple sugerencia me pareció poco útil después de haberla leído. Así que decidí probar el módulo de programación para ver si puede funcionar como sugerí. Aquí viene mi prueba: lo usaría con un único hilo, más o menos de esta manera:
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
def run(self):
self.scheduler.run()
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
Primero, crearía una clase de hilo que tendría su propio planificador y una cola. Al menos un evento se registraría en el programador: uno para invocar un método para programar eventos de la cola.
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
El método para programar eventos de la cola bloquearía la cola, programaría cada evento, vaciaría la cola y programaría nuevamente, para buscar nuevos eventos en el futuro. Tenga en cuenta que el período para buscar nuevos eventos es corto (un segundo), puede cambiarlo:
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
La clase también debe tener un método para programar eventos de usuario. Naturalmente, este método debería bloquear la cola al actualizarla:
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
Finalmente, la clase debe invocar el método principal del planificador:
def run(self):
self.scheduler.run()
Aquí viene un ejemplo de uso:
def print_time():
print "scheduled:", time.time()
if __name__ == "__main__":
st = SchedulingThread()
st.start()
st.schedule(print_time, 10)
while True:
print "main thread:", time.time()
time.sleep(5)
st.join()
Su salida en mi máquina es:
$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77
Este código es solo un ejemplo rápido, puede que necesite algo de trabajo. Sin embargo, debo confesar que estoy un poco fascinado por el módulo de programación, así que lo sugerí. Es posible que desee buscar otras sugerencias también :)
APScheduler
Buscando en Google soluciones como la que publiqué , encontré este increíble módulo APScheduler . Es tan práctico y útil que apuesto a que es su solución. Mi ejemplo anterior sería mucho más simple con este módulo:
from apscheduler.scheduler import Scheduler
import time
sch = Scheduler()
sch.start()
@sch.interval_schedule(seconds=10)
def print_time():
print "scheduled:", time.time()
sch.unschedule_func(print_time)
while True:
print "main thread:", time.time()
time.sleep(5)
(Desafortunadamente, no encontré la forma de programar un evento para que se ejecute solo una vez, por lo que el evento de la función se debe anular a sí mismo. Apuesto a que se puede resolver con algún decorador).
Otra opción es usar los enlaces Phyton GLib , en particular sus funciones de timeout
.
Es una buena opción siempre que no desee utilizar múltiples núcleos y que la dependencia de GLib no sea un problema. Maneja todos los eventos en el mismo hilo que evita problemas de sincronización. Además, su marco de eventos también se puede usar para ver y manejar eventos basados en IO (es decir, sockets).
ACTUALIZAR:
Aquí hay una sesión en vivo usando GLib:
>>> import time
>>> import glib
>>>
>>> def workon(thing):
... print("%s: working on %s" % (time.time(), thing))
... return True # use True for repetitive and False for one-time tasks
...
>>> ml = glib.MainLoop()
>>>
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>>
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
Sencillo. Puedes heredar tu clase de Thread y crear una instancia de tu clase con Param como timeout, por lo que, para cada instancia de tu clase, puedes decir timeout que hará que tu hilo espere ese tiempo.
Si tiene un montón de tareas que deben realizarse más adelante y desea que continúen, incluso si cierra el programa de llamadas o sus trabajadores, debería mirar el Celery , lo que facilita enormemente la creación de nuevas tareas. Ejecútelas en la máquina que desee y espere los resultados.
Desde la página de Apio, "Esta es una tarea simple que agrega dos números:"
from celery.task import task
@task
def add(x, y):
return x + y
Puede ejecutar la tarea en segundo plano o esperar a que finalice:
>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
Suponiendo que su proceso tiene un bucle de ejecución que puede recibir señales y el tiempo de cada acción está dentro de los límites de la operación secuencial, use señales y posix alarma ()
signal.alarm(time)
If time is non-zero, this function requests that a
SIGALRM signal be sent to the process in time seconds.
Esto depende de lo que entiendas por " esas acciones" posteriores "pueden ser muchas " y si tu proceso ya usa señales. Debido a la formulación de la pregunta, no está claro por qué se necesitaría un paquete externo de Python.
Tu escribiste:
uno de los problemas es que el proceso utiliza zeromq para la comunicación, por lo que necesitaría algo de integración (eventlet ya lo tiene)
Parece que su elección estará fuertemente influenciada por estos detalles, que son un poco confusos: cómo se está utilizando zeromq para la comunicación, cuántos recursos requerirá la integración y cuáles son sus requisitos y los recursos disponibles.
Hay un proyecto llamado django-ztask que usa zeromq
y proporciona un decorador de task
similar al de apio. Sin embargo, es (obviamente) específico de Django y por lo tanto puede no ser adecuado en su caso. No lo he usado, prefiero el celery mí mismo.
He estado utilizando apio para un par de proyectos (estos se alojan en ep.io PaaS hosting, lo que proporciona una forma fácil de usarlo).
El apio parece una solución bastante flexible, que permite retrasar tareas, devoluciones de llamadas, caducidad y reintento de tareas, limitar la tasa de ejecución de tareas, etc. Se puede utilizar con Redis, Beanstalk, CouchDB, MongoDB o una base de datos SQL.
Código de ejemplo (definición de tarea y ejecución asíncrona después de un retraso):
from celery.decorators import task
@task
def my_task(arg1, arg2):
pass # Do something
result = my_task.apply_async(
args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
countdown=3, # Time in seconds to wait before queueing the task.
)
Véase también una sección en documentos de apio .
considere usar una cola de prioridad con uno o más subprocesos de trabajo para atender las tareas. El subproceso principal puede agregar trabajo a la cola, con una marca de tiempo de lo más pronto que se debe revisar. Los subprocesos de trabajo emergen de la cola, duermen hasta que se alcanza el tiempo de prioridad, hacen el trabajo y luego sacan otro elemento de la cola.
¿Qué tal una respuesta más completa? mklauber hace un buen punto. Si existe la posibilidad de que todos sus trabajadores estén durmiendo cuando tiene un trabajo nuevo y más urgente, entonces una cola. La prioridad queue.PriorityQueue
no es realmente la solución, aunque una "cola de prioridad" sigue siendo la técnica a utilizar, que está disponible en el sitio. módulo heapq
. En su lugar, haremos uso de una primitiva de sincronización diferente; una variable de condición, que en python se escribe threading.Condition
.
El enfoque es bastante simple, eche un vistazo al montón, y si el trabajo es actual, quítelo y haga ese trabajo. Si hubo trabajo, pero está programado para el futuro, solo espere la condición hasta entonces, o si no hay trabajo, duerma para siempre.
El productor hace una parte justa del trabajo; Cada vez que agrega un nuevo trabajo, notifica la condición, por lo que si hay trabajadores dormidos, se despertarán y volverán a revisar la cola para un trabajo más nuevo.
import heapq, time, threading
START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
"""the actual work function. nevermind the locks here, this just keeps
the output nicely formatted. a real work function probably won''t need
it, or might need quite different synchronization"""
SERIALIZE_STDOUT.acquire()
print time.time() - START_TIME, message
SERIALIZE_STDOUT.release()
def produce(work_queue, condition, timeout, message):
"""called to put a single item onto the work queue."""
prio = time.time() + float(timeout)
condition.acquire()
heapq.heappush(work_queue, (prio, message))
condition.notify()
condition.release()
def worker(work_queue, condition):
condition.acquire()
stopped = False
while not stopped:
now = time.time()
if work_queue:
prio, data = work_queue[0]
if data == ''stop'':
stopped = True
continue
if prio < now:
heapq.heappop(work_queue)
condition.release()
# do some work!
consumer(data)
condition.acquire()
else:
condition.wait(prio - now)
else:
# the queue is empty, wait until notified
condition.wait()
condition.release()
if __name__ == ''__main__'':
# first set up the work queue and worker pool
work_queue = []
cond = threading.Condition()
pool = [threading.Thread(target=worker, args=(work_queue, cond))
for _ignored in range(4)]
map(threading.Thread.start, pool)
# now add some work
produce(work_queue, cond, 10, ''Grumpy'')
produce(work_queue, cond, 10, ''Sneezy'')
produce(work_queue, cond, 5, ''Happy'')
produce(work_queue, cond, 10, ''Dopey'')
produce(work_queue, cond, 15, ''Bashful'')
time.sleep(5)
produce(work_queue, cond, 5, ''Sleepy'')
produce(work_queue, cond, 10, ''Doc'')
# and just to make the example a bit more friendly, tell the threads to stop after all
# the work is done
produce(work_queue, cond, float(''inf''), ''stop'')
map(threading.Thread.join, pool)
Pyzmq tiene una implementación de ioloop
con una api similar a la del tornado ioloop. Implementa un DelayedCallback
que puede ayudarte.