while tutorial true thread terminar tareas script repetitivos programar programador programacion paso orientada hora for eventos espaƱol ejemplos ejecutar condiciones con como ciclos ciclo cada bucle automatizar python python-3.x scheduled-tasks timing

tutorial - Programe un evento repetitivo en Python 3



while en python 3 (10)

Aquí hay un loop rápido y sucio sin bloqueo con Thread :

#!/usr/bin/env python3 import threading,time def worker(): print(time.time()) time.sleep(5) t = threading.Thread(target=worker) t.start() threads = [] t = threading.Thread(target=worker) threads.append(t) t.start() time.sleep(7) print("Hello World")

No hay nada particularmente especial, el worker crea un nuevo hilo de sí mismo con un retraso. Puede que no sea lo más eficiente, pero lo suficientemente simple. La respuesta de Northtree sería el camino a seguir si necesita una solución más sofisticada.

Y en base a this , podemos hacer lo mismo, solo con Timer :

#!/usr/bin/env python3 import threading,time def hello(): t = threading.Timer(10.0, hello) t.start() print( "hello, world",time.time() ) t = threading.Timer(10.0, hello) t.start() time.sleep(12) print("Oh,hai",time.time()) time.sleep(4) print("How''s it going?",time.time())

Estoy intentando programar un evento repetitivo para ejecutar cada minuto en Python 3.

He visto la clase sched.scheduler pero me pregunto si hay otra forma de hacerlo. He oído mencionar que podría usar múltiples hilos para esto, lo que no me importaría hacer.

Básicamente solicito un poco de JSON y luego lo analizo; su valor cambia con el tiempo

Para usar sched.scheduler tengo que crear un bucle para solicitar que programe la ejecución par para una hora:

scheduler = sched.scheduler(time.time, time.sleep) # Schedule the event. THIS IS UGLY! for i in range(60): scheduler.enter(3600 * i, 1, query_rate_limit, ()) scheduler.run()

¿Qué otras formas de hacer esto hay?


Basado en la respuesta de Alex Martelli, he implementado una versión de decorador que es más fácil de integrar.

import sched import time import datetime from functools import wraps from threading import Thread def async(func): @wraps(func) def async_func(*args, **kwargs): func_hl = Thread(target=func, args=args, kwargs=kwargs) func_hl.start() return func_hl return async_func def schedule(interval): def decorator(func): def periodic(scheduler, interval, action, actionargs=()): scheduler.enter(interval, 1, periodic, (scheduler, interval, action, actionargs)) action(*actionargs) @wraps(func) def wrap(*args, **kwargs): scheduler = sched.scheduler(time.time, time.sleep) periodic(scheduler, interval, func) scheduler.run() return wrap return decorator @async @schedule(1) def periodic_event(): print(datetime.datetime.now()) if __name__ == ''__main__'': print(''start'') periodic_event() print(''end'')


Basado en la respuesta de MestreLion, resuelve un pequeño problema con multihilo:

from threading import Timer, Lock class Periodic(object): """ A periodic task running in threading.Timers """ def __init__(self, interval, function, *args, **kwargs): self._lock = Lock() self._timer = None self.function = function self.interval = interval self.args = args self.kwargs = kwargs self._stopped = True if kwargs.pop(''autostart'', True): self.start() def start(self, from_run=False): self._lock.acquire() if from_run or self._stopped: self._stopped = False self._timer = Timer(self.interval, self._run) self._timer.start() self._lock.release() def _run(self): self.start(from_run=True) self.function(*self.args, **self.kwargs) def stop(self): self._lock.acquire() self._stopped = True self._timer.cancel() self._lock.release()


Mi humilde opinión sobre el tema:

from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.function = function self.interval = interval self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False

Uso:

from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!

caracteristicas:

  • Biblioteca estándar solamente, sin dependencias externas
  • Utiliza el patrón sugerido por Alex Martnelli
  • start() y stop() son seguros para llamar varias veces, incluso si el temporizador ya ha comenzado / detenido
  • la función a ser llamada puede tener argumentos posicionales y nombrados
  • Puede cambiar el interval cualquier momento, será efectivo después de la próxima ejecución. ¡Lo mismo para args , kwargs e incluso function !

Podría usar threading.Timer , pero eso también programa un evento único, de manera similar al método .enter de los objetos del planificador.

El patrón normal (en cualquier idioma) para transformar un planificador único en un planificador periódico es hacer que cada evento se vuelva a programar en el intervalo especificado. Por ejemplo, con sched , no usaría un bucle como lo está haciendo, sino algo así como:

def periodic(scheduler, interval, action, actionargs=()): scheduler.enter(interval, 1, periodic, (scheduler, interval, action, actionargs)) action(*actionargs)

e inicie el "programa siempre para siempre" con una llamada

periodic(scheduler, 3600, query_rate_limit)

O bien, podría usar threading.Timer vez de scheduler.enter , pero el patrón es bastante similar.

Si necesita una variación más refinada (por ejemplo, detener la reprogramación periódica en un momento dado o bajo ciertas condiciones), no es demasiado difícil acomodarse con algunos parámetros adicionales.



Puedes usar el schedule Funciona en Python 2.7 y 3.3 y es bastante liviano:

import schedule import time def job(): print("I''m working...") schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) while 1: schedule.run_pending() time.sleep(1)


Usa Celery

from celery.task import PeriodicTask from datetime import timedelta class ProcessClicksTask(PeriodicTask): run_every = timedelta(minutes=30) def run(self, **kwargs): #do something


Ver mi muestra

import sched, time def myTask(m,n): print n+'' ''+m def periodic_queue(interval,func,args=(),priority=1): s = sched.scheduler(time.time, time.sleep) periodic_task(s,interval,func,args,priority) s.run() def periodic_task(scheduler,interval,func,args,priority): func(*args) scheduler.enter(interval,priority,periodic_task, (scheduler,interval,func,args,priority)) periodic_queue(1,myTask,(''world'',''hello''))