temporizador - python run function every x seconds
¿Cuál es la mejor manera de ejecutar repetidamente una función cada x segundos en Python? (13)
Quiero ejecutar repetidamente una función en Python cada 60 segundos para siempre (como un NSTimer en Objective C). Este código se ejecutará como un daemon y es efectivamente como llamar al script python cada minuto usando un cron, pero sin requerir que sea configurado por el usuario.
En esta pregunta sobre un cron implementado en Python , la solución parece efectivamente solo sleep() durante x segundos. No necesito una funcionalidad tan avanzada así que quizás algo así funcionaría
while True:
# Code executed here
time.sleep(60)
¿Hay algún problema previsible con este código?
Aquí hay una actualización del código de MestreLion que evita drifiting en el tiempo:
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Es posible que desee considerar Twisted que es una biblioteca de red python que implementa el Patrón de Reactor .
from twisted.internet import task
from twisted.internet import reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
Si bien "while True: sleep (60)" probablemente funcionará Twisted probablemente ya implemente muchas de las características que eventualmente necesitará (daemonización, registro o manejo de excepciones según lo señalado por bobince) y probablemente sea una solución más robusta
La manera más fácil que creo ser:
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
De esta forma, se ejecuta el código, luego espera 60 segundos, luego se ejecuta nuevamente, espera, se ejecuta, etc. No es necesario complicar las cosas: D
La principal diferencia entre eso y cron es que una excepción matará al daemon para siempre. Es posible que desee envolver con un receptor y registrador de excepciones.
Me enfrenté a un problema similar hace algún tiempo. ¿Puede ser http://cronus.readthedocs.org podría ayudar?
Para v0.2, el siguiente fragmento funciona
import cronus.beat as beat
beat.set_rate(2) # 2 Hz
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
Si quieres una forma no bloqueante para ejecutar tu función periódicamente, en lugar de bloquear un ciclo infinito, usaría un temporizador con rosca. De esta forma, su código puede seguir ejecutándose y realizar otras tareas y aún así tener su función llamada cada n segundos. Utilizo esta técnica mucho para imprimir información de progreso en tareas largas, intensivas de CPU / Disco / Red.
Este es el código que publiqué en una pregunta similar, con el control de inicio () y detención ():
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Uso:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
caracteristicas:
- Biblioteca estándar solamente, sin dependencias externas
-
start()
ystop()
son seguros para llamar varias veces, incluso si el temporizador ya ha comenzado / detenido - la función a ser llamada puede tener argumentos posicionales y nombrados
- Puede cambiar el
interval
cualquier momento, será efectivo después de la próxima ejecución. ¡Lo mismo paraargs
,kwargs
e inclusofunction
!
Simplemente bloquee su ciclo de tiempo en el reloj del sistema. Fácil.
import time
starttime=time.time()
while True:
print "tick"
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Una posible respuesta:
import time
t=time.time()
while True:
if time.time()-t>10:
#run your task here
t=time.time()
Use el módulo sched , que implementa un programador de eventos de propósito general.
import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print "Doing stuff..."
# do your stuff
s.enter(60, 1, do_something, (sc,))
s.enter(60, 1, do_something, (s,))
s.run()
Utilizo el método Tkinter after (), que no "roba el juego" (como el módulo sched que se presentó anteriormente), es decir, permite que otras cosas se ejecuten en paralelo:
import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
do_something1()
y do_something2()
pueden ejecutarse en paralelo y en cualquier velocidad de intervalo. Aquí, el 2º se ejecutará dos veces más rápido. Nótese también que he usado un contador simple como condición para finalizar cualquiera de las funciones. Puede usar cualquier otra contición que quiera o ninguna si tiene una función que ejecutar hasta que termine el programa (por ejemplo, un reloj).
Utilizo esto para causar 60 eventos por hora con la mayoría de los eventos ocurriendo en el mismo número de segundos después de todo el minuto:
import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info[''begin'']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info[''completion_time'']
if (info[''tick'']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print (''standard wait: %.2f'' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print (''minimum wait: %.2f'' % wait)
time.sleep(wait)
else:
print (''skip set_timing(); no wait'')
drift = ((time.time() - info[''begin'']) - info[''tick'']*TICK -
TICK_TIMING + info[''begin'']%TICK)
print (''drift: %.6f'' % drift)
info[''tick''] = 0
info[''begin''] = time.time()
info[''completion_time''] = info[''begin''] - TICK
while 1:
set_timing()
print(''hello world'')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info[''tick''] += 1
info[''completion_time''] = time.time()
Dependiendo de las condiciones reales, puede obtener marcas de longitud:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
pero al final de 60 minutos tendrás 60 tics; y la mayoría de ellos ocurrirá en el desplazamiento correcto al minuto que prefiera.
En mi sistema, obtengo una deriva típica de <1/20 de segundo hasta que surge la necesidad de corrección.
La ventaja de este método es la resolución de la deriva del reloj; que puede causar problemas si está haciendo cosas como agregar un artículo por marca y espera que se agreguen 60 elementos por hora. Si no se tiene en cuenta la deriva, las indicaciones secundarias, como los promedios móviles, pueden tener en cuenta los datos demasiado profundos en el pasado, lo que da como resultado una salida defectuosa.
por ejemplo, mostrar la hora local actual
import datetime
import glib
import logger
def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)
def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True
# call every minute
glib.timeout_add(60*1000, display_local_time)
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
Si desea hacer esto sin bloquear el código restante, puede usarlo para que se ejecute en su propio hilo:
import threading
threading.Thread(target=lambda: every(5, foo)).start()
Esta solución combina varias características que raramente se encuentran combinadas en las otras soluciones:
- Sin encadenamiento: la implementación común en cadena (para programar el próximo evento) que se encuentra en muchas respuestas es frágil en el aspecto de que si algo sale mal dentro del mecanismo de programación (
threading.Timer
o lo que sea), esto terminará la cadena. No se realizarán más ejecuciones, incluso si el motivo del problema ya está solucionado. Un bucle simple y esperar con un simplesleep()
es mucho más robusto en comparación. - Sin deriva: mi solución mantiene un seguimiento exacto de los tiempos en los que se supone que debe ejecutarse. No hay deriva en función del tiempo de ejecución (como en muchas otras soluciones).
- Saltos: Mi solución omitirá tareas si una ejecución tomó demasiado tiempo (por ejemplo, hacer X cada cinco segundos, pero X tardó 6 segundos). Este es el comportamiento cron estándar (y por una buena razón). Muchas otras soluciones simplemente ejecutan la tarea varias veces seguidas sin demora. Para la mayoría de los casos (por ejemplo, tareas de limpieza) esto no se desea. Si lo desea, simplemente use
next_time += delay
lugar.