actors multithreading scala concurrency akka actor

multithreading - actors akka



Bloqueo de llamadas en Akka Actors (3)

Como novato, estoy tratando de entender cómo trabajan los actores. Y, a partir de la documentación, creo que entiendo que los actores son objetos que se ejecutan en modo de sincronización y que la ejecución de actores puede contener llamadas de método de bloqueo / sincronización, por ejemplo, solicitudes de db

Pero, lo que no entiendo es que si escribe un actor que tiene algunas invocaciones de bloqueo en su interior (como una ejecución de consulta de bloqueo), desordenará todo el conjunto de subprocesos (en el sentido de que la utilización de la CPU se reducirá, etc.) ), derecho ? Quiero decir, según tengo entendido, no hay forma de que JVM comprenda si puede cambiar ese hilo a otra persona, si / cuando el actor realiza una llamada de bloqueo.

Entonces, dada la naturaleza de la concurrencia, ¿no debería ser obvio que los Actores no deberían estar haciendo llamadas de bloqueo, nunca?

Si ese es el caso, ¿cuál es la forma recomendada de hacer una llamada sin bloqueo / asíncrona, digamos que una llamada de servicio web que busca algo y envía un mensaje a otro actor cuando se completa esa solicitud? Deberíamos simplemente usar algo como dentro del actor:

mapa futuro {respuesta => x! response.body}

¿Es esta la forma correcta de manejar esto?

Le agradecería que me lo aclarara.


Realmente depende del caso de uso. Si las consultas no necesitan ser serializadas, entonces puede ejecutar la consulta en un futuro y enviar los resultados al remitente de la siguiente manera:

import scala.concurrent.{ future, blocking} import akka.pattern.pipe val resFut = future { blocking { executeQuery() } } resFut pipeTo sender

También puede crear un distribuidor dedicado exclusivamente para las llamadas de base de datos y utilizar un enrutador para la creación de actores. De esta manera también puede limitar fácilmente el número de solicitudes de base de datos concurrentes.


Realmente excelente introducción "La Guía de Neophyte a Scala Parte 14: El enfoque de actor a la concurrencia" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html .

El actor recibe el mensaje, ajusta el código de bloqueo al futuro, en su método Future.onSuccess: envía los resultados utilizando otros mensajes asíncronos. Pero tenga en cuenta que la variable del remitente podría cambiar, así que ciérrela (haga una referencia local en el objeto futuro).

ps: La Guía de Neophyte a Scala - realmente un gran libro.

Actualizado: (código de muestra añadido)

Tenemos trabajador y gerente. El administrador establece el trabajo a realizar, los informes de los trabajadores "lo tienen" e inician el proceso largo (sueño 1000). Mientras tanto, el administrador de pings del sistema con mensajes "vivos" y el pings del administrador con ellos. Cuando el trabajo realizado, el trabajador notifica al administrador en él

NB: la ejecución de la suspensión 1000 se realizó en el ejecutor del grupo de subprocesos "por defecto / global" importado - puede obtener un hambre de subprocesos. NB: val comandante = el remitente es necesario para "cerrar" una referencia al remitente original, porque cuando se ejecutará onSuccess - el remitente actual dentro del actor ya podría estar configurado para algún otro "remitente" ...

Iniciar sesión:

01:35:12:632 Humming ... 01:35:12:633 manager: flush sent 01:35:12:633 worker: got command 01:35:12:633 manager alive 01:35:12:633 manager alive 01:35:12:633 manager alive 01:35:12:660 worker: started 01:35:12:662 worker: alive 01:35:12:662 manager: resource allocated 01:35:12:662 worker: alive 01:35:12:662 worker: alive 01:35:13:661 worker: done 01:35:13:663 manager: work is done 01:35:17:633 Shutdown!

Código:

import akka.actor.{Props, ActorSystem, ActorRef, Actor} import com.typesafe.config.ConfigFactory import java.text.SimpleDateFormat import java.util.Date import scala.concurrent._ import ExecutionContext.Implicits.global object Sample { private val fmt = new SimpleDateFormat("HH:mm:ss:SSS") def printWithTime(msg: String) = { println(fmt.format(new Date()) + " " + msg) } class WorkerActor extends Actor { protected def receive = { case "now" => val commander = sender printWithTime("worker: got command") future { printWithTime("worker: started") Thread.sleep(1000) printWithTime("worker: done") }(ExecutionContext.Implicits.global) onSuccess { // here commander = original sender who requested the start of the future case _ => commander ! "done" } commander ! "working" case "alive?" => printWithTime("worker: alive") } } class ManagerActor(worker: ActorRef) extends Actor { protected def receive = { case "do" => worker ! "now" printWithTime("manager: flush sent") case "working" => printWithTime("manager: resource allocated") case "done" => printWithTime("manager: work is done") case "alive?" => printWithTime("manager alive") worker ! "alive?" } } def main(args: Array[String]) { val config = ConfigFactory.parseString("" + "akka.loglevel=DEBUG/n" + "akka.debug.lifecycle=on/n" + "akka.debug.receive=on/n" + "akka.debug.event-stream=on/n" + "akka.debug.unhandled=on/n" + "" ) val system = ActorSystem("mine", config) val actor1 = system.actorOf(Props[WorkerActor], "worker") val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager") actor2 ! "do" actor2 ! "alive?" actor2 ! "alive?" actor2 ! "alive?" printWithTime("Humming ...") Thread.sleep(5000) printWithTime("Shutdown!") system.shutdown() } }


Tienes razón al pensar en Thread Pool si estás considerando hacer llamadas bloqueadas en Akka. Cuanto más bloqueos hagas, mayor será el Thread Pool que necesitarás. Un sistema completamente no bloqueante realmente solo necesita un conjunto de subprocesos igual al número de núcleos de CPU de su máquina. La configuración de referencia utiliza un conjunto de 3 veces el número de núcleos de CPU en la máquina para permitir algún bloqueo:

# The core pool size factor is used to determine thread pool core size # using the following formula: ceil(available processors * factor). # Resulting size is then bounded by the core-pool-size-min and # core-pool-size-max values. core-pool-size-factor = 3.0

source

Pero es posible que desee aumentar akka.default-dispatcher.fork-join-executor.core-pool-size-factor a un número mayor si hace más bloqueo, o cree un despachador diferente al predeterminado específicamente para bloquear llamadas con un mayor fork-join-executor.core-pool-size-factor

WRT cuál es la mejor forma de bloquear llamadas en Akka. Recomendaría escalar haciendo varias instancias de los actores que bloquean las llamadas y colocando un router delante de ellos para que se vean como un solo actor para el resto de la aplicación.