tiempo temporizador run funcion every ejecutar cierto cada python timer

temporizador - python run function every x seconds



¿Cuál es la mejor manera de ejecutar repetidamente una función cada x segundos en Python? (13)

Quiero ejecutar repetidamente una función en Python cada 60 segundos para siempre (como un NSTimer en Objective C). Este código se ejecutará como un daemon y es efectivamente como llamar al script python cada minuto usando un cron, pero sin requerir que sea configurado por el usuario.

En esta pregunta sobre un cron implementado en Python , la solución parece efectivamente solo sleep() durante x segundos. No necesito una funcionalidad tan avanzada así que quizás algo así funcionaría

while True: # Code executed here time.sleep(60)

¿Hay algún problema previsible con este código?


Aquí hay una actualización del código de MestreLion que evita drifiting en el tiempo:

import threading import time class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.next_call = time.time() self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self.next_call += self.interval self._timer = threading.Timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False


Es posible que desee considerar Twisted que es una biblioteca de red python que implementa el Patrón de Reactor .

from twisted.internet import task from twisted.internet import reactor timeout = 60.0 # Sixty seconds def doWork(): #do work here pass l = task.LoopingCall(doWork) l.start(timeout) # call every sixty seconds reactor.run()

Si bien "while True: sleep (60)" probablemente funcionará Twisted probablemente ya implemente muchas de las características que eventualmente necesitará (daemonización, registro o manejo de excepciones según lo señalado por bobince) y probablemente sea una solución más robusta


La manera más fácil que creo ser:

import time def executeSomething(): #code here time.sleep(60) while True: executeSomething()

De esta forma, se ejecuta el código, luego espera 60 segundos, luego se ejecuta nuevamente, espera, se ejecuta, etc. No es necesario complicar las cosas: D


La principal diferencia entre eso y cron es que una excepción matará al daemon para siempre. Es posible que desee envolver con un receptor y registrador de excepciones.


Me enfrenté a un problema similar hace algún tiempo. ¿Puede ser http://cronus.readthedocs.org podría ayudar?

Para v0.2, el siguiente fragmento funciona

import cronus.beat as beat beat.set_rate(2) # 2 Hz while beat.true(): # do some time consuming work here beat.sleep() # total loop duration would be 0.5 sec


Si quieres una forma no bloqueante para ejecutar tu función periódicamente, en lugar de bloquear un ciclo infinito, usaría un temporizador con rosca. De esta forma, su código puede seguir ejecutándose y realizar otras tareas y aún así tener su función llamada cada n segundos. Utilizo esta técnica mucho para imprimir información de progreso en tareas largas, intensivas de CPU / Disco / Red.

Este es el código que publiqué en una pregunta similar, con el control de inicio () y detención ():

from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False

Uso:

from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!

caracteristicas:

  • Biblioteca estándar solamente, sin dependencias externas
  • start() y stop() son seguros para llamar varias veces, incluso si el temporizador ya ha comenzado / detenido
  • la función a ser llamada puede tener argumentos posicionales y nombrados
  • Puede cambiar el interval cualquier momento, será efectivo después de la próxima ejecución. ¡Lo mismo para args , kwargs e incluso function !

Simplemente bloquee su ciclo de tiempo en el reloj del sistema. Fácil.

import time starttime=time.time() while True: print "tick" time.sleep(60.0 - ((time.time() - starttime) % 60.0))


Una posible respuesta:

import time t=time.time() while True: if time.time()-t>10: #run your task here t=time.time()


Use el módulo sched , que implementa un programador de eventos de propósito general.

import sched, time s = sched.scheduler(time.time, time.sleep) def do_something(sc): print "Doing stuff..." # do your stuff s.enter(60, 1, do_something, (sc,)) s.enter(60, 1, do_something, (s,)) s.run()


Utilizo el método Tkinter after (), que no "roba el juego" (como el módulo sched que se presentó anteriormente), es decir, permite que otras cosas se ejecuten en paralelo:

import Tkinter def do_something1(): global n1 n1 += 1 if n1 == 6: # (Optional condition) print "* do_something1() is done *"; return # Do your stuff here # ... print "do_something1() "+str(n1) tk.after(1000, do_something1) def do_something2(): global n2 n2 += 1 if n2 == 6: # (Optional condition) print "* do_something2() is done *"; return # Do your stuff here # ... print "do_something2() "+str(n2) tk.after(500, do_something2) tk = Tkinter.Tk(); n1 = 0; n2 = 0 do_something1() do_something2() tk.mainloop()

