scala - tipografía - dafont
¿Cómo crear una fuente que pueda recibir elementos más tarde a través de una llamada al método? (3)
Me gustaría crear una
Source
y luego insertar elementos en ella, como en:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
cual es la manera recomendada para hacer esto?
¡Gracias!
Desde Akka 2.5
Source
tiene un método
preMaterialize
.
De acuerdo con la documentación , esto se parece a la forma indicada de hacer lo que pide:
Hay situaciones en las que necesita un valor materializado de
Source
antes de que laSource
se conecte al resto del gráfico. Esto es particularmente útil en el caso de fuentes con "valor materializado", comoSource.queue
,Source.actorRef
oSource.maybe
.
Debajo de un ejemplo sobre cómo sería esto con un
SourceQueue
.
Los elementos se envían a la cola antes y después de la materialización, así como desde dentro del
Flow
:
import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream.{ActorMaterializer, OverflowStrategy} implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure) val (sourceMat, source) = sourceDecl.preMaterialize() // Adding element before actual materialization sourceMat.offer("pre materialization element") val flow = Flow[String].map { e => if(!e.contains("new")) { // Adding elements from within the flow sourceMat.offer("new element generated inside the flow") } s"Processing $e" } // Actually materializing with `run` source.via(flow).to(Sink.foreach(println)).run() // Adding element after materialization sourceMat.offer("post materialization element")
Salida:
Processing pre materialization element Processing post materialization element Processing new element generated inside the flow Processing new element generated inside the flow
Después de jugar y buscar una buena solución para esto, encontré esta solución que es limpia, simple y funciona tanto antes como después de la materialización. https://.com/a/32553913/6791842
val (ref: ActorRef, publisher: Publisher[Int]) =
Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both).run()
ref ! 1 //before
val source = Source.fromPublisher(publisher)
ref ! 2 //before
Thread.sleep(1000)
ref ! 3 //before
source.runForeach(println)
ref ! 4 //after
Thread.sleep(1000)
ref ! 5 //after
Salida:
1
2
3
4
5
Hay tres formas de lograrlo:
1. Publicar materialización con SourceQueue
Puede usar
Source.queue
que materializa el flujo en un
SourceQueue
:
case class Weather(zipCode : String, temperature : Double, raining : Boolean)
val bufferSize = 100
//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val queue = Source.queue(bufferSize, overflowStrategy)
.filter(!_.raining)
.to(Sink foreach println)
.run() // in order to "keep" the queue Materialized value instead of the Sink''s
queue offer Weather("02139", 32.0, true)
2. Publicar materialización con el actor
Aquí hay una pregunta y respuesta similar, la esencia es que materializas la transmisión como ActorRef y envías mensajes a esa referencia:
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println )
.run() // in order to "keep" the ref Materialized value instead of the Sink''s
ref ! Weather("02139", 32.0, true)
3. Pre materialización con el actor
Del mismo modo, podría crear explícitamente un actor que contenga un búfer de mensajes, usar ese actor para crear una fuente y luego enviar los mensajes de ese actor como se describe en la respuesta here :
object WeatherForwarder {
def props : Props = Props[WeatherForwarder]
}
//see provided link for example definition
class WeatherForwarder extends Actor {...}
val actorRef = actorSystem actorOf WeatherForwarder.props
//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true)
//stream already has 1 Weather value to process which is sitting in the
//ActorRef''s internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}