actors - akka scala tutorial
¿Cómo comenzar con Akka Streams? (1)
Esta respuesta se basa en la versión
2.4.2
akka-stream
.
La API puede ser ligeramente diferente en otras versiones.
La dependencia puede ser consumida por
sbt
:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
Muy bien, comencemos. La API de Akka Streams consta de tres tipos principales. A diferencia de las secuencias reactivas , estos tipos son mucho más potentes y, por lo tanto, más complejos. Se supone que para todos los ejemplos de código ya existen las siguientes definiciones:
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
Las declaraciones de
import
son necesarias para las declaraciones de tipo.
system
representa el sistema de actores de Akka y el
materializer
representa el contexto de evaluación de la secuencia.
En nuestro caso utilizamos un
ActorMaterializer
, lo que significa que las transmisiones se evalúan sobre los actores.
Ambos valores están marcados como
implicit
, lo que le da al compilador de Scala la posibilidad de inyectar estas dos dependencias automáticamente cuando sea necesario.
También importamos
system.dispatcher
, que es un contexto de ejecución para
Futures
.
Una nueva API
Akka Streams tiene estas propiedades clave:
- Implementan la especificación de Reactive Streams , cuyos tres objetivos principales son la contrapresión, los límites asíncronos y sin bloqueo y la interoperabilidad entre diferentes implementaciones.
-
Proporcionan una abstracción para un motor de evaluación de las secuencias, que se llama
Materializer
. -
Los programas se formulan como bloques de construcción reutilizables, que se representan como los tres tipos principales:
Source
,Sink
yFlow
. Los bloques de construcción forman un gráfico cuya evaluación se basa en elMaterializer
y debe activarse explícitamente.
A continuación se presentará una introducción más profunda sobre cómo usar los tres tipos principales.
Fuente
Una
Source
es un creador de datos, sirve como fuente de entrada a la secuencia.
Cada
Source
tiene un solo canal de salida y ningún canal de entrada.
Todos los datos fluyen a través del canal de salida a lo que esté conectado a la
Source
.
Imagen tomada de boldradius.com .
Una
Source
se puede crear de varias maneras:
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
En los casos anteriores, alimentamos la
Source
con datos finitos, lo que significa que eventualmente terminarán.
No hay que olvidar que los flujos reactivos son vagos y asíncronos de forma predeterminada.
Esto significa que uno tiene que solicitar explícitamente la evaluación de la secuencia.
En Akka Streams esto se puede hacer a través de los métodos
run*
.
runForeach
no sería diferente de la conocida función
foreach
: a través de la adición de
run
se hace explícito que solicitemos una evaluación de la secuencia.
Como los datos finitos son aburridos, continuamos con uno infinito:
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
Con el método
take
podemos crear un punto de parada artificial que nos impide evaluar indefinidamente.
Dado que el soporte para actores está integrado, también podemos alimentar fácilmente la transmisión con mensajes que se envían a un actor:
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
Podemos ver que los
Futures
se ejecutan de forma asíncrona en diferentes subprocesos, lo que explica el resultado.
En el ejemplo anterior, no es necesario un búfer para los elementos entrantes y, por lo tanto, con
OverflowStrategy.fail
podemos configurar que la secuencia falle en un desbordamiento del búfer.
Especialmente a través de esta interfaz de actor, podemos alimentar la transmisión a través de cualquier fuente de datos.
No importa si los datos son creados por el mismo hilo, por otro diferente, por otro proceso o si provienen de un sistema remoto a través de Internet.
Lavabo
Un
Sink
es básicamente lo contrario de una
Source
.
Es el punto final de una secuencia y, por lo tanto, consume datos.
Un
Sink
tiene un solo canal de entrada y ningún canal de salida.
Sinks
son especialmente necesarios cuando queremos especificar el comportamiento del recopilador de datos de una manera reutilizable y sin evaluar la secuencia.
Los métodos
run*
ya conocidos no nos permiten estas propiedades, por lo tanto, se prefiere usar
Sink
lugar.
Imagen tomada de boldradius.com .
Un breve ejemplo de un
Sink
en acción:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
La conexión de una
Source
a un
Sink
se puede hacer con el método
to
.
Devuelve el llamado
RunnableFlow
, que es como veremos más adelante una forma especial de
Flow
: una secuencia que se puede ejecutar simplemente llamando a su método
run()
.
Imagen tomada de boldradius.com .
Por supuesto, es posible reenviar todos los valores que llegan a un sumidero a un actor:
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
Fluir
Las fuentes de datos y los sumideros son excelentes si necesita una conexión entre las transmisiones de Akka y un sistema existente, pero uno realmente no puede hacer nada con ellas. Los flujos son la última pieza que falta en la abstracción base de Akka Streams. Actúan como un conector entre diferentes flujos y se pueden usar para transformar sus elementos.
Imagen tomada de boldradius.com .
Si un
Flow
está conectado a una
Source
el resultado es una nueva
Source
.
Del mismo modo, un
Flow
conectado a un
Sink
crea un nuevo
Sink
.
Y un
Flow
conectado con una
Source
y un
Sink
da como resultado un
RunnableFlow
.
Por lo tanto, se sientan entre el canal de entrada y el de salida, pero por sí mismos no corresponden a uno de los sabores siempre que no estén conectados a una
Source
o un
Sink
.
Imagen tomada de boldradius.com .
Para obtener una mejor comprensión de los
Flows
, veremos algunos ejemplos:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
A
via
método
via
podemos conectar una
Source
con un
Flow
.
Necesitamos especificar el tipo de entrada porque el compilador no puede inferirlo por nosotros.
Como ya podemos ver en este sencillo ejemplo, los flujos
invert
y
double
son completamente independientes de cualquier productor y consumidor de datos.
Solo transforman los datos y los reenvían al canal de salida.
Esto significa que podemos reutilizar un flujo entre múltiples flujos:
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1
s1
y
s2
representan flujos completamente nuevos: no comparten ningún dato a través de sus componentes básicos.
Flujos de datos ilimitados
Antes de continuar, primero debemos volver a visitar algunos de los aspectos clave de las secuencias reactivas. Un número ilimitado de elementos puede llegar a cualquier punto y puede poner una secuencia en diferentes estados. Además de un flujo ejecutable, que es el estado habitual, un flujo puede detenerse ya sea por un error o por una señal que denota que no llegarán más datos. Una secuencia se puede modelar de forma gráfica marcando eventos en una línea de tiempo, como es el caso aquí:
Imagen tomada de La introducción a la Programación reactiva que te has estado perdiendo .
Ya hemos visto flujos ejecutables en los ejemplos de la sección anterior.
Obtenemos un
RunnableGraph
cada vez que se puede materializar una secuencia, lo que significa que un
Sink
está conectado a una
Source
.
Hasta ahora siempre nos hemos materializado en el valor
Unit
, que se puede ver en los tipos:
val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)
Para
Source
y
Sink
el segundo parámetro de tipo y para
Flow
el tercer parámetro de tipo denota el valor materializado.
A lo largo de esta respuesta, no se explicará el significado completo de la materialización.
Sin embargo, se pueden encontrar más detalles sobre la materialización en la
documentación oficial
.
Por ahora, lo único que necesitamos saber es que el valor materializado es lo que obtenemos cuando ejecutamos una secuencia.
Como hasta ahora solo estábamos interesados en los efectos secundarios, obtuvimos
Unit
como el valor materializado.
La excepción a esto fue la materialización de un sumidero, que resultó en un
Future
.
Nos devolvió un
Future
, ya que este valor puede denotar cuando la secuencia que está conectada al sumidero ha finalizado.
Hasta ahora, los ejemplos de código anteriores fueron agradables para explicar el concepto, pero también fueron aburridos porque solo tratamos con flujos finitos o infinitos muy simples.
Para hacerlo más interesante, a continuación se explicará una secuencia asíncrona completa y sin límites.
Ejemplo de ClickStream
Como ejemplo, queremos tener una secuencia que capture eventos de clic. Para hacerlo más desafiante, digamos que también queremos agrupar los eventos de clics que ocurren en poco tiempo uno después del otro. De esta manera, podríamos descubrir fácilmente clics dobles, triples o diez veces. Además, queremos filtrar todos los clics individuales. Respira hondo e imagina cómo resolverías ese problema de manera imperativa. Apuesto a que nadie podría implementar una solución que funcione correctamente en el primer intento. De manera reactiva, este problema es trivial de resolver. De hecho, la solución es tan simple y sencilla de implementar que incluso podemos expresarla en un diagrama que describa directamente el comportamiento del código:
Imagen tomada de La introducción a la Programación reactiva que te has estado perdiendo .
Los cuadros grises son funciones que describen cómo una secuencia se transforma en otra.
Con la función de
throttle
, acumulamos clics dentro de 250 milisegundos, las funciones de
map
y
filter
deben explicarse por sí mismas.
Las esferas de color representan un evento y las flechas representan cómo fluyen a través de nuestras funciones.
Más adelante en los pasos de procesamiento, obtenemos cada vez menos elementos que fluyen a través de nuestro flujo, ya que los agrupamos y filtramos.
El código para esta imagen se vería así:
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
¡Toda la lógica se puede representar en solo cuatro líneas de código! En Scala, podríamos escribirlo aún más corto:
val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)
La definición de
clickStream
es un poco más compleja, pero este es solo el caso porque el programa de ejemplo se ejecuta en la JVM, donde la captura de eventos de clic no es fácilmente posible.
Otra complicación es que Akka por defecto no proporciona la función del
throttle
.
En cambio, tuvimos que escribirlo nosotros mismos.
Dado que esta función es (como es el caso de las funciones de
map
o
filter
) reutilizable en diferentes casos de uso, no cuento estas líneas con el número de líneas que necesitábamos para implementar la lógica.
Sin embargo, en lenguajes imperativos, es normal que la lógica no se pueda reutilizar tan fácilmente y que los diferentes pasos lógicos sucedan en un solo lugar en lugar de aplicarse secuencialmente, lo que significa que probablemente habríamos deformado nuestro código con la lógica de limitación.
El ejemplo de código completo está disponible como una
gist
y no se discutirá aquí más adelante.
Ejemplo de SimpleWebServer
Lo que debería discutirse en su lugar es otro ejemplo. Si bien la secuencia de clics es un buen ejemplo para permitir que Akka Streams maneje un ejemplo del mundo real, carece del poder para mostrar la ejecución paralela en acción. El siguiente ejemplo representará un pequeño servidor web que puede manejar múltiples solicitudes en paralelo. El servidor web podrá aceptar conexiones entrantes y recibir secuencias de bytes de ellos que representen signos ASCII imprimibles. Estas secuencias de bytes o cadenas deben dividirse en todos los caracteres de nueva línea en partes más pequeñas. Después de eso, el servidor responderá al cliente con cada una de las líneas divididas. Alternativamente, podría hacer algo más con las líneas y dar un token de respuesta especial, pero queremos mantenerlo simple en este ejemplo y, por lo tanto, no introducir ninguna característica sofisticada. Recuerde, el servidor necesita poder manejar múltiples solicitudes al mismo tiempo, lo que básicamente significa que ninguna solicitud puede bloquear ninguna otra solicitud para su posterior ejecución. Resolver todos estos requisitos puede ser difícil de una manera imperativa; sin embargo, con Akka Streams, no deberíamos necesitar más que unas pocas líneas para resolver ninguno de estos. Primero, tengamos una visión general sobre el servidor en sí:
Básicamente, solo hay tres bloques de construcción principales. El primero debe aceptar las conexiones entrantes. El segundo debe manejar las solicitudes entrantes y el tercero debe enviar una respuesta. Implementar todos estos tres bloques de construcción es solo un poco más complicado que implementar la secuencia de clics:
def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
import system.dispatcher
val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
Sink.foreach[Tcp.IncomingConnection] { conn =>
println(s"Incoming connection from: ${conn.remoteAddress}")
conn.handleWith(serverLogic)
}
val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
Tcp().bind(address, port)
val binding: Future[Tcp.ServerBinding] =
incomingCnnections.to(connectionHandler).run()
binding onComplete {
case Success(b) =>
println(s"Server started, listening on: ${b.localAddress}")
case Failure(e) =>
println(s"Server could not be bound to $address:$port: ${e.getMessage}")
}
}
La función
mkServer
toma (además de la dirección y el puerto del servidor) también un sistema de actor y un materializador como parámetros implícitos.
El flujo de control del servidor se representa mediante
binding
, que toma una fuente de conexiones entrantes y las reenvía a un sumidero de conexiones entrantes.
Dentro de
connectionHandler
, que es nuestro sumidero, manejamos cada conexión mediante el
serverLogic
flujo
serverLogic
, que se describirá más adelante.
binding
devuelve un
Future
, que se completa cuando el servidor se ha iniciado o el inicio ha fallado, que podría ser el caso cuando el puerto ya está ocupado por otro proceso.
Sin embargo, el código no refleja completamente el gráfico ya que no podemos ver un bloque de construcción que maneje las respuestas.
La razón de esto es que la conexión ya proporciona esta lógica por sí misma.
Es un flujo bidireccional y no solo unidireccional como los flujos que hemos visto en los ejemplos anteriores.
Como fue el caso de la materialización, tales flujos complejos no se explicarán aquí.
La
documentación oficial
tiene mucho material para cubrir gráficos de flujo más complejos.
Por ahora es suficiente saber que
Tcp.IncomingConnection
representa una conexión que sabe cómo recibir solicitudes y cómo enviar respuestas.
La parte que aún falta es el bloque de construcción
serverLogic
.
Puede verse así:
Una vez más, podemos dividir la lógica en varios bloques de construcción simples que forman el flujo de nuestro programa. Primero queremos dividir nuestra secuencia de bytes en líneas, lo que tenemos que hacer cada vez que encontramos un carácter de nueva línea. Después de eso, los bytes de cada línea deben convertirse en una cadena porque trabajar con bytes sin procesar es engorroso. En general, podríamos recibir una secuencia binaria de un protocolo complicado, lo que haría que trabajar con los datos sin procesar entrantes sea extremadamente desafiante. Una vez que tenemos una cadena legible, podemos crear una respuesta. Por razones de simplicidad, la respuesta puede ser cualquier cosa en nuestro caso. Al final, tenemos que convertir nuestra respuesta a una secuencia de bytes que se pueden enviar por cable. El código para toda la lógica puede verse así:
val serverLogic: Flow[ByteString, ByteString, Unit] = {
val delimiter = Framing.delimiter(
ByteString("/n"),
maximumFrameLength = 256,
allowTruncation = true)
val receiver = Flow[ByteString].map { bytes =>
val message = bytes.utf8String
println(s"Server received: $message")
message
}
val responder = Flow[String].map { message =>
val answer = s"Server hereby responds to message: $message/n"
ByteString(answer)
}
Flow[ByteString]
.via(delimiter)
.via(receiver)
.via(responder)
}
Ya sabemos que
serverLogic
es un flujo que toma un
ByteString
y tiene que producir un
ByteString
.
Con
delimiter
podemos dividir un
ByteString
en partes más pequeñas; en nuestro caso, debe suceder cada vez que se produce un carácter de nueva línea.
receiver
es el flujo que toma todas las secuencias de bytes divididas y las convierte en una cadena.
Por supuesto, esta es una conversión peligrosa, ya que solo los caracteres ASCII imprimibles deben convertirse en una cadena, pero para nuestras necesidades es lo suficientemente bueno.
responder
es el último componente y es responsable de crear una respuesta y convertir la respuesta a una secuencia de bytes.
A diferencia del gráfico, no dividimos este último componente en dos, ya que la lógica es trivial.
Al final, conectamos todos los flujos a través de la función
via
.
En este punto, uno puede preguntarse si nos ocupamos de la propiedad multiusuario que se mencionó al principio.
Y de hecho lo hicimos, aunque puede no ser obvio de inmediato.
Al mirar este gráfico, debería ser más claro:
El componente
serverLogic
no es más que un flujo que contiene flujos más pequeños.
Este componente toma una entrada, que es una solicitud, y produce una salida, que es la respuesta.
Dado que los flujos se pueden construir varias veces y todos funcionan de forma independiente entre sí, a través de este anidamiento logramos nuestra propiedad multiusuario.
Cada solicitud se maneja dentro de su propia solicitud y, por lo tanto, una solicitud de ejecución corta puede anular una solicitud de ejecución larga iniciada anteriormente.
En caso de que se lo haya preguntado, la definición de
serverLogic
que se mostró anteriormente, por supuesto, se puede escribir mucho más corta al incluir la mayoría de sus definiciones internas:
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("/n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(msg => s"Server hereby responds to message: $msg/n")
.map(ByteString(_))
Una prueba del servidor web puede verse así:
$ # Client
$ echo "Hello World/nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?
Para que el ejemplo de código anterior funcione correctamente, primero debemos iniciar el servidor, que se muestra en el script
startServer
:
$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?
El ejemplo de código completo de este servidor TCP simple se puede encontrar here . No solo podemos escribir un servidor con Akka Streams sino también el cliente. Puede verse así:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("/n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(println)
.map(_ ⇒ StdIn.readLine("> "))
.map(_+"/n")
.map(ByteString(_))
connection.join(flow).run()
El código completo del cliente TCP se puede encontrar here . El código se ve bastante similar, pero a diferencia del servidor, ya no tenemos que administrar las conexiones entrantes.
Gráficos complejos
En las secciones anteriores hemos visto cómo podemos construir programas simples a partir de flujos. Sin embargo, en realidad a menudo no es suficiente confiar solo en funciones ya integradas para construir flujos más complejos. Si queremos poder utilizar Akka Streams para programas arbitrarios, necesitamos saber cómo construir nuestras propias estructuras de control personalizadas y flujos combinables que nos permitan abordar la complejidad de nuestras aplicaciones. La buena noticia es que Akka Streams se diseñó para adaptarse a las necesidades de los usuarios y para darle una breve introducción a las partes más complejas de Akka Streams, agregamos algunas características más a nuestro ejemplo de cliente / servidor.
Una cosa que aún no podemos hacer es cerrar una conexión.
En este punto, comienza a complicarse un poco más porque la API de transmisión que hemos visto hasta ahora no nos permite detener una transmisión en un punto arbitrario.
Sin embargo, existe la abstracción
GraphStage
, que se puede utilizar para crear etapas de procesamiento de gráficos arbitrarias con cualquier número de puertos de entrada o salida.
Primero
closeConnection
un vistazo al lado del servidor, donde presentamos un nuevo componente, llamado
closeConnection
:
val closeConnection = new GraphStage[FlowShape[String, String]] {
val in = Inlet[String]("closeConnection.in")
val out = Outlet[String]("closeConnection.out")
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = grab(in) match {
case "q" ⇒
push(out, "BYE")
completeStage()
case msg ⇒
push(out, s"Server hereby responds to message: $msg/n")
}
})
setHandler(out, new OutHandler {
override def onPull() = pull(in)
})
}
}
Esta API parece mucho más engorrosa que la API de flujo.
No es de extrañar, tenemos que hacer muchos pasos imprescindibles aquí.
A cambio, tenemos más control sobre el comportamiento de nuestras transmisiones.
En el ejemplo anterior, solo especificamos un puerto de entrada y un puerto de salida y los ponemos a disposición del sistema anulando el valor de la
shape
.
Además, definimos los llamados
InHandler
y
OutHandler
, que son, en este orden, responsables de recibir y emitir elementos.
Si miró detenidamente el ejemplo completo de flujo de clics, ya debería reconocer estos componentes.
En
InHandler
tomamos un elemento y si es una cadena con un solo carácter
''q''
, queremos cerrar la secuencia.
Para darle al cliente la oportunidad de descubrir que la secuencia se cerrará pronto, emitimos la cadena
"BYE"
y luego cerramos inmediatamente el escenario.
El componente
closeConnection
se puede combinar con una secuencia a través del método
via
, que se introdujo en la sección sobre flujos.
Además de poder cerrar las conexiones, también sería bueno si pudiéramos mostrar un mensaje de bienvenida a una conexión recién creada. Para hacer esto, una vez más tenemos que ir un poco más allá:
def serverLogic
(conn: Tcp.IncomingConnection)
(implicit system: ActorSystem)
: Flow[ByteString, ByteString, NotUsed]
= Flow.fromGraph(GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!/n"))
val logic = b.add(internalLogic)
val concat = b.add(Concat[ByteString]())
welcome ~> concat.in(0)
logic.outlet ~> concat.in(1)
FlowShape(logic.in, concat.out)
})
La función
serverLogic
ahora toma la conexión entrante como parámetro.
Dentro de su cuerpo usamos un DSL que nos permite describir un comportamiento complejo de flujo.
Con
welcome
, creamos una secuencia que solo puede emitir un elemento: el mensaje de bienvenida.
logic
es lo que se describió como
serverLogic
en la sección anterior.
La única diferencia notable es que le agregamos
closeConnection
.
Ahora en realidad viene la parte interesante del DSL.
La función
GraphDSL.create
pone a disposición un generador
b
, que se utiliza para expresar la secuencia como un gráfico.
Con la función
~>
es posible conectar puertos de entrada y salida entre sí.
El componente
Concat
que se usa en el ejemplo puede concatenar elementos y aquí se usa para anteponer el mensaje de bienvenida frente a los otros elementos que salen de
internalLogic
.
En la última línea, solo ponemos a disposición el puerto de entrada de la lógica del servidor y el puerto de salida de la secuencia concatenada porque todos los demás puertos seguirán siendo un detalle de implementación del componente
serverLogic
.
Para una introducción en profundidad al gráfico DSL de Akka Streams, visite la sección correspondiente en la
documentación oficial
.
El ejemplo de código completo del complejo servidor TCP y de un cliente que puede comunicarse con él se puede encontrar
here
.
Cada vez que abra una nueva conexión desde el cliente, debería ver un mensaje de bienvenida y al escribir
"q"
en el cliente debería ver un mensaje que le indica que la conexión se ha cancelado.
Todavía hay algunos temas que no fueron cubiertos por esta respuesta. Especialmente la materialización puede asustar a un lector u otro, pero estoy seguro de que con el material que se cubre aquí, todos deberían poder seguir los próximos pasos por sí mismos. Como ya se dijo, la documentación oficial es un buen lugar para continuar aprendiendo sobre Akka Streams.
La biblioteca de Akka Streams ya viene con una gran cantidad de documentación . Sin embargo, el principal problema para mí es que proporciona demasiado material: me siento abrumado por la cantidad de conceptos que tengo que aprender. Muchos ejemplos mostrados allí se sienten muy pesados y no se pueden traducir fácilmente a casos de uso del mundo real y, por lo tanto, son bastante esotéricos. Creo que da demasiados detalles sin explicar cómo construir todos los bloques de construcción juntos y cómo exactamente ayuda a resolver problemas específicos.
Hay fuentes, sumideros, flujos, etapas de gráficos, gráficos parciales, materialización, un DSL de gráficos y mucho más y simplemente no sé por dónde empezar. La guía de inicio rápido está destinada a ser un punto de partida, pero no la entiendo. Simplemente incluye los conceptos mencionados anteriormente sin explicarlos. Además, los ejemplos de código no se pueden ejecutar: faltan partes, lo que me hace más o menos imposible seguir el texto.
¿Alguien puede explicar los conceptos de fuentes, sumideros, flujos, etapas de gráficos, gráficos parciales, materialización y tal vez algunas otras cosas que me perdí en palabras simples y con ejemplos fáciles que no explican cada detalle (y que probablemente no sean necesarios de todos modos en el principio)?