tipos primaria planos plano para niños mapas mapa los leer entre diferencia como scala akka-stream

scala - primaria - planos para niños



Diferencia entre mapa y mapaAsync (1)

Firma

La diferencia se resalta mejor en las signatures : Flow.map toma una función que devuelve un tipo T mientras Flow.mapAsync toma una función que devuelve un tipo Future[T] .

Ejemplo practico

Como ejemplo, supongamos que tenemos una función que consulta en la base de datos el nombre completo de un usuario basado en una identificación de usuario:

type UserID = String type FullName = String val databaseLookup : UserID => FullName = ??? //implementation unimportant

Dada una Source de valores akka stream, podríamos usar Flow.map dentro de un Stream para consultar la base de datos e imprimir los nombres completos en la consola:

val userIDSource : Source[UserID, _] = ??? val stream = userIDSource.via(Flow[UserID].map(databaseLookup)) .to(Sink.foreach[FullName](println)) .run()

Una limitación de este enfoque es que esta secuencia solo realizará una consulta de 1 db a la vez. La consulta en serie será un "cuello de botella" y probablemente evitará el rendimiento máximo en nuestra transmisión.

Podríamos intentar mejorar el rendimiento a través de consultas concurrentes utilizando un Future :

def concurrentDBLookup(userID : UserID) : Future[FullName] = Future { databaseLookup(userID) } val concurrentStream = userIDSource.via(Flow[UserID].map(concurrentDBLookup)) .to(Sink.foreach[Future[FullName]](_ foreach println)) .run()

El problema con este apéndice simplista es que hemos eliminado efectivamente la contrapresión.

El Fregadero está tirando del Futuro y está agregando una foreach println , que es relativamente rápida en comparación con las consultas de la base de datos. El flujo continuará propagando la demanda a la Fuente y Flow.map más Futuros dentro del Flow.map . Por lo tanto, no hay límite para el número de databaseLookup de databaseLookup ejecutan simultáneamente. La consulta paralela sin restricciones eventualmente podría sobrecargar la base de datos.

Flow.mapAsync al rescate; podemos tener acceso simultáneo a la base de datos y, al mismo tiempo, limitar el número de búsquedas simultáneas:

val maxLookupCount = 10 val maxLookupConcurrentStream = userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) .to(Sink.foreach[FullName](println)) .run()

También Sink.foreach cuenta que Sink.foreach hizo más simple, ya no toma un Future[FullName] sino un FullName lugar.

Mapa asíncrono desordenado

Si no es necesario mantener un orden secuencial de los ID de usuario a los Flow.mapAsyncUnordered entonces puede usar Flow.mapAsyncUnordered . Por ejemplo: solo necesita imprimir todos los nombres en la consola, pero no le importó el orden en que se imprimieron.

¿Alguien puede explicarme la diferencia entre el mapa y el mapa Async wrt AKKA stream? En la documentación se dice que

Las transformaciones de flujo y los efectos secundarios que involucran servicios externos no basados ​​en flujo se pueden realizar con mapAsync o mapAsyncUnordered

¿Por qué no podemos simplemente nosotros mapa aquí? ¿Supongo que el flujo, la fuente, el sumidero serían todos de naturaleza monádica y, por lo tanto, el mapa debería funcionar bien con el retraso en la naturaleza de estos?