do_something1() y do_something2() pueden ejecutarse en paralelo y en cualquier velocidad de intervalo. Aquí, el 2º se ejecutará dos veces más rápido. Nótese también que he usado un contador simple como condición para finalizar cualquiera de las funciones. Puede usar cualquier otra contición que quiera o ninguna si tiene una función que ejecutar hasta que termine el programa (por ejemplo, un reloj).


Utilizo esto para causar 60 eventos por hora con la mayoría de los eventos ocurriendo en el mismo número de segundos después de todo el minuto:

import math import time import random TICK = 60 # one minute tick size TICK_TIMING = 59 # execute on 59th second of the tick TICK_MINIMUM = 30 # minimum catch up tick size when lagging def set_timing(): now = time.time() elapsed = now - info[''begin''] minutes = math.floor(elapsed/TICK) tick_elapsed = now - info[''completion_time''] if (info[''tick'']+1) > minutes: wait = max(0,(TICK_TIMING-(time.time() % TICK))) print (''standard wait: %.2f'' % wait) time.sleep(wait) elif tick_elapsed < TICK_MINIMUM: wait = TICK_MINIMUM-tick_elapsed print (''minimum wait: %.2f'' % wait) time.sleep(wait) else: print (''skip set_timing(); no wait'') drift = ((time.time() - info[''begin'']) - info[''tick'']*TICK - TICK_TIMING + info[''begin'']%TICK) print (''drift: %.6f'' % drift) info[''tick''] = 0 info[''begin''] = time.time() info[''completion_time''] = info[''begin''] - TICK while 1: set_timing() print(''hello world'') #random real world event time.sleep(random.random()*TICK_MINIMUM) info[''tick''] += 1 info[''completion_time''] = time.time()

Dependiendo de las condiciones reales, puede obtener marcas de longitud:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

pero al final de 60 minutos tendrás 60 tics; y la mayoría de ellos ocurrirá en el desplazamiento correcto al minuto que prefiera.

En mi sistema, obtengo una deriva típica de <1/20 de segundo hasta que surge la necesidad de corrección.

La ventaja de este método es la resolución de la deriva del reloj; que puede causar problemas si está haciendo cosas como agregar un artículo por marca y espera que se agreguen 60 elementos por hora. Si no se tiene en cuenta la deriva, las indicaciones secundarias, como los promedios móviles, pueden tener en cuenta los datos demasiado profundos en el pasado, lo que da como resultado una salida defectuosa.


por ejemplo, mostrar la hora local actual

import datetime import glib import logger def get_local_time(): current_time = datetime.datetime.now().strftime("%H:%M") logger.info("get_local_time(): %s",current_time) return str(current_time) def display_local_time(): logger.info("Current time is: %s", get_local_time()) return True # call every minute glib.timeout_add(60*1000, display_local_time)


import time, traceback def every(delay, task): next_time = time.time() + delay while True: time.sleep(max(0, next_time - time.time())) try: task() except Exception: traceback.print_exc() # skip tasks if we are behind schedule: next_time += (time.time() - next_time) // delay * delay + delay def foo(): print("foo", time.time()) every(5, foo)

Si desea hacer esto sin bloquear el código restante, puede usarlo para que se ejecute en su propio hilo:

import threading threading.Thread(target=lambda: every(5, foo)).start()

Esta solución combina varias características que raramente se encuentran combinadas en las otras soluciones:

  • Sin encadenamiento: la implementación común en cadena (para programar el próximo evento) que se encuentra en muchas respuestas es frágil en el aspecto de que si algo sale mal dentro del mecanismo de programación ( threading.Timer o lo que sea), esto terminará la cadena. No se realizarán más ejecuciones, incluso si el motivo del problema ya está solucionado. Un bucle simple y esperar con un simple sleep() es mucho más robusto en comparación.
  • Sin deriva: mi solución mantiene un seguimiento exacto de los tiempos en los que se supone que debe ejecutarse. No hay deriva en función del tiempo de ejecución (como en muchas otras soluciones).
  • Saltos: Mi solución omitirá tareas si una ejecución tomó demasiado tiempo (por ejemplo, hacer X cada cinco segundos, pero X tardó 6 segundos). Este es el comportamiento cron estándar (y por una buena razón). Muchas otras soluciones simplemente ejecutan la tarea varias veces seguidas sin demora. Para la mayoría de los casos (por ejemplo, tareas de limpieza) esto no se desea. Si lo desea, simplemente use next_time += delay lugar.