scala - tutorial - ¿Cómo crear una fuente akka-stream a partir de un flujo que genere valores recursivamente?
tipografia tutorial (3)
Ah, las alegrías de los ciclos en las corrientes de Akka. Tuve un problema muy similar que resolví de una manera profundamente hacky. Posiblemente te sea de utilidad.
Solución Hacky:
// add a graph stage that will complete successfully if it sees no element within 5 seconds
val timedStopper = b.add(
Flow[Item]
.idleTimeout(5.seconds)
.recoverWithRetries(1, {
case _: TimeoutException => Source.empty[Item]
}))
source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
Lo que esto hace es que 5 segundos después de que el último elemento pase por la etapa timedStopper
, esa etapa completa la secuencia con éxito. Esto se logra mediante el uso de idleTimeout
, que falla la secuencia con una TimeoutException
, y luego utiliza recoverWithRetries
para convertir esa falla en una finalización exitosa. (Mencioné que era hacky).
Obviamente, esto no es adecuado si puede tener más de 5 segundos entre los elementos, o si no puede permitirse una larga espera entre la secuencia "en realidad" completada y Akka retomándola. Afortunadamente, ninguno de los dos era una preocupación para nosotros, y en ese caso, ¡en realidad funciona bastante bien!
Solución no hacky
Desafortunadamente, las únicas formas en que puedo pensar para hacer esto sin hacer trampas a través de los tiempos de espera son muy, muy complicadas.
Básicamente, necesitas poder rastrear dos cosas:
- ¿Hay algún elemento todavía en el búfer, o en proceso de ser enviado al búfer?
- ¿Está abierta la fuente entrante?
y complete el flujo si y solo si la respuesta a ambas preguntas es no. Los bloques de construcción nativos Akka probablemente no podrán manejar esto. Una etapa gráfica personalizada podría, sin embargo. Una opción podría ser escribir uno que tome el lugar de Merge
y darle alguna forma de conocer el contenido del búfer, o posiblemente hacer que rastree tanto los ID que recibe como los ID que la transmisión está enviando al búfer. El problema es que las etapas de gráficos personalizados no son particularmente agradables para escribir en el mejor de los casos, y mucho menos cuando se está mezclando lógica en etapas como esta.
Advertencias
Las transmisiones Akka simplemente no funcionan bien con los ciclos, especialmente cómo calculan la finalización. Como resultado, puede que este no sea el único problema que encuentre.
Por ejemplo, un problema que tuvimos con una estructura muy similar fue que una falla en la fuente se trató como la secuencia que se completó con éxito, con un Future
exitoso que se materializó. El problema es que, de manera predeterminada, una etapa que falla fallará en sus secuencias descendentes, pero cancelará sus corrientes ascendentes (lo que cuenta como una finalización exitosa para esas etapas). Con un ciclo como el que tienes, el resultado es una carrera, ya que la cancelación se propaga por una rama pero el fracaso por la otra. También es necesario comprobar qué sucede si se produce un error en el sumidero; Dependiendo de la configuración de cancelación para la transmisión, es posible que la cancelación no se propague hacia arriba y la fuente continuará felizmente tirando de los elementos.
Una última opción: evitar manejar la lógica recursiva con flujos en absoluto. En un extremo, si hay alguna forma de escribir un solo método recursivo de la cola que saque todos los elementos anidados a la vez y los ponga en una etapa de Flujo, eso resolverá sus problemas. Por otro lado, estamos considerando seriamente ir a Kafka haciendo cola para nuestro propio sistema.
Necesito atravesar una API que tiene la forma de un árbol. Por ejemplo, una estructura de directorios o hilos de discusión. Se puede modelar a través del siguiente flujo:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don''t have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
¿Cómo puedo atravesar estos datos? Conseguí el siguiente trabajo:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
Sin embargo, ya que estoy usando un flujo con un búfer, el Stream nunca se completará.
Se completa cuando se han vaciado los elementos anteriores y los elementos almacenados en búfer
Leí la sección Ciclos de gráfico, vida y puntos muertos varias veces y todavía estoy luchando por encontrar una respuesta.
Esto crearía un bloqueo vivo:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
EDITAR: Agregué un repositorio git para probar su solución https://github.com/MasseGuillaume/source-unfold
Resolví este problema escribiendo mi propio GraphStage.
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}
Causa de no finalización
No creo que la causa de la transmisión que nunca se completa se deba a "usar un flujo con un búfer". La causa real, similar a .com/questions/32459329/… , es el hecho de que la combinación con el parámetro predeterminado eagerClose=False
está esperando que tanto la source
como el buffer
completen antes de que se complete (combinación). Pero el búfer está esperando fusionar para completar Así que la fusión está esperando en el búfer y la memoria intermedia está esperando en la fusión.
eagerCerrar fusionar
Puede configurar eagerClose=True
al crear su combinación. Sin embargo, el uso de un cierre impaciente puede desafortunadamente dar como resultado que algunos valores ItemId
nunca sean consultados.
Solucion indirecta
Si materializa un nuevo flujo para cada nivel del árbol, entonces la recursión puede extraerse fuera del flujo.
Puedes construir una función de consulta utilizando el itemFlow
:
val itemQuery : Iterable[ItemId] => Future[Seq[Data]] =
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])
Esta función de consulta ahora puede ajustarse dentro de una función auxiliar recursiva:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] =
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet
if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}
El número de flujos creados será igual a la profundidad máxima del árbol.
Desafortunadamente, debido a que los futuros están involucrados, esta función recursiva no es recursiva y podría resultar en un "desbordamiento de pila" si el árbol es demasiado profundo.