transformadores reactiva potencia factor energia coseno corrige corregir como activa scala file-upload playframework-2.0 azure-storage iterate

scala - potencia - Juego 2.x: carga reactiva de archivos con Iteratos



factor de potencia pdf (4)

Básicamente, lo que necesita primero es volver a introducir datos como fragmentos más grandes, 1024 * 1024 bytes.

Primero tengamos un Iteratee que consumirá hasta 1 m de bytes (vale para tener el último fragmento más pequeño)

val consumeAMB = Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

Usando eso, podemos construir un Enumeratee (adaptador) que reagrupe fragmentos, usando una API llamada agrupada:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] = Enumeratee.grouped(consumeAMB)

Aquí agrupado utiliza un Iteratee para determinar cuánto poner en cada pedazo. Utiliza nuestro nuestro consumerAMB para eso. Lo que significa que el resultado es un Enumeratee que vuelve a ingresar la entrada en Array[Byte] de 1MB.

Ahora tenemos que escribir BodyParser , que usará el método Iteratee.foldM para enviar cada fragmento de bytes:

val writeToStore: Iteratee[Array[Byte],_] = Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => // write bytes and return next handle, probable in a Future }

foldM pasa un estado y lo usa en su función pasada (S,Input[Array[Byte]]) => Future[S] para devolver un nuevo Future of state. foldM no volverá a llamar a la función hasta que se complete Future y haya un fragmento de entrada disponible.

Y el analizador corporal volverá a ingresar y lo introducirá en la tienda:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

Devolver una derecha indica que está devolviendo un cuerpo al final del análisis del cuerpo (que es el manejador aquí).

Comenzaré con la pregunta: Cómo usar el Iteratee la API de Iteratee para cargar un archivo en el almacenamiento en la nube (Azure Blob Storage en mi caso, pero no creo que sea más importante ahora)

Fondo:

Necesito dividir la entrada en bloques de aproximadamente 1 MB para almacenar archivos multimedia grandes (300 MB +) como BlockBlobs de Azure. Desafortunadamente, mi conocimiento de Scala todavía es pobre (mi proyecto está basado en Java y el único uso para Scala será un controlador de carga).

Intenté con este código: ¿Por qué hace que el error de llamada o hecho en un Iteratee de BodyParser cuelgue la solicitud en Play Framework 2.0? (como Input Iteratee ): funciona bastante bien, pero cada Element que puedo usar tiene un tamaño de 8192 bytes, por lo que es demasiado pequeño para enviar cientos de archivos megabyte a la nube.

Debo decir que ese es un enfoque bastante nuevo para mí, y lo más probable es que malinterpreté algo (no quiero decir que no entendí bien todo;>)

Apreciaré cualquier sugerencia o enlace, que me ayudará con ese tema. Si hay alguna muestra de uso similar, sería la mejor opción para mí tener la idea.


Para aquellos que también están tratando de encontrar una solución a este problema de transmisión, en lugar de escribir un nuevo BodyParser, también pueden usar lo que ya se implementó en parse.multipartFormData . Puede implementar algo como a continuación para sobrescribir el controlador predeterminado handleFilePartAsTemporaryFile .

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = { handleFilePart { case FileInfo(partName, filename, contentType) => (rechunkAdapter &>> writeToS3).map { _ => val compRequest = new CompleteMultipartUploadRequest(...) amazonS3Client.completeMultipartUpload(compRequest) ... } } } def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

Puedo hacer que esto funcione, pero aún no estoy seguro de si se transmite todo el proceso de carga. Intenté algunos archivos grandes, parece que la carga S3 solo comienza cuando todo el archivo ha sido enviado desde el lado del cliente.

Miré la implementación del analizador anterior y creo que todo está conectado usando Iteratee para que el archivo se transmita. Si alguien tiene alguna idea sobre esto, será muy útil.


Si su objetivo es transmitir a S3, aquí hay un ayudante que he implementado y probado:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]]) (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = { import scala.collection.JavaConversions._ val initRequest = new InitiateMultipartUploadRequest(bucket, key) val initResponse = s3.initiateMultipartUpload(initRequest) val uploadId = initResponse.getUploadId val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped { Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume() } val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) => val uploadRequest = new UploadPartRequest() .withBucketName(bucket) .withKey(key) .withPartNumber(etags.length + 1) .withUploadId(uploadId) .withInputStream(new ByteArrayInputStream(bytes)) .withPartSize(bytes.length) val etag = Future { s3.uploadPart(uploadRequest).getPartETag } etag.map(etags :+ _) } val futETags = enum &> rechunker |>>> uploader futETags.map { etags => val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag]) s3.completeMultipartUpload(compRequest) }.recoverWith { case e: Exception => s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId)) Future.failed(e) } }


agregue lo siguiente a su archivo de configuración

play.http.parser.maxMemoryBuffer = 256K