metodos full example espaƱol asyncio python python-3.x queue python-asyncio

python - full - Equivalente de asyncio.Queues con "hilos" de trabajador



queue python install (3)

Estoy tratando de descubrir cómo portar un programa enhebrado para usar asyncio . Tengo un montón de código que se sincroniza en torno a algunas Queues biblioteca estándar, básicamente así:

import queue, random, threading, time q = queue.Queue() def produce(): while True: time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds q.put(random.random()) def consume(): while True: value = q.get(block=True) print("Consumed", value) threading.Thread(target=produce).start() threading.Thread(target=consume).start()

Un hilo crea valores (posiblemente la entrada del usuario) y otro hilo hace algo con ellos. El punto es que estos hilos están inactivos hasta que haya nuevos datos, momento en el que se despiertan y hacen algo con él.

Estoy tratando de implementar este patrón usando asyncio, pero parece que no puedo encontrar la forma de hacerlo "ir".

Mis intentos parecen más o menos así (y no hago nada en absoluto).

import asyncio, random q = asyncio.Queue() @asyncio.coroutine def produce(): while True: q.put(random.random()) yield from asyncio.sleep(0.5 + random.random()) @asyncio.coroutine def consume(): while True: value = yield from q.get() print("Consumed", value) # do something here to start the coroutines. asyncio.Task()? loop = asyncio.get_event_loop() loop.run_forever()

Probé variaciones en el uso de corutinas, no las uso, envolviendo cosas en tareas, tratando de hacerlas crear o devolver futuros, etc.

Estoy empezando a pensar que tengo una idea equivocada acerca de cómo debería estar usando asyncio (quizás este patrón debería implementarse de una manera diferente de la que no tengo conocimiento). Cualquier indicador sería apreciada.



Sí exactamente. Las tareas son tus amigos:

import asyncio, random q = asyncio.Queue() @asyncio.coroutine def produce(): while True: yield from q.put(random.random()) yield from asyncio.sleep(0.5 + random.random()) @asyncio.coroutine def consume(): while True: value = yield from q.get() print("Consumed", value) loop = asyncio.get_event_loop() loop.create_task(produce()) loop.create_task(consume()) loop.run_forever()

asyncio.ensure_future se puede usar también para la creación de tareas.

Y tenga en cuenta: q.put() es una coroutine , por lo que debe usar yield from q.put(value) .

UPD

Cambió de asyncio.Task() / asyncio.async() a la nueva marca API loop.create_task() y asyncio.ensure_future() en el ejemplo.


Un poco más tarde y tal vez OT, tenga en cuenta que puede consumir desde la Queue de múltiples tareas ya que eran consumidores independientes.

El siguiente fragmento muestra como ejemplo cómo puede lograr el mismo patrón de grupo de subprocesos con tareas de asyncio .

q = asyncio.Queue() async def sum(x): await asyncio.sleep(0.1) # simulates asynchronously return x async def consumer(i): print("Consumer {} started".format(i)) while True: f, x = await q.get() print("Consumer {} procesing {}".format(i, x)) r = await sum(x) f.set_result(r) async def producer(): consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)] loop = asyncio.get_event_loop() tasks = [(asyncio.Future(), x) for x in range(10)] for task in tasks: await q.put(task) # wait until all futures are completed results = await asyncio.gather(*[f for f, _ in tasks]) assert results == [r for _, r in tasks] # destroy tasks for c in consumers: c.cancel() asyncio.get_event_loop().run_until_complete(producer())