Evitar fugas de memoria con Scalaz 7 zipWithIndex/group enumeratees
iteratee (1)
Fondo
Como se señala en esta pregunta , estoy usando iteraciones de Scalaz 7 para procesar una gran cantidad de datos (es decir, sin límites) en un espacio de montón constante.
Mi código se ve así:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
El problema
Parece que me encontré con una pérdida de memoria, pero no estoy lo suficientemente familiarizado con Scalaz / FP para saber si el error está en Scalaz o en mi código. Intuitivamente, espero que este código requiera solo (del orden de) P veces el espacio de tamaño de fragmento.
Nota: Encontré una pregunta similar en la que se encontró un OutOfMemoryError
, pero mi código no está usando consume
.
Pruebas
Ejecuté algunas pruebas para tratar de aislar el problema. En resumen, la fuga solo parece surgir cuando se zipWithIndex
tanto zipWithIndex
como el group
.
// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
Código para las pruebas:
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._
// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
Iteratee.enumIterator[Array[Int], IO](
Iterator.continually(Array.fill(sz)(0)).take(n))
// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
(c, a) => c + a.length
}
// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
(c, as) => c + as.map(_.length).sum
}
// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
(c, vs) => c + vs.map(_._1.length).sum
}
// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
(c, v) => c + v._1.length
}
Preguntas
- Es el error en mi código?
- ¿Cómo puedo hacer que esto funcione en un espacio de montón constante?
Esto servirá de poco consuelo para cualquiera que esté atascado con la API de iteratee
anterior, pero recientemente verifiqué que pasa una prueba equivalente contra la API de scalaz-stream . Esta es una API de procesamiento de flujo más nueva que está destinada a reemplazar iteratee
.
Para completar, aquí está el código de prueba:
// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
(Process emit Array.fill(sz)(0)).repeat take n
(streamArrs(1 << 25, 1 << 14).zipWithIndex
pipe process1.chunk(4)
pipe process1.fold(0L) {
(c, vs) => c + vs.map(_._1.length.toLong).sum
}).runLast.run
Esto debería funcionar con cualquier valor para el parámetro n
(siempre que esté dispuesto a esperar el tiempo suficiente) - Probé con 2 ^ 14 matrices de 32MiB (es decir, un total de medio TiB de memoria asignada a lo largo del tiempo).