una tutorial tipografia personalizadas modificar letras illustrator fuente crear como scala akka-stream

scala - tutorial - ¿Cómo crear una fuente akka-stream a partir de un flujo que genere valores recursivamente?



tipografia tutorial (3)

Ah, las alegrías de los ciclos en las corrientes de Akka. Tuve un problema muy similar que resolví de una manera profundamente hacky. Posiblemente te sea de utilidad.

Solución Hacky:

// add a graph stage that will complete successfully if it sees no element within 5 seconds val timedStopper = b.add( Flow[Item] .idleTimeout(5.seconds) .recoverWithRetries(1, { case _: TimeoutException => Source.empty[Item] })) source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data merge <~ buffer <~ kids <~ bcast

Lo que esto hace es que 5 segundos después de que el último elemento pase por la etapa timedStopper , esa etapa completa la secuencia con éxito. Esto se logra mediante el uso de idleTimeout , que falla la secuencia con una TimeoutException , y luego utiliza recoverWithRetries para convertir esa falla en una finalización exitosa. (Mencioné que era hacky).

Obviamente, esto no es adecuado si puede tener más de 5 segundos entre los elementos, o si no puede permitirse una larga espera entre la secuencia "en realidad" completada y Akka retomándola. Afortunadamente, ninguno de los dos era una preocupación para nosotros, y en ese caso, ¡en realidad funciona bastante bien!

Solución no hacky

Desafortunadamente, las únicas formas en que puedo pensar para hacer esto sin hacer trampas a través de los tiempos de espera son muy, muy complicadas.

Básicamente, necesitas poder rastrear dos cosas:

  • ¿Hay algún elemento todavía en el búfer, o en proceso de ser enviado al búfer?
  • ¿Está abierta la fuente entrante?

y complete el flujo si y solo si la respuesta a ambas preguntas es no. Los bloques de construcción nativos Akka probablemente no podrán manejar esto. Una etapa gráfica personalizada podría, sin embargo. Una opción podría ser escribir uno que tome el lugar de Merge y darle alguna forma de conocer el contenido del búfer, o posiblemente hacer que rastree tanto los ID que recibe como los ID que la transmisión está enviando al búfer. El problema es que las etapas de gráficos personalizados no son particularmente agradables para escribir en el mejor de los casos, y mucho menos cuando se está mezclando lógica en etapas como esta.

Advertencias

Las transmisiones Akka simplemente no funcionan bien con los ciclos, especialmente cómo calculan la finalización. Como resultado, puede que este no sea el único problema que encuentre.

Por ejemplo, un problema que tuvimos con una estructura muy similar fue que una falla en la fuente se trató como la secuencia que se completó con éxito, con un Future exitoso que se materializó. El problema es que, de manera predeterminada, una etapa que falla fallará en sus secuencias descendentes, pero cancelará sus corrientes ascendentes (lo que cuenta como una finalización exitosa para esas etapas). Con un ciclo como el que tienes, el resultado es una carrera, ya que la cancelación se propaga por una rama pero el fracaso por la otra. También es necesario comprobar qué sucede si se produce un error en el sumidero; Dependiendo de la configuración de cancelación para la transmisión, es posible que la cancelación no se propague hacia arriba y la fuente continuará felizmente tirando de los elementos.

Una última opción: evitar manejar la lógica recursiva con flujos en absoluto. En un extremo, si hay alguna forma de escribir un solo método recursivo de la cola que saque todos los elementos anidados a la vez y los ponga en una etapa de Flujo, eso resolverá sus problemas. Por otro lado, estamos considerando seriamente ir a Kafka haciendo cola para nuestro propio sistema.

Necesito atravesar una API que tiene la forma de un árbol. Por ejemplo, una estructura de directorios o hilos de discusión. Se puede modelar a través del siguiente flujo:

type ItemId = Int type Data = String case class Item(data: Data, kids: List[ItemId]) def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString // 0 => [1, 9] // 1 => [10, 19] // 2 => [20, 29] // ... // 9 => [90, 99] // _ => [] // NB. I don''t have access to this function, only the itemFlow. def nested(id: ItemId): List[ItemId] = if (id == 0) (1 to 9).toList else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList else Nil val itemFlow: Flow[ItemId, Item, NotUsed] = Flow.fromFunction(id => Item(randomData, nested(id)))

¿Cómo puedo atravesar estos datos? Conseguí el siguiente trabajo:

import akka.NotUsed import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import scala.concurrent.Await import scala.concurrent.duration.Duration implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val loop = GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val source = b.add(Flow[Int]) val merge = b.add(Merge[Int](2)) val fetch = b.add(itemFlow) val bcast = b.add(Broadcast[Item](2)) val kids = b.add(Flow[Item].mapConcat(_.kids)) val data = b.add(Flow[Item].map(_.data)) val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead) source ~> merge ~> fetch ~> bcast ~> data merge <~ buffer <~ kids <~ bcast FlowShape(source.in, data.out) } val flow = Flow.fromGraph(loop) Await.result( Source.single(0).via(flow).runWith(Sink.foreach(println)), Duration.Inf ) system.terminate()

