scala akka future akka-http

scala - Akka HTTP: el bloqueo en un futuro bloquea el servidor



future akka-http (2)

Extraño, pero para mí todo funciona bien (sin bloqueo). Aquí está el código:

import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import scala.concurrent.Future object Main { implicit val system = ActorSystem() implicit val executor = system.dispatcher implicit val materializer = ActorMaterializer() val routes: Route = (post & entity(as[String])) { e => complete { Future { Thread.sleep(5000) e } } } ~ (get & path(Segment)) { r => complete { "get" } } def main(args: Array[String]) { Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure { case e => system.shutdown() } } }

También puede envolver su código asíncrono en la directiva onComplete o onSuccess :

onComplete(Future{Thread.sleep(5000)}){e} onSuccess(Future{Thread.sleep(5000)}){complete(e)}

Estoy tratando de usar Akka HTTP para autenticar mi solicitud básica. Resulta que tengo un recurso externo para autenticar, así que tengo que hacer una llamada de descanso a este recurso.

Esto lleva algo de tiempo, y mientras se procesa, parece que el resto de mi API está bloqueada, esperando esta llamada. He reproducido esto con un ejemplo muy simple:

// used dispatcher: implicit val system = ActorSystem() implicit val executor = system.dispatcher implicit val materializer = ActorMaterializer() val routes = (post & entity(as[String])) { e => complete { Future{ Thread.sleep(5000) e } } } ~ (get & path(Segment)) { r => complete { "get" } }

Si publico en el punto final del registro, mi punto final también está bloqueado esperando los 5 segundos, que dictaminó el punto final del registro.

¿Es este comportamiento esperado, y si es así, cómo puedo hacer operaciones de bloqueo sin bloquear toda mi API?


Lo que observa es el comportamiento esperado, pero por supuesto es muy malo. Es bueno que las soluciones conocidas y las mejores prácticas existan para protegerse de ellas. En esta respuesta, me gustaría dedicar algo de tiempo para explicar el problema a corto, largo plazo y luego en profundidad: ¡disfruta la lectura!

Respuesta corta : " ¡no bloquee la infraestructura de enrutamiento! ", ¡Utilice siempre un despachador dedicado para operaciones de bloqueo!

Causa del síntoma observado: el problema es que está utilizando context.dispatcher como el despachador en el que se ejecutan los futuros de bloqueo. La infraestructura de enrutamiento utiliza el mismo despachador (que en términos simples es simplemente un "grupo de hilos") para manejar realmente las solicitudes entrantes, por lo que si bloquea todos los hilos disponibles, terminará muriendo de hambre en la infraestructura de enrutamiento. (Una cosa para debate y evaluación comparativa es si Akka HTTP podría proteger de esto, lo agregaré a mi lista de tareas pendientes de investigación).

El bloqueo debe tratarse con especial cuidado para no afectar a otros usuarios del mismo operador (por eso es tan sencillo separar la ejecución en diferentes), como se explica en la sección Documentos de Akka: El bloqueo requiere una administración cuidadosa .

Otra cosa que quería llamar la atención aquí es que uno debe evitar el bloqueo de las API si es posible : si su operación de larga ejecución no es realmente una operación, sino una serie de las mismas, podría haberlas separado en diferentes actores o futuros secuenciados. De todos modos, solo quería señalar, si es posible, evitar esas llamadas de bloqueo, pero si es necesario, a continuación, lo siguiente explica cómo lidiar adecuadamente con ellas.

Análisis en profundidad y soluciones :

Ahora que sabemos lo que está mal, conceptualmente, echemos un vistazo a qué se rompe exactamente en el código anterior y cómo es la solución correcta para este problema:

Color = estado del hilo:

  • turquesa - DORMIR
  • naranja - ESPERANDO
  • verde - RUNNABLE

Ahora investiguemos 3 piezas de código y cómo afectan los despachadores, y el rendimiento de la aplicación. Para forzar este comportamiento, la aplicación ha recibido la siguiente carga:

  • [a] sigue solicitando solicitudes GET (ver el código anterior en la pregunta inicial para eso), no está bloqueando allí
  • [b] luego, después de un tiempo, se activan las solicitudes POST 2000, lo que causará el bloqueo de 5 segundos antes de devolver el futuro

1) [bad] Comportamiento del distribuidor en código incorrecto :

// BAD! (due to the blocking in Future): implicit val defaultDispatcher = system.dispatcher val routes: Route = post { complete { Future { // uses defaultDispatcher Thread.sleep(5000) // will block on the default dispatcher, System.currentTimeMillis().toString // starving the routing infra } } }

Así que exponemos nuestra aplicación a [a] carga, y puede ver varios hilos de akka.actor.default-dispatcher ya - están manejando las solicitudes - pequeño fragmento verde, y naranja significa que los otros están realmente inactivos allí.

Luego comenzamos la carga [b], que causa el bloqueo de estos subprocesos: puede ver un subproceso inicial "default-dispatcher-2,3,4" yendo al bloqueo después de estar inactivo antes. También observamos que el pool crece: se inician nuevos hilos "default-dispatcher-18,19,20,21 ..."; sin embargo, se ponen a dormir inmediatamente (!). ¡Estamos desperdiciando un recurso precioso aquí!

