python - full - Equivalente de asyncio.Queues con "hilos" de trabajador
queue python install (3)
Estoy tratando de descubrir cómo portar un programa enhebrado para usar asyncio
. Tengo un montón de código que se sincroniza en torno a algunas Queues
biblioteca estándar, básicamente así:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Un hilo crea valores (posiblemente la entrada del usuario) y otro hilo hace algo con ellos. El punto es que estos hilos están inactivos hasta que haya nuevos datos, momento en el que se despiertan y hacen algo con él.
Estoy tratando de implementar este patrón usando asyncio, pero parece que no puedo encontrar la forma de hacerlo "ir".
Mis intentos parecen más o menos así (y no hago nada en absoluto).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()
Probé variaciones en el uso de corutinas, no las uso, envolviendo cosas en tareas, tratando de hacerlas crear o devolver futuros, etc.
Estoy empezando a pensar que tengo una idea equivocada acerca de cómo debería estar usando asyncio (quizás este patrón debería implementarse de una manera diferente de la que no tengo conocimiento). Cualquier indicador sería apreciada.
Esto es lo que uso en producción, movido a la esencia: https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d
Sí exactamente. Las tareas son tus amigos:
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
yield from q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()
asyncio.ensure_future
se puede usar también para la creación de tareas.
Y tenga en cuenta: q.put()
es una coroutine , por lo que debe usar yield from q.put(value)
.
UPD
Cambió de asyncio.Task()
/ asyncio.async()
a la nueva marca API loop.create_task()
y asyncio.ensure_future()
en el ejemplo.
Un poco más tarde y tal vez OT, tenga en cuenta que puede consumir desde la Queue
de múltiples tareas ya que eran consumidores independientes.
El siguiente fragmento muestra como ejemplo cómo puede lograr el mismo patrón de grupo de subprocesos con tareas de asyncio
.
q = asyncio.Queue()
async def sum(x):
await asyncio.sleep(0.1) # simulates asynchronously
return x
async def consumer(i):
print("Consumer {} started".format(i))
while True:
f, x = await q.get()
print("Consumer {} procesing {}".format(i, x))
r = await sum(x)
f.set_result(r)
async def producer():
consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
loop = asyncio.get_event_loop()
tasks = [(asyncio.Future(), x) for x in range(10)]
for task in tasks:
await q.put(task)
# wait until all futures are completed
results = await asyncio.gather(*[f for f, _ in tasks])
assert results == [r for _, r in tasks]
# destroy tasks
for c in consumers:
c.cancel()
asyncio.get_event_loop().run_until_complete(producer())