Sin embargo, ya que estoy usando un flujo con un búfer, el Stream nunca se completará.

Se completa cuando se han vaciado los elementos anteriores y los elementos almacenados en búfer

Flow.buffer

Leí la sección Ciclos de gráfico, vida y puntos muertos varias veces y todavía estoy luchando por encontrar una respuesta.

Esto crearía un bloqueo vivo:

import java.util.concurrent.atomic.AtomicInteger def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = { // keep track of how many element flows, val remaining = new AtomicInteger(1) // 1 = seed // should be > max loop(x) val bufferSize = 10000 val (ref, publisher) = Source.actorRef[S](bufferSize, OverflowStrategy.fail) .toMat(Sink.asPublisher(true))(Keep.both) .run() ref ! seed Source.fromPublisher(publisher) .via(flow) .map{x => loop(x).foreach{ c => remaining.incrementAndGet() ref ! c } x } .takeWhile(_ => remaining.decrementAndGet > 0) }

EDITAR: Agregué un repositorio git para probar su solución https://github.com/MasseGuillaume/source-unfold


Resolví este problema escribiendo mi propio GraphStage.

import akka.NotUsed import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} import scala.concurrent.ExecutionContext import scala.collection.mutable import scala.util.{Success, Failure, Try} import scala.collection.mutable def unfoldTree[S, E](seeds: List[S], flow: Flow[S, E, NotUsed], loop: E => List[S], bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = { Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize)) } object UnfoldSource { implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal { def dequeueN(n: Int): List[A] = { val b = List.newBuilder[A] var i = 0 while (i < n) { val e = self.dequeue b += e i += 1 } b.result() } } } class UnfoldSource[S, E](seeds: List[S], flow: Flow[S, E, NotUsed], loop: E => List[S], bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("UnfoldSource.out") override val shape: SourceShape[E] = SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { // Nodes to expand val frontier = mutable.Queue[S]() frontier ++= seeds // Nodes expanded val buffer = mutable.Queue[E]() // Using the flow to fetch more data var inFlight = false // Sink pulled but the buffer was empty var downstreamWaiting = false def isBufferFull() = buffer.size >= bufferSize def fillBuffer(): Unit = { val batchSize = Math.min(bufferSize - buffer.size, frontier.size) val batch = frontier.dequeueN(batchSize) inFlight = true val toProcess = Source(batch) .via(flow) .runWith(Sink.seq)(materializer) val callback = getAsyncCallback[Try[Seq[E]]]{ case Failure(ex) => { fail(out, ex) } case Success(es) => { val got = es.size inFlight = false es.foreach{ e => buffer += e frontier ++= loop(e) } if (downstreamWaiting && buffer.nonEmpty) { val e = buffer.dequeue downstreamWaiting = false sendOne(e) } else { checkCompletion() } () } } toProcess.onComplete(callback.invoke) } override def preStart(): Unit = { checkCompletion() } def checkCompletion(): Unit = { if (!inFlight && buffer.isEmpty && frontier.isEmpty) { completeStage() } } def sendOne(e: E): Unit = { push(out, e) checkCompletion() } def onPull(): Unit = { if (buffer.nonEmpty) { sendOne(buffer.dequeue) } else { downstreamWaiting = true } if (!isBufferFull && frontier.nonEmpty) { fillBuffer() } } setHandler(out, this) } }


Causa de no finalización

No creo que la causa de la transmisión que nunca se completa se deba a "usar un flujo con un búfer". La causa real, similar a .com/questions/32459329/… , es el hecho de que la combinación con el parámetro predeterminado eagerClose=False está esperando que tanto la source como el buffer completen antes de que se complete (combinación). Pero el búfer está esperando fusionar para completar Así que la fusión está esperando en el búfer y la memoria intermedia está esperando en la fusión.

eagerCerrar fusionar

Puede configurar eagerClose=True al crear su combinación. Sin embargo, el uso de un cierre impaciente puede desafortunadamente dar como resultado que algunos valores ItemId nunca sean consultados.

Solucion indirecta

Si materializa un nuevo flujo para cada nivel del árbol, entonces la recursión puede extraerse fuera del flujo.

Puedes construir una función de consulta utilizando el itemFlow :

val itemQuery : Iterable[ItemId] => Future[Seq[Data]] = (itemIds) => Source.apply(itemIds) .via(itemFlow) .runWith(Sink.seq[Data])

Esta función de consulta ahora puede ajustarse dentro de una función auxiliar recursiva:

val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = (itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData => val allNewKids = allNewData.flatMap(_.kids).toSet if(allNewKids.isEmpty) Future.successful(currentData ++ allNewData) else recQuery(allNewKids, currentData ++ data) }

El número de flujos creados será igual a la profundidad máxima del árbol.

Desafortunadamente, debido a que los futuros están involucrados, esta función recursiva no es recursiva y podría resultar en un "desbordamiento de pila" si el árbol es demasiado profundo.