navisworks español scala concurrency parallel-processing

español - Iterar líneas sobre un archivo en paralelo(Scala)?



manual de navisworks 2018 en español pdf (6)

Sé sobre las colecciones paralelas en Scala. ¡Son útiles! Sin embargo, me gustaría iterar sobre las líneas de un archivo que es demasiado grande para la memoria en paralelo. Podría crear hilos y configurar un bloqueo sobre un escáner, por ejemplo, pero sería genial si pudiera ejecutar código como:

Source.fromFile(path).getLines.par foreach { line =>

Lamentablemente, sin embargo

error: value par is not a member of Iterator[String]

¿Cuál es la forma más fácil de lograr algún paralelismo aquí? Por ahora, voy a leer en algunas líneas y manejarlas en paralelo.


A continuación me ayudó a lograr

source.getLines.toStream.par.foreach( line => println(line))


Los comentarios sobre la respuesta de Dan Simon me hicieron pensar. ¿Por qué no tratamos de envolver la fuente en una secuencia?

def src(source: Source) = Stream[String] = { if (source.hasNext) Stream.cons(source.takeWhile( _ != ''/n'' ).mkString) else Stream.empty }

Entonces podrías consumirlo en paralelo así:

src(Source.fromFile(path)).par foreach process

Probé esto, y se compila y ejecuta de todos modos. No estoy sinceramente seguro de si está cargando todo el archivo en la memoria o no, pero no creo que sea así.


Me doy cuenta de que esta es una vieja pregunta, pero puede encontrar que la implementación de ParIterator en la biblioteca de iterata es una implementación útil que no requiere ensamblaje:

scala> import com.timgroup.iterata.ParIterator.Implicits._ scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId)) scala> it.map(_._2).toSet.size res2: Int = 8 // addition was distributed over 8 threads


Puede usar la agrupación para dividir fácilmente el iterador en fragmentos que puede cargar en la memoria y luego procesar en paralelo.

val chunkSize = 128 * 1024 val iterator = Source.fromFile(path).getLines.grouped(chunkSize) iterator.foreach { lines => lines.par.foreach { line => process(line) } }

En mi opinión, algo así es la forma más sencilla de hacerlo.



Voy a poner esto como una respuesta por separado, ya que es fundamentalmente diferente de la última (y realmente funciona)

Aquí hay un resumen de una solución que utiliza actores, que es básicamente lo que describe el comentario de Kim Stebel. Hay dos clases de actor, un solo actor de FileReader que lee líneas individuales del archivo a pedido y varios actores de Worker. Todos los trabajadores envían solicitudes al lector y procesan líneas en paralelo a medida que se leen del archivo.

Estoy usando actores de Akka aquí, pero usar otra implementación es básicamente la misma idea.

case object LineRequest case object BeginProcessing class FileReader extends Actor { //reads a single line from the file or returns None if EOF def getLine:Option[String] = ... def receive = { case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef] } } class Worker(reader: ActorRef) extends Actor { def process(line:String) ... def receive = { case BeginProcessing => reader ! LineRequest case Some(line) => { process(line) reader ! LineRequest } case None => self.stop } } val reader = actorOf[FileReader].start val workers = Vector.fill(4)(actorOf(new Worker(reader)).start) workers.foreach{_ ! BeginProcessing} //wait for the workers to stop...

De esta manera, no hay más de 4 (o el número de trabajadores que tenga) líneas sin procesar en memoria a la vez.