make - Llamada a un método asíncrono en Python?
process async python (12)
Me preguntaba si hay alguna biblioteca para llamadas a métodos asíncronos en Python . Sería genial si pudieras hacer algo como
@async
def longComputation():
<code>
token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
doSomethingElse()
if token.finished():
result = token.result()
O para llamar a una rutina no asincrónica asincrónicamente
def longComputation()
<code>
token = asynccall(longComputation())
Sería genial tener una estrategia más refinada como nativa en el núcleo del lenguaje. ¿Esto fue considerado?
¿Hay alguna razón para no usar hilos? Puedes usar la clase de threading
. En lugar de la función finished()
use isAlive()
. La función result()
podría join()
el hilo y recuperar el resultado. Y, si puede, anule las run()
y __init__
para llamar a la función especificada en el constructor y guarde el valor en alguna parte de la instancia de la clase.
A partir de Python 3.5, puede usar generadores mejorados para funciones asíncronas.
import asyncio
import datetime
Sintaxis mejorada del generador:
@asyncio.coroutine
def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
Nueva sintaxis async/await
:
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
Algo como esto funciona para mí, puede llamar a la función y se enviará a un nuevo hilo.
from thread import start_new_thread
def dowork(asynchronous=True):
if asynchronous:
args = (False)
start_new_thread(dowork,args) #Call itself on a new thread.
else:
while True:
#do something...
time.sleep(60) #sleep for a minute
return
Algo como:
import threading
thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done
Consulte la documentación en https://docs.python.org/2/library/threading.html#module-threading para obtener más detalles; este código debería funcionar también para Python 3.
Mi solución es:
import threading
class TimeoutError(RuntimeError):
pass
class AsyncCall(object):
def __init__(self, fnc, callback = None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
self.Thread.start()
return self
def wait(self, timeout = None):
self.Thread.join(timeout)
if self.Thread.isAlive():
raise TimeoutError()
else:
return self.Result
def run(self, *args, **kwargs):
self.Result = self.Callable(*args, **kwargs)
if self.Callback:
self.Callback(self.Result)
class AsyncMethod(object):
def __init__(self, fnc, callback=None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)
def Async(fnc = None, callback = None):
if fnc == None:
def AddAsyncCallback(fnc):
return AsyncMethod(fnc, callback)
return AddAsyncCallback
else:
return AsyncMethod(fnc, callback)
Y funciona exactamente como se solicita:
@Async
def fnc():
pass
No está en el núcleo del lenguaje, pero una biblioteca muy madura que hace lo que quieres es Twisted . Introduce el objeto Diferido, al que puede adjuntar devoluciones de llamada o manejadores de errores ("errores") a. A Diferido es básicamente una "promesa" de que una función tendrá un resultado eventual.
Podría usar eventlet. Te permite escribir lo que parece ser un código sincrónico, pero que funcione de forma asíncrona a través de la red.
Aquí hay un ejemplo de un rastreador supermínimo:
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
import eventlet
from eventlet.green import urllib2
def fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print "got body", len(body)
Puede implementar un decorador para que sus funciones sean asincrónicas, aunque eso es un poco complicado. El módulo de multiprocessing
está lleno de pequeños caprichos y restricciones aparentemente arbitrarias, una razón más para encapsularlo detrás de una interfaz amigable.
from inspect import getmodule
from multiprocessing import Pool
def async(decorated):
r''''''Wraps a top-level function around an asynchronous dispatcher.
when the decorated function is called, a task is submitted to a
process pool, and a future object is returned, providing access to an
eventual return value.
The future object has a blocking get() method to access the task
result: it will return immediately if the job is already done, or block
until it completes.
This decorator won''t work on methods, due to limitations in Python''s
pickling machinery (in principle methods could be made pickleable, but
good luck on that).
''''''
# Keeps the original function visible from the module global namespace,
# under a name consistent to its __name__ attribute. This is necessary for
# the multiprocessing pickling machinery to work properly.
module = getmodule(decorated)
decorated.__name__ += ''_original''
setattr(module, decorated.__name__, decorated)
def send(*args, **opts):
return async.pool.apply_async(decorated, args, opts)
return send
El siguiente código ilustra el uso del decorador:
@async
def printsum(uid, values):
summed = 0
for value in values:
summed += value
print("Worker %i: sum value is %i" % (uid, summed))
return (uid, summed)
if __name__ == ''__main__'':
from random import sample
# The process pool must be created inside __main__.
async.pool = Pool(4)
p = range(0, 1000)
results = []
for i in range(4):
result = printsum(i, sample(p, 100))
results.append(result)
for result in results:
print("Worker %i: sum value is %i" % result.get())
En un caso del mundo real, elaboraría un poco más sobre el decorador, proporcionando alguna forma de desactivarlo para la depuración (manteniendo la futura interfaz en su lugar), o tal vez una facilidad para tratar con excepciones; pero creo que esto demuestra el principio bastante bien.
Puede usar concurrent.futures (agregado en Python 3.2).
import time
from concurrent.futures import ThreadPoolExecutor
def long_computation(duration):
for x in range(0, duration):
print(x)
time.sleep(1)
return duration * 2
print(''Use polling'')
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(long_computation, 5)
while not future.done():
print(''waiting...'')
time.sleep(0.5)
print(future.result())
print(''Use callback'')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))
print(''waiting for callback'')
executor.shutdown(False) # non-blocking
print(''shutdown invoked'')
Puede usar el módulo de multiprocesamiento agregado en Python 2.6. Puede usar pools de procesos y luego obtener resultados de forma asincrónica con:
apply_async(func[, args[, kwds[, callback]]])
P.ej:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == ''__main__'':
pool = Pool(processes=1) # Start a worker processes.
result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.
Esta es solo una alternativa. Este módulo proporciona muchas instalaciones para lograr lo que desea. También será muy fácil hacer un decorador a partir de esto.
Puedes usar el proceso. Si desea ejecutarlo por siempre, use mientras (como la red) en su función:
from multiprocessing import Process
def foo():
while 1:
# Do something
p = Process(target = foo)
p.start()
si solo quieres ejecutarlo una vez, hazlo así:
from multiprocessing import Process
def foo():
# Do something
p = Process(target = foo)
p.start()
p.join()
Sólo
import threading, time
def f():
print "f started"
time.sleep(3)
print "f finished"
threading.Thread(target=f).start()