iteradores generadores generadoras funciones español python generator coroutine

generadores - python funciones generadoras



¿Convierte las funciones con devolución de llamada en generadores de Python? (4)

Concepto Utilice una cola de bloqueo con maxsize=1 y un modelo de productor / consumidor.

La devolución de llamada produce, luego la siguiente llamada a la devolución de llamada se bloqueará en la cola completa.

El consumidor luego obtiene el valor de la cola, intenta obtener otro valor y bloquea en la lectura.

El productor está autorizado para empujar a la cola, enjuagar y repetir.

Uso:

def dummy(func, arg, callback=None): for i in range(100): callback(func(arg+i)) # Dummy example: for i in Iteratorize(dummy, lambda x: x+1, 0): print(i) # example with scipy: for i in Iteratorize(scipy.optimize.fmin, func, x0): print(i)

Puede usarse como se espera para un iterador:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)): print(i)

Iteratorizar la clase:

from thread import start_new_thread from Queue import Queue class Iteratorize: """ Transforms a function that takes a callback into a lazy iterator (generator). """ def __init__(self, func, ifunc, arg, callback=None): self.mfunc=func self.ifunc=ifunc self.c_callback=callback self.q = Queue(maxsize=1) self.stored_arg=arg self.sentinel = object() def _callback(val): self.q.put(val) def gentask(): ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback) self.q.put(self.sentinel) if self.c_callback: self.c_callback(ret) start_new_thread(gentask, ()) def __iter__(self): return self def next(self): obj = self.q.get(True,None) if obj is self.sentinel: raise StopIteration else: return obj

Probablemente se puede hacer algo de limpieza para aceptar *args y **kwargs para la función que se está envolviendo y / o la devolución de llamada del resultado final.

La función de minimización Scipy (solo para usar como ejemplo), tiene la opción de agregar una función de devolución de llamada en cada paso. Así que puedo hacer algo como

def my_callback(x): print x scipy.optimize.fmin(func, x0, callback=my_callback)

¿Hay alguna forma de usar la función de devolución de llamada para crear una versión generadora de fmin, para que yo pueda hacerlo?

for x in my_fmin(func,x0): print x

Parece que podría ser posible con una combinación de rendimientos y envíos, pero puedo pensar en cualquier cosa.


Generador como coroutine (sin roscado).

Tengamos FakeFtp con la función retrbinary utilizando callback con cada lectura exitosa de una porción de datos:

class FakeFtp(object): def __init__(self): self.data = iter(["aaa", "bbb", "ccc", "ddd"]) def login(self, user, password): self.user = user self.password = password def retrbinary(self, cmd, cb): for chunk in self.data: cb(chunk)

El uso de la función de devolución de llamada simple tiene la desventaja de que se llama repetidamente y la función de devolución de llamada no puede mantener fácilmente el contexto entre las llamadas.

El siguiente código define process_chunks generator, que podrá recibir fragmentos de datos uno por uno y procesarlos. En contraste con la simple devolución de llamada, aquí podemos mantener todo el procesamiento dentro de una función sin perder el contexto.

from contextlib import closing from itertools import count def main(): processed = [] def process_chunks(): for i in count(): try: # (repeatedly) get the chunk to process chunk = yield except GeneratorExit: # finish_up print("Finishing up.") return else: # Here process the chunk as you like print("inside coroutine, processing chunk:", i, chunk) product = "processed({i}): {chunk}".format(i=i, chunk=chunk) processed.append(product) with closing(process_chunks()) as coroutine: # Get the coroutine to the first yield coroutine.next() ftp = FakeFtp() # next line repeatedly calls `coroutine.send(data)` ftp.retrbinary("RETR binary", cb=coroutine.send) # each callback "jumps" to `yield` line in `process_chunks` print("processed result", processed) print("DONE")

Para ver el código en acción, ponga la clase FakeFtp , el código que se muestra arriba y la siguiente línea:

main()

en un archivo y llámalo:

$ python headsandtails.py (''inside coroutine, processing chunk:'', 0, ''aaa'') (''inside coroutine, processing chunk:'', 1, ''bbb'') (''inside coroutine, processing chunk:'', 2, ''ccc'') (''inside coroutine, processing chunk:'', 3, ''ddd'') Finishing up. (''processed result'', [''processed(0): aaa'', ''processed(1): bbb'', ''processed(2): ccc'', ''processed(3): ddd'']) DONE

Cómo funciona

processed = [] está aquí solo para mostrar, el proceso process_chunks no tendrá problemas para cooperar con su contexto externo. Todo está envuelto en def main(): para probar, no hay necesidad de usar variables globales.

