scala scalaz iteratee

Evitar fugas de memoria con Scalaz 7 zipWithIndex/group enumeratees



iteratee (1)

Fondo

Como se señala en esta pregunta , estoy usando iteraciones de Scalaz 7 para procesar una gran cantidad de datos (es decir, sin límites) en un espacio de montón constante.

Mi código se ve así:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A] type ErrorOr[A] = ErrorOrT[IO, A] def processChunk(c: Chunk, idx: Long): Result def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] = Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) => rs ++ vs map { case (c, i) => processChunk(c, i) } } &= (data.zipWithIndex mapE Iteratee.group(P))

El problema

Parece que me encontré con una pérdida de memoria, pero no estoy lo suficientemente familiarizado con Scalaz / FP para saber si el error está en Scalaz o en mi código. Intuitivamente, espero que este código requiera solo (del orden de) P veces el espacio de tamaño de fragmento.

Nota: Encontré una pregunta similar en la que se encontró un OutOfMemoryError , pero mi código no está usando consume .

Pruebas

Ejecuté algunas pruebas para tratar de aislar el problema. En resumen, la fuga solo parece surgir cuando se zipWithIndex tanto zipWithIndex como el group .

// no zipping/grouping scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO res47: Long = 4294967296 // grouping only scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO res49: Long = 4294967296 // zipping and grouping scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO java.lang.OutOfMemoryError: Java heap space // zipping only scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO res51: Long = 4294967296 // no zipping/grouping, larger arrays scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO res53: Long = 17179869184 // zipping only, larger arrays scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO res54: Long = 17179869184

Código para las pruebas:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._ // define an enumerator that produces a stream of new, zero-filled arrays def enumArrs(sz: Int, n: Int) = Iteratee.enumIterator[Array[Int], IO]( Iterator.continually(Array.fill(sz)(0)).take(n)) // define an iteratee that consumes a stream of arrays // and computes its length val i1 = Iteratee.fold[Array[Int], IO, Long](0) { (c, a) => c + a.length } // define an iteratee that consumes a grouped stream of arrays // and computes its length val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { (c, as) => c + as.map(_.length).sum } // define an iteratee that consumes a grouped/zipped stream of arrays // and computes its length val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) { (c, vs) => c + vs.map(_._1.length).sum } // define an iteratee that consumes a zipped stream of arrays // and computes its length val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) { (c, v) => c + v._1.length }

Preguntas

  • Es el error en mi código?
  • ¿Cómo puedo hacer que esto funcione en un espacio de montón constante?

Esto servirá de poco consuelo para cualquiera que esté atascado con la API de iteratee anterior, pero recientemente verifiqué que pasa una prueba equivalente contra la API de scalaz-stream . Esta es una API de procesamiento de flujo más nueva que está destinada a reemplazar iteratee .

Para completar, aquí está el código de prueba:

// create a stream containing `n` arrays with `sz` Ints in each one def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] = (Process emit Array.fill(sz)(0)).repeat take n (streamArrs(1 << 25, 1 << 14).zipWithIndex pipe process1.chunk(4) pipe process1.fold(0L) { (c, vs) => c + vs.map(_._1.length.toLong).sum }).runLast.run

Esto debería funcionar con cualquier valor para el parámetro n (siempre que esté dispuesto a esperar el tiempo suficiente) - Probé con 2 ^ 14 matrices de 32MiB (es decir, un total de medio TiB de memoria asignada a lo largo del tiempo).