framework - Reenvíe un flujo de carga de archivos a S3 a través de Iteratee con Play2/Scala
scala for web development (1)
He leído algunas cosas acerca de la posibilidad de enviar un archivo a S3 a través de Iteratee, que parece permitir enviar fragmentos de S3 de un archivo a medida que los recibimos y evitar un OutOfMemory para archivos grandes por ejemplo.
Encontré esta publicación SO que probablemente sea casi lo que necesito hacer: Play 2.x: carga de archivos reactivos con Iteratees No entiendo bien cómo hacerlo, o si está realmente disponible en Play 2.0.2 ( porque Sadek Brodi dice que foldM está disponible en Play 2.1 solo por ejemplo)
¿Alguien puede explicar esto de manera simple, para alguien que ha leído un blog sobre Iteratees y todavía no es un experto en Scala / Play2?
Ni siquiera sé si debería usar un analizador de cuerpo de varias partes o algo así, pero una cosa que sé es que no entiendo lo que hace este código:
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1028*1028) &>> Iteratee.consume()
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
val writeToStore: Iteratee[Array[Byte],_] =
Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) =>
// write bytes and return next handle, probable in a Future
}
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
Por cierto, ¿cuál será la diferencia en el consumo de memoria en comparación con el uso clásico de Java InputStream / OutputStream? De hecho, puedo reenviar un archivo de 500 mb a S3 de forma no bloqueante, con un consumo de memoria muy bajo, sin usar iterados, usando Java + AsyncHttpClient + Grizzly (pero creo que también funcionaría con Netty).
Entonces, ¿cuál es la ventaja de usar Iteratee?
Una diferencia que puedo ver es que el InputStream que recibo y reenvío a S3 en mi caso está respaldado por un archivo temporal (esto es un comportamiento CXF), por lo que puede que no sea tan reactivo como Play Iteratee
Pero con Iteratees, si el Enumerador produce bytes recibidos por la conexión y los reenvía a S3 a través de un Iteratee, entonces si la conexión a S3 no es buena y los bytes no se pueden reenviar muy rápidamente, donde se almacenan los bytes "pendientes" ?
Explicación simple? Lo intentaré. :)
Estás construyendo una tubería de componentes. Una vez que haya construido la tubería, se pueden enviar los datos. Es un Iteratee , por lo que sabe cómo iterar en datos.
El archivo que desea cargar se encuentra en el cuerpo de la solicitud y BodyParser es lo que maneja los cuerpos de la solicitud en Play. Así que pones tu canalización iterativa en un BodyParser. Cuando se realiza una solicitud, a su canalización se le enviarán los datos (se repetirá sobre ella).
Su canalización ( rechunkAdapter &>> writeToStore
) rechunkAdapter &>> writeToStore
datos en bits de 1 MB y luego los envía a S3.
La primera parte de la tubería ( rechunkAdapter
) hace la fragmentación. En realidad, tiene su propio mini-pipeline que hace la fragmentación ( consumeAMB
). Una vez que el mini-tubo ha recibido datos suficientes para hacer un trozo, lo envía a la tubería principal.
La segunda parte de la tubería ( writeToStore
) es como un bucle que se writeToStore
en cada fragmento, lo que le writeToStore
la oportunidad de enviar cada fragmento a S3.
¿Ventajas de iterate?
Una vez que sepa lo que está pasando, puede crear tuberías de iteración conectando los componentes. Y el comprobador de tipos lo más a menudo le dirá cuando conecte algo incorrectamente.
Por ejemplo, podemos modificar la tubería anterior para arreglar el hecho de que es lento. Probablemente sea lento porque la carga de la solicitud se detiene cada vez que un fragmento está listo para subir a S3. Es importante reducir la velocidad de la carga de la solicitud para que no nos quedemos sin memoria, pero podríamos ser un poco más indulgentes al agregar un búfer de tamaño fijo. Así que solo agregue Concurrent.buffer(2)
en el medio de la tubería para amortiguar hasta 2 trozos.
Iteratees proporcionan un enfoque funcional a las corrientes. Esto es una ventaja o desventaja, dependiendo de cómo se sienta con respecto a la programación funcional. :) En comparación con las corrientes perezosas (otro enfoque funcional), las iteraciones ofrecen un control preciso sobre el uso de recursos.
Finalmente, las iteraciones nos permiten hacer una programación de transmisión asíncrona muy compleja de manera relativamente (!) Simple. Podemos procesar IO sin mantener hilos, lo que es una gran victoria para la escalabilidad. El ejemplo clásico de Java InputStream / OutputStream requiere 2 hilos.