streams actors scala akka akka-stream

scala - actors - akka microservices



Creando un flujo de actor en Akka Streams (3)

Aquí está una compilación de solución utilizando una etapa de gráfico. El actor tiene que reconocer todos los mensajes para tener una contrapresión. Se notifica al actor cuando la transmisión falla / completa y la transmisión falla cuando el actor termina. Esto puede ser útil si no desea utilizar preguntar, por ejemplo, cuando no todos los mensajes de entrada tienen un mensaje de salida correspondiente.

import akka.actor.{ActorRef, Status, Terminated} import akka.stream._ import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} object ActorRefBackpressureFlowStage { case object StreamInit case object StreamAck case object StreamCompleted case class StreamFailed(ex: Throwable) case class StreamElementIn[A](element: A) case class StreamElementOut[A](element: A) } /** * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. * First element is always `StreamInit`, then stream is waiting for acknowledgement message * `ackMessage` from the given actor which means that it is ready to process * elements. It also requires `ackMessage` message after each stream element * to make backpressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages. * * The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will * be emitted downstream when there is demand. * * If the target actor terminates the stage will fail with a WatchedActorTerminatedException. * When the stream is completed successfully a `StreamCompleted` message * will be sent to the destination actor. * When the stream is completed with failure a `StreamFailed(ex)` message will be send to the destination actor. */ class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] { import ActorRefBackpressureFlowStage._ val in: Inlet[In] = Inlet("ActorFlowIn") val out: Outlet[Out] = Outlet("ActorFlowOut") override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private lazy val self = getStageActor { case (_, StreamAck) => if(firstPullReceived) { if (!isClosed(in) && !hasBeenPulled(in)) { pull(in) } } else { pullOnFirstPullReceived = true } case (_, StreamElementOut(elemOut)) => val elem = elemOut.asInstanceOf[Out] emit(out, elem) case (_, Terminated(targetRef)) => failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef)) case (actorRef, unexpected) => failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`.")) } var firstPullReceived: Boolean = false var pullOnFirstPullReceived: Boolean = false override def preStart(): Unit = { //initialize stage actor and watch flow actor. self.watch(flowActor) tellFlowActor(StreamInit) } setHandler(in, new InHandler { override def onPush(): Unit = { val elementIn = grab(in) tellFlowActor(StreamElementIn(elementIn)) } override def onUpstreamFailure(ex: Throwable): Unit = { tellFlowActor(StreamFailed(ex)) super.onUpstreamFailure(ex) } override def onUpstreamFinish(): Unit = { tellFlowActor(StreamCompleted) super.onUpstreamFinish() } }) setHandler(out, new OutHandler { override def onPull(): Unit = { if(!firstPullReceived) { firstPullReceived = true if(pullOnFirstPullReceived) { if (!isClosed(in) && !hasBeenPulled(in)) { pull(in) } } } } override def onDownstreamFinish(): Unit = { tellFlowActor(StreamCompleted) super.onDownstreamFinish() } }) private def tellFlowActor(message: Any): Unit = { flowActor.tell(message, self.ref) } } override def shape: FlowShape[In, Out] = FlowShape(in, out) }

Es posible crear fuentes y sumideros a partir de actores que utilizan los Source.actorPublisher() y Sink.actorSubscriber() respectivamente. ¿Pero es posible crear un Flow desde el actor?

Conceptualmente, no parece haber una buena razón para no hacerlo, dado que implementa los rasgos ActorPublisher y ActorSubscriber , pero desafortunadamente, el objeto Flow no tiene ningún método para hacerlo. En this excelente publicación de blog se hace en una versión anterior de Akka Streams, por lo que la pregunta es si es posible también en la última versión (2.4.9).


La solución de Konrad demuestra cómo crear una etapa personalizada que utiliza actores, pero en la mayoría de los casos creo que es un poco excesivo.

Normalmente tienes un Actor que es capaz de responder a preguntas:

val actorRef : ActorRef = ??? type Input = ??? type Output = ??? val queryActor : Input => Future[Output] = (actorRef ? _) andThen (_.mapTo[Output])

Esto se puede utilizar fácilmente con la funcionalidad básica de Flow que admite el número máximo de solicitudes simultáneas:

val actorQueryFlow : Int => Flow[Input, Output, _] = (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)

Ahora actorQueryFlow se puede integrar en cualquier flujo ...


Soy parte del equipo de Akka y me gustaría usar esta pregunta para aclarar algunas cosas sobre las interfaces de Reactive Streams en bruto. Espero que encuentres esto útil.

En particular, pronto publicaremos varias publicaciones en el blog del equipo Akka sobre la creación de escenarios personalizados, incluidos Flows, así que mantén un ojo en ello.

No use ActorPublisher / ActorSubscriber

Por favor, no utilice ActorPublisher y ActorSubscriber . Son de un nivel demasiado bajo y podría terminar implementándolos de tal manera que esté violando la especificación de Reactive Streams . Son una reliquia del pasado e incluso entonces solo eran "solo modo de usuario avanzado". Realmente no hay razón para usar esas clases hoy en día. Nunca proporcionamos una manera de generar un flujo porque la complejidad es simplemente explosiva si se expone como API de actor "sin procesar" para que usted la implemente y haga que todas las reglas se implementen correctamente .

Si realmente desea implementar interfaces de ReactiveStreams sin formato, utilice el TCK de la Especificación para verificar que su implementación sea correcta. Es probable que lo sorprendan algunos de los casos de esquina más complejos que es un Flow (o en la terminología RS que tiene que manejar un Processor ).

La mayoría de las operaciones son posibles de construir sin pasar de bajo nivel

Muchos flujos que debería poder construir simplemente construyendo desde un Flow[T] y agregando las operaciones necesarias en él, solo a modo de ejemplo:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

Que es una descripción reutilizable del Flujo.

Ya que está preguntando sobre el modo de usuario avanzado, este es el operador más poderoso en el DSL en sí mismo: statefulFlatMapConcat . La gran mayoría de las operaciones que operan en elementos de flujo simple se pueden expresar al usarlo: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T] .

Si necesita temporizadores, puede zip con un Source.timer etc.

GraphStage es la API más simple y segura para construir etapas personalizadas

En su lugar, la creación de Fuentes / Flujos / Fregaderos tiene su propia API potente y segura : GraphStage . Lea la documentación sobre la creación de GraphStages personalizados (pueden ser un Sink / Source / Flow o incluso cualquier forma arbitraria). Maneja todas las reglas complejas de Reactive Streams para ti, mientras te da total libertad y seguridad de tipos mientras implementas tus etapas (lo que podría ser un Flow).

Por ejemplo, tomado de los documentos, es una implementación de GraphStage del operador de filter(T => Boolean) :

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { val in = Inlet[A]("Filter.in") val out = Outlet[A]("Filter.out") val shape = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush(): Unit = { val elem = grab(in) if (p(elem)) push(out, elem) else pull(in) } }) setHandler(out, new OutHandler { override def onPull(): Unit = { pull(in) } }) } }

También maneja canales asíncronos y es fusible por defecto.

Además de los documentos, estas publicaciones de blog explican en detalle por qué esta API es el santo grial de construir etapas personalizadas de cualquier forma: