java multithreading exception-handling akka message-queue

java - Esperando indefinidamente un mensaje que puede que nunca llegue



multithreading exception-handling (1)

Tengo un actor de Java que es responsable de filtrar / reintentar la lógica en un recurso externo que puede no estar disponible temporalmente Los campos y métodos comunes del actor son:

public class MyActorImpl implements MyActor { private static final long MINWAIT = 50; private static final long MAXWAIT = 1000; private static final long DEFAULTWAIT = 0; private static final double BACKOFFMULTIPLIER = 1.5; private long updateWait(long currentWait) { return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT); } // mutable private long opWait = DEFAULTWAIT; private final Queue<OpInput> opBuffer = new ArrayDeque<>(); // called from external actor public void operation(OpInput opInput) { operation(opInput, DEFAULTWAIT); } // called internally public void operation(OpInput opInput, long currentWait); }

El actor tiene varias operaciones que tienen más o menos la misma lógica de reintento / búfer; Cada operación tiene sus propios campos [op]Wait y [op]Buffer .

  1. El actor principal llama a la void operation(OpInput opInput)
  2. El método anterior llama a la void operation(OpInput opInput, long currentWait) usando DEFAULTWAIT para el segundo parámetro
  3. Si el parámetro currentWait no es igual a opWait entonces la entrada se almacena en opBuffer , de lo contrario, la entrada se envía al recurso externo.
  4. Si el recurso externo devuelve un éxito, opWait se establece en DEFAULTWAIT , y el contenido de opBuffer se envía de vuelta a través del método de operation(opInput) . Si el recurso externo (o más probablemente la red) devuelve un error, entonces actualizo opWait = updateWait(opWait) y programo la operation(opInput, opWait) en el programador del sistema actor con un retraso de opWait ms.

Es decir, estoy usando el programador del sistema actor para implementar el retroceso exponencial; Estoy usando el parámetro currentWait para identificar el mensaje que estoy reintentando y estoy guardando en el búfer los otros mensajes hasta que el recurso externo procese el mensaje primario.

El problema es que si el mensaje de operation(opInput, currentWait) programada operation(opInput, currentWait) se pierde, entonces los mensajes se almacenarán en búfer para siempre porque el currentWait == opWait guard fallará para todos los demás mensajes. Podría usar algo como spring-retry para implementar un retroceso exponencial, pero no veo una manera de fusionar los ciclos de reintentos de las operaciones, lo que significa que podría estar usando un hilo por ciclo de reintentos (mientras que usar el programador del sistema actor no pone mucho más de una tensión en el sistema).

Estoy buscando una forma más tolerante a fallas para implementar el almacenamiento en búfer y el retroceso exponencial en la interfaz entre un actor y un recurso externo sin tener que asignar demasiados recursos a la tarea.


Si te estoy entendiendo correctamente, si el único problema es perder el mensaje programado, ¿por qué no usas algo como el Patrón de Proxy Confiable para ese mensaje en particular y luego, si falla, opWait = DEFAULTWAIT;

Hay algunas cosas acerca de su código que recibo, no entiendo lo que quiere decir cuando dice que public void operation(OpInput opInput) se llama externamente. ¿Quiere decir que este método está interactuando con la red, que utiliza recursos que a veces no están disponibles?

Si puedo, puedo sugerir una alternativa. Por lo que entiendo, su principal problema es que tiene recursos que no están disponibles a veces, por lo que tiene algún tipo de que / buffer que implementa con algún tipo de lógica de espera para que el mensaje se procese una vez que esté disponible nuevamente, lo que desafortunadamente involucra algunos mensajes que podrían perderse y resultar en una espera infinita. Creo que podrías lograr lo que quieres usando Futures con tiempos de espera. Luego vuelva a intentarlo si el futuro no se completa en esa cantidad de tiempo hasta 3 repeticiones. Incluso podría ajustar este tiempo en función de la carga del servidor y el tiempo que lleva completar un mensaje. Espero que ayude.