tutorial - Programe un evento repetitivo en Python 3
while en python 3 (10)
Aquí hay un loop rápido y sucio sin bloqueo con Thread
:
#!/usr/bin/env python3
import threading,time
def worker():
print(time.time())
time.sleep(5)
t = threading.Thread(target=worker)
t.start()
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")
No hay nada particularmente especial, el worker
crea un nuevo hilo de sí mismo con un retraso. Puede que no sea lo más eficiente, pero lo suficientemente simple. La respuesta de Northtree sería el camino a seguir si necesita una solución más sofisticada.
Y en base a this , podemos hacer lo mismo, solo con Timer
:
#!/usr/bin/env python3
import threading,time
def hello():
t = threading.Timer(10.0, hello)
t.start()
print( "hello, world",time.time() )
t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How''s it going?",time.time())
Estoy intentando programar un evento repetitivo para ejecutar cada minuto en Python 3.
He visto la clase sched.scheduler
pero me pregunto si hay otra forma de hacerlo. He oído mencionar que podría usar múltiples hilos para esto, lo que no me importaría hacer.
Básicamente solicito un poco de JSON y luego lo analizo; su valor cambia con el tiempo
Para usar sched.scheduler
tengo que crear un bucle para solicitar que programe la ejecución par para una hora:
scheduler = sched.scheduler(time.time, time.sleep)
# Schedule the event. THIS IS UGLY!
for i in range(60):
scheduler.enter(3600 * i, 1, query_rate_limit, ())
scheduler.run()
¿Qué otras formas de hacer esto hay?
Basado en la respuesta de Alex Martelli, he implementado una versión de decorador que es más fácil de integrar.
import sched
import time
import datetime
from functools import wraps
from threading import Thread
def async(func):
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func
def schedule(interval):
def decorator(func):
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
@wraps(func)
def wrap(*args, **kwargs):
scheduler = sched.scheduler(time.time, time.sleep)
periodic(scheduler, interval, func)
scheduler.run()
return wrap
return decorator
@async
@schedule(1)
def periodic_event():
print(datetime.datetime.now())
if __name__ == ''__main__'':
print(''start'')
periodic_event()
print(''end'')
Basado en la respuesta de MestreLion, resuelve un pequeño problema con multihilo:
from threading import Timer, Lock
class Periodic(object):
"""
A periodic task running in threading.Timers
"""
def __init__(self, interval, function, *args, **kwargs):
self._lock = Lock()
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self._stopped = True
if kwargs.pop(''autostart'', True):
self.start()
def start(self, from_run=False):
self._lock.acquire()
if from_run or self._stopped:
self._stopped = False
self._timer = Timer(self.interval, self._run)
self._timer.start()
self._lock.release()
def _run(self):
self.start(from_run=True)
self.function(*self.args, **self.kwargs)
def stop(self):
self._lock.acquire()
self._stopped = True
self._timer.cancel()
self._lock.release()
Mi humilde opinión sobre el tema:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Uso:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
caracteristicas:
- Biblioteca estándar solamente, sin dependencias externas
- Utiliza el patrón sugerido por Alex Martnelli
-
start()
ystop()
son seguros para llamar varias veces, incluso si el temporizador ya ha comenzado / detenido - la función a ser llamada puede tener argumentos posicionales y nombrados
- Puede cambiar el
interval
cualquier momento, será efectivo después de la próxima ejecución. ¡Lo mismo paraargs
,kwargs
e inclusofunction
!
Podría usar threading.Timer
, pero eso también programa un evento único, de manera similar al método .enter
de los objetos del planificador.
El patrón normal (en cualquier idioma) para transformar un planificador único en un planificador periódico es hacer que cada evento se vuelva a programar en el intervalo especificado. Por ejemplo, con sched
, no usaría un bucle como lo está haciendo, sino algo así como:
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
e inicie el "programa siempre para siempre" con una llamada
periodic(scheduler, 3600, query_rate_limit)
O bien, podría usar threading.Timer
vez de scheduler.enter
, pero el patrón es bastante similar.
Si necesita una variación más refinada (por ejemplo, detener la reprogramación periódica en un momento dado o bajo ciertas condiciones), no es demasiado difícil acomodarse con algunos parámetros adicionales.
Podría usar el Programador avanzado de Python . Incluso tiene una interfaz tipo cron.
Puedes usar el schedule Funciona en Python 2.7 y 3.3 y es bastante liviano:
import schedule
import time
def job():
print("I''m working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
while 1:
schedule.run_pending()
time.sleep(1)
Usa Celery
from celery.task import PeriodicTask
from datetime import timedelta
class ProcessClicksTask(PeriodicTask):
run_every = timedelta(minutes=30)
def run(self, **kwargs):
#do something
Ver mi muestra
import sched, time
def myTask(m,n):
print n+'' ''+m
def periodic_queue(interval,func,args=(),priority=1):
s = sched.scheduler(time.time, time.sleep)
periodic_task(s,interval,func,args,priority)
s.run()
def periodic_task(scheduler,interval,func,args,priority):
func(*args)
scheduler.enter(interval,priority,periodic_task,
(scheduler,interval,func,args,priority))
periodic_queue(1,myTask,(''world'',''hello''))
task-scheduler es un programador en proceso para organizar y ejecutar la tarea periódicamente según el archivo de configuración YAML. Por favor, consulte https://github.com/tzutalin/task-scheduler