c# - Obligar a EventProcessorHost a volver a entregar el evento Azure Event Hub eventData al método IEventProcessor.ProcessEvents
.net azure-eventhub (2)
La aplicación utiliza .NET 4.6.1 y el paquete Microsoft.Azure.ServiceBus.EventProcessorHost nuget v2.0.2 , junto con su paquete de dependencia WindowsAzure.ServiceBus v3.0.1 para procesar los mensajes de Azure Event Hub.
La aplicación tiene una implementación de IEventProcessor
. Cuando se lanza una excepción no controlada desde el método ProcessEventsAsync
, EventProcessorHost
nunca reenvía esos mensajes a la instancia en ejecución de IEventProcessor
. (Como anécdota, volverá a enviarse si la aplicación de alojamiento se detiene y se reinicia, o si el alquiler se pierde y se vuelve a obtener).
¿Hay alguna manera de forzar el mensaje de evento que resultó en una excepción que EventProcessorHost
a la implementación de IEventProcessor
?
Una posible solución se presenta en este comentario sobre una pregunta casi idéntica: Reenviar mensajes EventHub no procesados en IEventProcessor.ProcessEventsAsync
El comentario sugiere mantener una copia del último mensaje de evento procesado con éxito y un punto de control explícito utilizando ese mensaje cuando se produce una excepción en ProcessEventsAsync
. Sin embargo, después de implementar y probar una solución de este tipo, EventProcessorHost
aún no se vuelve a enviar. La implementación es bastante simple:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
Un análisis de cosas en acción:
Una muestra de registro parcial está disponible aquí: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
Respuesta simple: ¿Has probado EventProcessorHost.ResetConnection (string partiotionId) ?
Respuesta compleja: podría ser un problema de arquitectura que debe abordarse al final, ¿por qué falló el procesamiento? ¿fue un error transitorio? está reintentando la lógica de procesamiento es un posible escenario? Y así...
TLDR : la única forma confiable de reproducir un lote fallido de eventos en IEventProcessor.ProcessEventsAsync
es: - Shutdown
el EventProcessorHost
(también conocido como EPH
) de inmediato , ya sea mediante el uso de eph.UnregisterEventProcessorAsync()
o finalizando el proceso , según la situación. . Esto permitirá que otras instancias de EPH
adquieran el arrendamiento para esta partición y comiencen desde el punto de control anterior.
Antes de explicar esto, quiero señalar que esta es una gran pregunta y, de hecho, fue una de las elecciones de diseño más difíciles que tuvimos que hacer para EPH
. En mi opinión, se trataba de un compromiso b / w: usability
/ supportability
del marco de EPH
, sobre la Technical-Correctness
.
La situación ideal habría sido: cuando el código de usuario en IEventProcessorImpl.ProcessEventsAsync
arroja una excepción, la biblioteca EPH
no debería detectar esto. Debería haber dejado pasar esta Exception
: bloquear el proceso y el crash-dump
la callstack
muestra claramente la callstack
responsable. Todavía creo, esta es la solución más technically-correct
.
Situación actual : el contrato de IEventProcessorImpl.ProcessEventsAsync
API & EPH
es,
- siempre que se pueda recibir EventData del servicio EventHubs - continúe invocando la devolución de llamada del usuario (
IEventProcessorImplementation.ProcessEventsAsync
) conEventData''s
y si la devolución de llamada del usuario arroja errores al invocar, notifiqueEventProcessorOptions.ExceptionReceived
. - El código de usuario dentro de
IEventProcessorImpl.ProcessEventsAsync
debe manejar todos los errores e incorporarRetry''s
según sea necesario.EPH
no establece ningún tiempo de espera en esta devolución de llamada para otorgarles a los usuarios control total sobre el tiempo de procesamiento. - Si un evento específico es la causa del problema, marque
EventData
con una propiedad especial, por ejemplo: type =poison-event
y vuelva a enviarlo al mismoEventHub
(incluya un puntero al evento real, copie estosEventData.Offset
ySequenceNumber
en las NuevasEventData.ApplicationProperties
) oEventData.ApplicationProperties
a una cola SERVICEBUS o almacénelo en otro lugar, básicamente, identifique y difiera el procesamiento del evento poison . - si manejó todos los casos posibles y aún se ejecuta en
Exceptions
- catch''em & shutdownEPH
ofailfast
el proceso con esta excepción. Cuando laEPH
vuelve a subir, comenzará desde donde-izquierda.
¿Por qué no funciona el control de puntería ''el evento anterior''? (Léelo para comprender la EPH
en general):
Detrás de escena, EPH
ejecuta una bomba por receptor de la partición EventHub Consumergroup, cuyo trabajo es iniciar el receptor desde un checkpoint
determinado (si está presente) y crear una instancia dedicada de la implementación de IEventProcessor
y luego receive
desde la partición EventHub designada desde el Offset
especificado en el punto de control (si no está presente - EventProcessorOptions.initialOffsetProvider
) y finalmente invocar IEventProcessorImpl.ProcessEventsAsync
. El objetivo del Checkpoint
de Checkpoint
es poder comenzar a procesar mensajes de manera confiable, cuando el proceso de EPH
se apaga y la propiedad de la partición se mueve a otras instancias de EPH
. Por lo tanto, el checkpoint
se consumirá solo al encender la BOMBA y NO se leerá una vez que la bomba haya comenzado.
Mientras escribo esto, EPH
está en la versión 2.2.10 ...