scala routing actor akka fault-tolerance

scala - Akka Actor no termina si se lanza una excepción



routing fault-tolerance (3)

Actualmente estoy tratando de comenzar con Akka y estoy enfrentando un problema extraño. Tengo el siguiente código para mi Actor:

class AkkaWorkerFT extends Actor { def receive = { case Work(n, c) if n < 0 => throw new Exception("Negative number") case Work(n, c) => self reply n.isProbablePrime(c); } }

Y así es como comienzo a mis trabajadores:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start()); val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

Y así es como lo cierro todo:

futures.foreach( _.await ) router ! Broadcast(PoisonPill) router ! PoisonPill

Ahora, lo que sucede es que si envío mensajes a los trabajadores con n> 0 (no se lanza ninguna excepción), todo funciona bien y la aplicación se apaga correctamente. Sin embargo, tan pronto como le envío un mensaje único que da como resultado una excepción, la aplicación no finaliza porque todavía hay un actor ejecutándose, pero no puedo averiguar de dónde viene.

En caso de que ayude, esta es la pila del hilo en cuestión:

Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) Unsafe.park(boolean, long) line: not available [native method] LockSupport.park(Object) line: 158 AbstractQueuedSynchronizer$ConditionObject.await() line: 1987 LinkedBlockingQueue<E>.take() line: 399 ThreadPoolExecutor.getTask() line: 947 ThreadPoolExecutor$Worker.run() line: 907 MonitorableThread(Thread).run() line: 680 MonitorableThread.run() line: 182

PD: El hilo que no termina no es ninguno de los subprocesos de trabajo, porque he agregado una devolución de llamada post-parada, cada uno de ellos se detiene correctamente.

PPS: Actors.registry.shutdownAll soluciones al problema, pero creo que shutdownAll solo debe usarse como último recurso, ¿no es así?


Apagar el registro para asegurarse de que las cosas terminen, como lo propuso Viktor, es un poco extraño. Lo que puedes hacer en cambio es:

EventHandler.shutdown()

que apaga limpiamente todos los oyentes (registradores) que mantienen el mundo en funcionamiento después de la excepción:

def shutdown() { foreachListener(_.stop()) EventHandlerDispatcher.shutdown() }


Gire el registrador en el akka.conf


La forma correcta de manejar los problemas dentro de los actores Akka no es arrojar una excepción, sino establecer jerarquías de supervisor

"Lanzar una excepción en código concurrente (supongamos que estamos usando actores no vinculados), simplemente hará explotar el hilo que actualmente ejecuta el actor.

No hay forma de descubrir que las cosas salieron mal (aparte de inspeccionar el seguimiento de la pila). No hay nada que puedas hacer al respecto ".

vea Tolerancia a fallas a través de jerarquías de supervisor (1.2)

* note * lo anterior es cierto para las versiones anteriores de Akka (1.2). En las versiones más nuevas (por ejemplo, 2.2) aún establecería una jerarquía de supervisor, pero atrapará las excepciones lanzadas por procesos secundarios. p.ej

class Child extends Actor { var state = 0 def receive = { case ex: Exception ⇒ throw ex case x: Int ⇒ state = x case "get" ⇒ sender ! state } }

y en el supervisor:

class Supervisor extends Actor { import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate } def receive = { case p: Props ⇒ sender ! context.actorOf(p) } }

vea Tolerancia a fallas a través de jerarquías de supervisor (2.2)