una tipografía tipografica tipografia texto propia mano illustrator hacer fuentes fuente dafont crear como scala akka akka-stream akka-http

scala - tipografía - dafont



¿Cómo crear una fuente que pueda recibir elementos más tarde a través de una llamada al método? (3)

Me gustaría crear una Source y luego insertar elementos en ella, como en:

val src = ... // create the Source here // and then, do something like this pushElement(x1, src) pushElement(x2, src)

cual es la manera recomendada para hacer esto?

¡Gracias!


Desde Akka 2.5 Source tiene un método preMaterialize .

De acuerdo con la documentación , esto se parece a la forma indicada de hacer lo que pide:

Hay situaciones en las que necesita un valor materializado de Source antes de que la Source se conecte al resto del gráfico. Esto es particularmente útil en el caso de fuentes con "valor materializado", como Source.queue , Source.actorRef o Source.maybe .

Debajo de un ejemplo sobre cómo sería esto con un SourceQueue . Los elementos se envían a la cola antes y después de la materialización, así como desde dentro del Flow :

import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream.{ActorMaterializer, OverflowStrategy} implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure) val (sourceMat, source) = sourceDecl.preMaterialize() // Adding element before actual materialization sourceMat.offer("pre materialization element") val flow = Flow[String].map { e => if(!e.contains("new")) { // Adding elements from within the flow sourceMat.offer("new element generated inside the flow") } s"Processing $e" } // Actually materializing with `run` source.via(flow).to(Sink.foreach(println)).run() // Adding element after materialization sourceMat.offer("post materialization element")

Salida:

Processing pre materialization element Processing post materialization element Processing new element generated inside the flow Processing new element generated inside the flow


Después de jugar y buscar una buena solución para esto, encontré esta solución que es limpia, simple y funciona tanto antes como después de la materialización. https://.com/a/32553913/6791842

val (ref: ActorRef, publisher: Publisher[Int]) = Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail) .toMat(Sink.asPublisher(true))(Keep.both).run() ref ! 1 //before val source = Source.fromPublisher(publisher) ref ! 2 //before Thread.sleep(1000) ref ! 3 //before source.runForeach(println) ref ! 4 //after Thread.sleep(1000) ref ! 5 //after

Salida:

1 2 3 4 5


Hay tres formas de lograrlo:

1. Publicar materialización con SourceQueue

Puede usar Source.queue que materializa el flujo en un SourceQueue :

case class Weather(zipCode : String, temperature : Double, raining : Boolean) val bufferSize = 100 //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of a new element. val overflowStrategy = akka.stream.OverflowStrategy.dropHead val queue = Source.queue(bufferSize, overflowStrategy) .filter(!_.raining) .to(Sink foreach println) .run() // in order to "keep" the queue Materialized value instead of the Sink''s queue offer Weather("02139", 32.0, true)

2. Publicar materialización con el actor

Aquí hay una pregunta y respuesta similar, la esencia es que materializas la transmisión como ActorRef y envías mensajes a esa referencia:

val ref = Source.actorRef[Weather](Int.MaxValue, fail) .filter(!_.raining) .to(Sink foreach println ) .run() // in order to "keep" the ref Materialized value instead of the Sink''s ref ! Weather("02139", 32.0, true)

3. Pre materialización con el actor

Del mismo modo, podría crear explícitamente un actor que contenga un búfer de mensajes, usar ese actor para crear una fuente y luego enviar los mensajes de ese actor como se describe en la respuesta here :

object WeatherForwarder { def props : Props = Props[WeatherForwarder] } //see provided link for example definition class WeatherForwarder extends Actor {...} val actorRef = actorSystem actorOf WeatherForwarder.props //note the stream has not been instatiated yet actorRef ! Weather("02139", 32.0, true) //stream already has 1 Weather value to process which is sitting in the //ActorRef''s internal buffer val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}