El número de dichos hilos iniciados depende de la configuración predeterminada del despachador, pero probablemente no excederá de 50 o más. Dado que acabamos de disparar 2k operaciones de bloqueo, nos morimos de hambre todo el grupo de subprocesos - las operaciones de bloqueo dominan de modo que el enrutamiento infra no tiene ningún hilo disponible para manejar las otras solicitudes - muy mal!

Hagamos algo al respecto (que es una buena práctica de Akka, por cierto, siempre aislar el comportamiento de bloqueo como se muestra a continuación):

2) [good!] Comportamiento del despachador buen código estructurado / despachadores :

En su application.conf configure este despachador dedicado para el comportamiento de bloqueo:

my-blocking-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { // in Akka previous to 2.4.2: core-pool-size-min = 16 core-pool-size-max = 16 max-pool-size-min = 16 max-pool-size-max = 16 // or in Akka 2.4.2+ fixed-pool-size = 16 } throughput = 100 }

Debería leer más en la documentación de Akka Dispatchers , para comprender las diversas opciones aquí. Sin embargo, el punto principal es que elegimos un ThreadPoolExecutor que tiene un límite ThreadPoolExecutor de hilos que mantiene disponible para las operaciones de bloqueo. La configuración de tamaño depende de lo que hace su aplicación y de cuántos núcleos tiene su servidor.

A continuación, tenemos que usarlo, en lugar del predeterminado:

// GOOD (due to the blocking in Future): implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher") val routes: Route = post { complete { Future { // uses the good "blocking dispatcher" that we configured, // instead of the default dispatcher – the blocking is isolated. Thread.sleep(5000) System.currentTimeMillis().toString } } }

Presionamos la aplicación con la misma carga, primero un poco de solicitudes normales y luego agregamos las de bloqueo. Así es como se comportarán las ThreadPools en este caso:

Por lo tanto, inicialmente las solicitudes normales son manejadas fácilmente por el despachador predeterminado, puede ver algunas líneas verdes allí; esa es la ejecución real (realmente no estoy poniendo al servidor bajo mucha carga, por lo que está prácticamente inactivo).

Ahora cuando comenzamos a emitir las operaciones de bloqueo, my-blocking-dispatcher-* entra en acción y comienza hasta la cantidad de subprocesos configurados. Maneja todos los Dormir allí. Además, después de un cierto período de no pasar nada en esos hilos, los apaga. Si tuviéramos que golpear el servidor con otro bloque de bloqueo, el grupo comenzaría nuevos hilos que se encargarían del sueño (), pero mientras tanto, no estamos desperdiciando nuestros preciosos hilos en "solo quédense allí y hacer nada".

Al usar esta configuración, el rendimiento de las solicitudes GET normales no se vio afectado, todavía se sirvieron felizmente en el despachador predeterminado (todavía bastante libre).

Esta es la forma recomendada de tratar con cualquier tipo de bloqueo en aplicaciones reactivas. A menudo se conoce como "bulkheading" (o "aislar") las partes que se comportan mal en una aplicación, en este caso el mal comportamiento es dormir / bloquear.

3) [workaround-ish] comportamiento del despachador cuando se blocking correctamente :

En este ejemplo, usamos el método scaladoc for scala.concurrent.blocking que puede ayudar cuando se enfrentan operaciones de bloqueo. Por lo general, hace que se generen más hilos para sobrevivir a las operaciones de bloqueo.

// OK, default dispatcher but we''ll use `blocking` implicit val dispatcher = system.dispatcher val routes: Route = post { complete { Future { // uses the default dispatcher (it''s a Fork-Join Pool) blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, // but at the cost of exploding the number of threads (which eventually // may also lead to starvation problems, but on a different layer) Thread.sleep(5000) System.currentTimeMillis().toString } } } }

La aplicación se comportará así:

Notarás que se crearon MUCHOS hilos nuevos, esto es porque el bloqueo sugiere "oh, esto será un bloqueo, por lo que necesitamos más hilos". Esto hace que el tiempo total que estamos bloqueados sea más pequeño que en el ejemplo 1), sin embargo, luego tenemos cientos de hilos que no hacen nada una vez que las operaciones de bloqueo han terminado ... Claro, eventualmente se cerrarán (el FJP hace esto ), pero por un tiempo tendremos una cantidad grande (no controlada) de subprocesos en ejecución, en contraste con la 2) solución, donde sabemos exactamente cuántos hilos estamos dedicando para los comportamientos de bloqueo.

En resumen : nunca bloquee al despachador predeterminado :-)

La mejor práctica es usar el patrón que se muestra en 2) , tener un despachador para las operaciones de bloqueo disponibles y ejecutarlas allí.

Espero que esto ayude, feliz hakking !

Se discutió la versión HTTP de Akka : 2.0.1

Profiler used: Muchas personas me han preguntado en respuesta a esta respuesta en privado qué profiler solía visualizar los estados de subprocesos en las fotos de arriba, así que YourKit esta información aquí: utilicé YourKit que es un perfilador comercial impresionante (gratis para OSS), aunque puede obtener los mismos resultados utilizando VisualVM gratuito de OpenJDK .