generadores - python funciones generadoras
¿Convierte las funciones con devolución de llamada en generadores de Python? (4)
Concepto Utilice una cola de bloqueo con
maxsize=1
y un modelo de productor / consumidor.
La devolución de llamada produce, luego la siguiente llamada a la devolución de llamada se bloqueará en la cola completa.
El consumidor luego obtiene el valor de la cola, intenta obtener otro valor y bloquea en la lectura.
El productor está autorizado para empujar a la cola, enjuagar y repetir.
Uso:
def dummy(func, arg, callback=None):
for i in range(100):
callback(func(arg+i))
# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
print(i)
# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
print(i)
Puede usarse como se espera para un iterador:
for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
print(i)
Iteratorizar la clase:
from thread import start_new_thread
from Queue import Queue
class Iteratorize:
"""
Transforms a function that takes a callback
into a lazy iterator (generator).
"""
def __init__(self, func, ifunc, arg, callback=None):
self.mfunc=func
self.ifunc=ifunc
self.c_callback=callback
self.q = Queue(maxsize=1)
self.stored_arg=arg
self.sentinel = object()
def _callback(val):
self.q.put(val)
def gentask():
ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
self.q.put(self.sentinel)
if self.c_callback:
self.c_callback(ret)
start_new_thread(gentask, ())
def __iter__(self):
return self
def next(self):
obj = self.q.get(True,None)
if obj is self.sentinel:
raise StopIteration
else:
return obj
Probablemente se puede hacer algo de limpieza para aceptar *args
y **kwargs
para la función que se está envolviendo y / o la devolución de llamada del resultado final.
La función de minimización Scipy (solo para usar como ejemplo), tiene la opción de agregar una función de devolución de llamada en cada paso. Así que puedo hacer algo como
def my_callback(x):
print x
scipy.optimize.fmin(func, x0, callback=my_callback)
¿Hay alguna forma de usar la función de devolución de llamada para crear una versión generadora de fmin, para que yo pueda hacerlo?
for x in my_fmin(func,x0):
print x
Parece que podría ser posible con una combinación de rendimientos y envíos, pero puedo pensar en cualquier cosa.
Generador como coroutine (sin roscado).
Tengamos FakeFtp
con la función retrbinary
utilizando callback con cada lectura exitosa de una porción de datos:
class FakeFtp(object):
def __init__(self):
self.data = iter(["aaa", "bbb", "ccc", "ddd"])
def login(self, user, password):
self.user = user
self.password = password
def retrbinary(self, cmd, cb):
for chunk in self.data:
cb(chunk)
El uso de la función de devolución de llamada simple tiene la desventaja de que se llama repetidamente y la función de devolución de llamada no puede mantener fácilmente el contexto entre las llamadas.
El siguiente código define process_chunks
generator, que podrá recibir fragmentos de datos uno por uno y procesarlos. En contraste con la simple devolución de llamada, aquí podemos mantener todo el procesamiento dentro de una función sin perder el contexto.
from contextlib import closing
from itertools import count
def main():
processed = []
def process_chunks():
for i in count():
try:
# (repeatedly) get the chunk to process
chunk = yield
except GeneratorExit:
# finish_up
print("Finishing up.")
return
else:
# Here process the chunk as you like
print("inside coroutine, processing chunk:", i, chunk)
product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
processed.append(product)
with closing(process_chunks()) as coroutine:
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
print("processed result", processed)
print("DONE")
Para ver el código en acción, ponga la clase FakeFtp
, el código que se muestra arriba y la siguiente línea:
main()
en un archivo y llámalo:
$ python headsandtails.py
(''inside coroutine, processing chunk:'', 0, ''aaa'')
(''inside coroutine, processing chunk:'', 1, ''bbb'')
(''inside coroutine, processing chunk:'', 2, ''ccc'')
(''inside coroutine, processing chunk:'', 3, ''ddd'')
Finishing up.
(''processed result'', [''processed(0): aaa'', ''processed(1): bbb'', ''processed(2): ccc'', ''processed(3): ddd''])
DONE
Cómo funciona
processed = []
está aquí solo para mostrar, el proceso process_chunks
no tendrá problemas para cooperar con su contexto externo. Todo está envuelto en def main():
para probar, no hay necesidad de usar variables globales.
def process_chunks()
es el núcleo de la solución. Puede tener parámetros de entrada de un disparo (que no se usan aquí), pero el punto principal, donde recibe la entrada, es que cada línea de yield
devuelve lo que cualquiera envía a través de .send(data)
a la instancia de este generador. Se puede realizar una coroutine.send(chunk)
pero en este ejemplo se realiza a través de la devolución de llamada haciendo referencia a esta función callback.send
.
Tenga en cuenta que en la solución real no hay problema en tener varios yield
en el código, se procesan uno por uno. Esto se puede usar, por ejemplo, para leer (e ignorar) el encabezado del archivo CSV y luego continuar procesando registros con datos.
Podríamos instanciar y usar el generador de la siguiente manera:
coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
El código real está utilizando el contextlib
contexto de closing
contextlib
para garantizar que siempre se llame a coroutine.close()
.
Conclusiones
Esta solución no proporciona el tipo de iterador para consumir datos en el estilo tradicional "desde afuera". Por otro lado, somos capaces de:
- usar el generador "desde adentro"
- mantener todo el procesamiento iterativo dentro de una función sin ser interrumpido entre devoluciones de llamada
- opcionalmente usar contexto externo
- proporcionar resultados utilizables al exterior
- Todo esto se puede hacer sin utilizar enhebrado.
Créditos : La solución está muy inspirada en la SO SOY iterador de FTP de Python FTP (sin cargar el archivo completo en la memoria) escrito por el usuario2357112
Como se señaló en los comentarios, podría hacerlo en un nuevo hilo, utilizando la Queue
. El inconveniente es que todavía necesitaría alguna forma de acceder al resultado final (lo que devuelve fmin
al final). Mi ejemplo a continuación utiliza una devolución de llamada opcional para hacer algo con él (otra opción sería simplemente producirlo también, aunque su código de llamada tendría que diferenciar entre los resultados de iteración y los resultados finales):
from thread import start_new_thread
from Queue import Queue
def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):
q = Queue() # fmin produces, the generator consumes
job_done = object() # signals the processing is done
# Producer
def my_callback(x):
q.put(x)
def task():
ret = scipy.optimize.fmin(func,x0,callback=my_callback)
q.put(job_done)
end_callback(ret) # "Returns" the result of the main call
# Starts fmin in a new thread
start_new_thread(task,())
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
Actualización: para bloquear la ejecución de la siguiente iteración hasta que el consumidor haya terminado de procesar la última, también es necesario usar task_done
y join
.
# Producer
def my_callback(x):
q.put(x)
q.join() # Blocks until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
if next_item is job_done:
break
yield next_item
q.task_done() # Unblocks the producer, so a new iteration can start
Tenga en cuenta que maxsize=1
no es necesario, ya que no se agregará ningún elemento nuevo a la cola hasta que se consuma el último.
Actualización 2: también tenga en cuenta que, a menos que todos los elementos sean recuperados finalmente por este generador, el subproceso creado se bloqueará (se bloqueará para siempre y sus recursos nunca se liberarán). El productor está esperando en la cola, y dado que almacena una referencia a esa cola, nunca será reclamado por el CG incluso si el consumidor lo está. La cola se volverá inalcanzable, por lo que nadie podrá liberar el bloqueo.
Una solución limpia para eso es desconocida, si es posible (ya que dependería de la función particular utilizada en lugar de fmin
). Se podría hacer una solución temporal utilizando el timeout
, ya que el productor genera una excepción si put
bloques por demasiado tiempo:
q = Queue(maxsize=1)
# Producer
def my_callback(x):
q.put(x)
q.put("dummy",True,timeout) # Blocks until the first result is retrieved
q.join() # Blocks again until task_done is called
# Consumer
while True:
next_item = q.get(True,timeout) # Blocks until an input is available
q.task_done() # (one "task_done" per "get")
if next_item is job_done:
break
yield next_item
q.get() # Retrieves the "dummy" object (must be after yield)
q.task_done() # Unblocks the producer, so a new iteration can start
Qué tal si
data = []
scipy.optimize.fmin(func,x0,callback=data.append)
for line in data:
print line
Si no, ¿qué es exactamente lo que quieres hacer con los datos del generador?