def process_chunks() es el núcleo de la solución. Puede tener parámetros de entrada de un disparo (que no se usan aquí), pero el punto principal, donde recibe la entrada, es que cada línea de yield devuelve lo que cualquiera envía a través de .send(data) a la instancia de este generador. Se puede realizar una coroutine.send(chunk) pero en este ejemplo se realiza a través de la devolución de llamada haciendo referencia a esta función callback.send .

Tenga en cuenta que en la solución real no hay problema en tener varios yield en el código, se procesan uno por uno. Esto se puede usar, por ejemplo, para leer (e ignorar) el encabezado del archivo CSV y luego continuar procesando registros con datos.

Podríamos instanciar y usar el generador de la siguiente manera:

coroutine = process_chunks() # Get the coroutine to the first yield coroutine.next() ftp = FakeFtp() # next line repeatedly calls `coroutine.send(data)` ftp.retrbinary("RETR binary", cb=coroutine.send) # each callback "jumps" to `yield` line in `process_chunks` # close the coroutine (will throw the `GeneratorExit` exception into the # `process_chunks` coroutine). coroutine.close()

El código real está utilizando el contextlib contexto de closing contextlib para garantizar que siempre se llame a coroutine.close() .

Conclusiones

Esta solución no proporciona el tipo de iterador para consumir datos en el estilo tradicional "desde afuera". Por otro lado, somos capaces de:

  • usar el generador "desde adentro"
  • mantener todo el procesamiento iterativo dentro de una función sin ser interrumpido entre devoluciones de llamada
  • opcionalmente usar contexto externo
  • proporcionar resultados utilizables al exterior
  • Todo esto se puede hacer sin utilizar enhebrado.

Créditos : La solución está muy inspirada en la SO SOY iterador de FTP de Python FTP (sin cargar el archivo completo en la memoria) escrito por el usuario2357112


Como se señaló en los comentarios, podría hacerlo en un nuevo hilo, utilizando la Queue . El inconveniente es que todavía necesitaría alguna forma de acceder al resultado final (lo que devuelve fmin al final). Mi ejemplo a continuación utiliza una devolución de llamada opcional para hacer algo con él (otra opción sería simplemente producirlo también, aunque su código de llamada tendría que diferenciar entre los resultados de iteración y los resultados finales):

from thread import start_new_thread from Queue import Queue def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None): q = Queue() # fmin produces, the generator consumes job_done = object() # signals the processing is done # Producer def my_callback(x): q.put(x) def task(): ret = scipy.optimize.fmin(func,x0,callback=my_callback) q.put(job_done) end_callback(ret) # "Returns" the result of the main call # Starts fmin in a new thread start_new_thread(task,()) # Consumer while True: next_item = q.get(True,timeout) # Blocks until an input is available if next_item is job_done: break yield next_item

Actualización: para bloquear la ejecución de la siguiente iteración hasta que el consumidor haya terminado de procesar la última, también es necesario usar task_done y join .

# Producer def my_callback(x): q.put(x) q.join() # Blocks until task_done is called # Consumer while True: next_item = q.get(True,timeout) # Blocks until an input is available if next_item is job_done: break yield next_item q.task_done() # Unblocks the producer, so a new iteration can start

Tenga en cuenta que maxsize=1 no es necesario, ya que no se agregará ningún elemento nuevo a la cola hasta que se consuma el último.

Actualización 2: también tenga en cuenta que, a menos que todos los elementos sean recuperados finalmente por este generador, el subproceso creado se bloqueará (se bloqueará para siempre y sus recursos nunca se liberarán). El productor está esperando en la cola, y dado que almacena una referencia a esa cola, nunca será reclamado por el CG incluso si el consumidor lo está. La cola se volverá inalcanzable, por lo que nadie podrá liberar el bloqueo.

Una solución limpia para eso es desconocida, si es posible (ya que dependería de la función particular utilizada en lugar de fmin ). Se podría hacer una solución temporal utilizando el timeout , ya que el productor genera una excepción si put bloques por demasiado tiempo:

q = Queue(maxsize=1) # Producer def my_callback(x): q.put(x) q.put("dummy",True,timeout) # Blocks until the first result is retrieved q.join() # Blocks again until task_done is called # Consumer while True: next_item = q.get(True,timeout) # Blocks until an input is available q.task_done() # (one "task_done" per "get") if next_item is job_done: break yield next_item q.get() # Retrieves the "dummy" object (must be after yield) q.task_done() # Unblocks the producer, so a new iteration can start


Qué tal si

data = [] scipy.optimize.fmin(func,x0,callback=data.append) for line in data: print line

Si no, ¿qué es exactamente lo que quieres hacer con los datos del generador?