scala - actors - ¿Cómo agregar elementos a la fuente dinámicamente?
akka microservices (2)
Tengo un código de ejemplo para generar una fuente independiente y trabajar con ella:
objeto principal {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
Quiero crear la clase que implementa:
trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}
Y necesito usarlo con múltiples hilos, por ejemplo:
class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will be infinite loop
mySources.addToSource(i.toString)
}
}
}
Y el código completo esperado:
object Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val sources = new MySourcesImplementation()
for(i <- 1 to 100) {
(new MyThread(sources)).start()
}
val source = sources.getSource()
source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}
¿Cómo implementar MySources
?
Con Akka Streams 2 puedes usar un sourceQueue: ¿Cómo crear un Source que pueda recibir elementos más tarde a través de una llamada de método?
Una forma de tener una fuente no finita es usar un tipo especial de actor como fuente, uno que se mezcle con el rasgo ActorPublisher
. Si crea uno de esos tipos de actores y luego finaliza con una llamada a ActorPublisher.apply
, termina con una instancia de Reactive Streams Publisher
y con eso, puede usar una apply
de Source
para generar una Source
. Después de eso, solo debes asegurarte de que tu clase ActorPublisher
maneje correctamente el protocolo de Reactive Streams para enviar elementos en sentido descendente y estarás listo. Un ejemplo muy trivial es el siguiente:
import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
object DynamicSourceExample extends App{
implicit val system = ActorSystem("test")
implicit val materializer = ActorFlowMaterializer()
val actorRef = system.actorOf(Props[ActorBasedSource])
val pub = ActorPublisher[Int](actorRef)
Source(pub).
map(_ * 2).
runWith(Sink.foreach(println))
for(i <- 1 until 20){
actorRef ! i.toString
Thread.sleep(1000)
}
}
class ActorBasedSource extends Actor with ActorPublisher[Int]{
import ActorPublisherMessage._
var items:List[Int] = List.empty
def receive = {
case s:String =>
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size){
items foreach (onNext)
items = List.empty
}
else{
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}