scala - primaria - planos para niños
Diferencia entre mapa y mapaAsync (1)
Firma
La diferencia se resalta mejor en las signatures : Flow.map
toma una función que devuelve un tipo T
mientras Flow.mapAsync
toma una función que devuelve un tipo Future[T]
.
Ejemplo practico
Como ejemplo, supongamos que tenemos una función que consulta en la base de datos el nombre completo de un usuario basado en una identificación de usuario:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Dada una Source
de valores akka stream, podríamos usar Flow.map
dentro de un Stream para consultar la base de datos e imprimir los nombres completos en la consola:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
Una limitación de este enfoque es que esta secuencia solo realizará una consulta de 1 db a la vez. La consulta en serie será un "cuello de botella" y probablemente evitará el rendimiento máximo en nuestra transmisión.
Podríamos intentar mejorar el rendimiento a través de consultas concurrentes utilizando un Future
:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
El problema con este apéndice simplista es que hemos eliminado efectivamente la contrapresión.
El Fregadero está tirando del Futuro y está agregando una foreach println
, que es relativamente rápida en comparación con las consultas de la base de datos. El flujo continuará propagando la demanda a la Fuente y Flow.map
más Futuros dentro del Flow.map
. Por lo tanto, no hay límite para el número de databaseLookup
de databaseLookup
ejecutan simultáneamente. La consulta paralela sin restricciones eventualmente podría sobrecargar la base de datos.
Flow.mapAsync
al rescate; podemos tener acceso simultáneo a la base de datos y, al mismo tiempo, limitar el número de búsquedas simultáneas:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
También Sink.foreach
cuenta que Sink.foreach
hizo más simple, ya no toma un Future[FullName]
sino un FullName
lugar.
Mapa asíncrono desordenado
Si no es necesario mantener un orden secuencial de los ID de usuario a los Flow.mapAsyncUnordered
entonces puede usar Flow.mapAsyncUnordered
. Por ejemplo: solo necesita imprimir todos los nombres en la consola, pero no le importó el orden en que se imprimieron.
¿Alguien puede explicarme la diferencia entre el mapa y el mapa Async wrt AKKA stream? En la documentación se dice que
Las transformaciones de flujo y los efectos secundarios que involucran servicios externos no basados en flujo se pueden realizar con mapAsync o mapAsyncUnordered
¿Por qué no podemos simplemente nosotros mapa aquí? ¿Supongo que el flujo, la fuente, el sumidero serían todos de naturaleza monádica y, por lo tanto, el mapa debería funcionar bien con el retraso en la naturaleza de estos?