scala stream scalaz scalaz-stream

Dividir un proceso Scalaz-Stream en dos flujos secundarios



(3)

Usando scalaz-stream ¿es posible dividir / tenedor y luego volver a unir una secuencia?

Como ejemplo, digamos que tengo la siguiente función

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10) val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add) val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add) zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

Con Scalaz-stream, en este ejemplo los resultados serían los esperados: una tupla de números del 1 al 10 pasados ​​a un receptor.

Sin embargo, si reemplazamos streamOfNumbers con algo que requiera IO, en realidad ejecutará la acción IO dos veces.

Usando un Topic puedo crear un proceso de pub / sub que duplica los elementos en la transmisión de forma correcta, sin embargo, no almacena en el búfer, simplemente consume toda la fuente lo más rápido posible, independientemente de la velocidad con que los sumideros la consuman.

Puedo envolver esto en una cola limitada, sin embargo, el resultado final se siente mucho más complejo de lo necesario.

¿Hay una forma más simple de dividir una secuencia en scalaz-stream sin duplicar las acciones IO de la fuente?


Quizás aún pueda usar el tema y simplemente asegurarse de que los procesos secundarios se suscriban antes de presionar al tema.

Sin embargo, tenga en cuenta que esta solución no tiene límites, es decir, si va a presionar demasiado rápido, puede encontrar un error OOM.

def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = { val topic = async.topic[A] val sub1 = topic.subscribe val sub2 = topic.subscribe merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain)) }


También para aclarar la respuesta previa delas con el requisito de "división". La solución a su problema específico puede ser sin la necesidad de dividir las secuencias:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10) val oddOrEven: Process[Task,Int//Int] = streamOfNumbers.map { case even if even % 2 == 0 => right(even) case odd => left(odd) } val summed = oddOrEven.pipeW(sump1).pipeO(sump1) val evenSink: Sink[Task,Int] = ??? val oddSink: Sink[Task,Int] = ??? summed .drainW(evenSink) .to(oddSink)


Yo también necesitaba esta funcionalidad. Mi situación fue un poco más complicada y me impidió trabajar de esta manera.

Gracias a la respuesta de Daniel Spiewak en este hilo , pude hacer que lo siguiente funcionara. onHalt su solución agregandoHalt para que mi aplicación salga una vez que se complete el Process .

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = { val left = async.boundedQueue[A](limit) val right = async.boundedQueue[A](limit) val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause => Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) } } val dequeue = Process((left.dequeue, right.dequeue)) enqueue merge dequeue }