streams actors scala akka akka-stream

scala - actors - ¿Cómo agregar elementos a la fuente dinámicamente?



akka microservices (2)

Tengo un código de ejemplo para generar una fuente independiente y trabajar con ella:

objeto principal {

def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val source: Source[String] = Source(() => { Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)}) }) source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } }

}

Quiero crear la clase que implementa:

trait MySources { def addToSource(item: String) def getSource() : Source[String] }

Y necesito usarlo con múltiples hilos, por ejemplo:

class MyThread(mySources: MySources) extends Thread { override def run(): Unit = { for(i <- 1 to 1000000) { // here will be infinite loop mySources.addToSource(i.toString) } } }

Y el código completo esperado:

object Main { def main(args : Array[String]): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorFlowMaterializer() val sources = new MySourcesImplementation() for(i <- 1 to 100) { (new MyThread(sources)).start() } val source = sources.getSource() source.runForeach((item:String) => { println(item) }) .onComplete{ _ => system.shutdown() } } }

¿Cómo implementar MySources ?



Una forma de tener una fuente no finita es usar un tipo especial de actor como fuente, uno que se mezcle con el rasgo ActorPublisher . Si crea uno de esos tipos de actores y luego finaliza con una llamada a ActorPublisher.apply , termina con una instancia de Reactive Streams Publisher y con eso, puede usar una apply de Source para generar una Source . Después de eso, solo debes asegurarte de que tu clase ActorPublisher maneje correctamente el protocolo de Reactive Streams para enviar elementos en sentido descendente y estarás listo. Un ejemplo muy trivial es el siguiente:

import akka.actor._ import akka.stream.actor._ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl._ object DynamicSourceExample extends App{ implicit val system = ActorSystem("test") implicit val materializer = ActorFlowMaterializer() val actorRef = system.actorOf(Props[ActorBasedSource]) val pub = ActorPublisher[Int](actorRef) Source(pub). map(_ * 2). runWith(Sink.foreach(println)) for(i <- 1 until 20){ actorRef ! i.toString Thread.sleep(1000) } } class ActorBasedSource extends Actor with ActorPublisher[Int]{ import ActorPublisherMessage._ var items:List[Int] = List.empty def receive = { case s:String => if (totalDemand == 0) items = items :+ s.toInt else onNext(s.toInt) case Request(demand) => if (demand > items.size){ items foreach (onNext) items = List.empty } else{ val (send, keep) = items.splitAt(demand.toInt) items = keep send foreach (onNext) } case other => println(s"got other $other") } }