c# .net azure azure-eventhub

c# - Obligar a EventProcessorHost a volver a entregar el evento Azure Event Hub eventData al método IEventProcessor.ProcessEvents



.net azure-eventhub (2)

La aplicación utiliza .NET 4.6.1 y el paquete Microsoft.Azure.ServiceBus.EventProcessorHost nuget v2.0.2 , junto con su paquete de dependencia WindowsAzure.ServiceBus v3.0.1 para procesar los mensajes de Azure Event Hub.

La aplicación tiene una implementación de IEventProcessor . Cuando se lanza una excepción no controlada desde el método ProcessEventsAsync , EventProcessorHost nunca reenvía esos mensajes a la instancia en ejecución de IEventProcessor . (Como anécdota, volverá a enviarse si la aplicación de alojamiento se detiene y se reinicia, o si el alquiler se pierde y se vuelve a obtener).

¿Hay alguna manera de forzar el mensaje de evento que resultó en una excepción que EventProcessorHost a la implementación de IEventProcessor ?

Una posible solución se presenta en este comentario sobre una pregunta casi idéntica: Reenviar mensajes EventHub no procesados ​​en IEventProcessor.ProcessEventsAsync

El comentario sugiere mantener una copia del último mensaje de evento procesado con éxito y un punto de control explícito utilizando ese mensaje cuando se produce una excepción en ProcessEventsAsync . Sin embargo, después de implementar y probar una solución de este tipo, EventProcessorHost aún no se vuelve a enviar. La implementación es bastante simple:

private EventData _lastSuccessfulEvent; public async Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages) { try { await ProcessEvents(context, messages); // does actual processing, may throw exception _lastSuccessfulEvent = messages .OrderByDescending(ed => ed.SequenceNumber) .First(); } catch(Exception ex) { await context.CheckpointAsync(_lastSuccessfulEvent); } }

Un análisis de cosas en acción:

Una muestra de registro parcial está disponible aquí: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt


Respuesta simple: ¿Has probado EventProcessorHost.ResetConnection (string partiotionId) ?

Respuesta compleja: podría ser un problema de arquitectura que debe abordarse al final, ¿por qué falló el procesamiento? ¿fue un error transitorio? está reintentando la lógica de procesamiento es un posible escenario? Y así...


TLDR : la única forma confiable de reproducir un lote fallido de eventos en IEventProcessor.ProcessEventsAsync es: - Shutdown el EventProcessorHost (también conocido como EPH ) de inmediato , ya sea mediante el uso de eph.UnregisterEventProcessorAsync() o finalizando el proceso , según la situación. . Esto permitirá que otras instancias de EPH adquieran el arrendamiento para esta partición y comiencen desde el punto de control anterior.

Antes de explicar esto, quiero señalar que esta es una gran pregunta y, de hecho, fue una de las elecciones de diseño más difíciles que tuvimos que hacer para EPH . En mi opinión, se trataba de un compromiso b / w: usability / supportability del marco de EPH , sobre la Technical-Correctness .

La situación ideal habría sido: cuando el código de usuario en IEventProcessorImpl.ProcessEventsAsync arroja una excepción, la biblioteca EPH no debería detectar esto. Debería haber dejado pasar esta Exception : bloquear el proceso y el crash-dump la callstack muestra claramente la callstack responsable. Todavía creo, esta es la solución más technically-correct .

Situación actual : el contrato de IEventProcessorImpl.ProcessEventsAsync API & EPH es,

  1. siempre que se pueda recibir EventData del servicio EventHubs - continúe invocando la devolución de llamada del usuario ( IEventProcessorImplementation.ProcessEventsAsync ) con EventData''s y si la devolución de llamada del usuario arroja errores al invocar, notifique EventProcessorOptions.ExceptionReceived .
  2. El código de usuario dentro de IEventProcessorImpl.ProcessEventsAsync debe manejar todos los errores e incorporar Retry''s según sea necesario. EPH no establece ningún tiempo de espera en esta devolución de llamada para otorgarles a los usuarios control total sobre el tiempo de procesamiento.
  3. Si un evento específico es la causa del problema, marque EventData con una propiedad especial, por ejemplo: type = poison-event y vuelva a enviarlo al mismo EventHub (incluya un puntero al evento real, copie estos EventData.Offset y SequenceNumber en las Nuevas EventData.ApplicationProperties ) o EventData.ApplicationProperties a una cola SERVICEBUS o almacénelo en otro lugar, básicamente, identifique y difiera el procesamiento del evento poison .
  4. si manejó todos los casos posibles y aún se ejecuta en Exceptions - catch''em & shutdown EPH o failfast el proceso con esta excepción. Cuando la EPH vuelve a subir, comenzará desde donde-izquierda.

¿Por qué no funciona el control de puntería ''el evento anterior''? (Léelo para comprender la EPH en general):

Detrás de escena, EPH ejecuta una bomba por receptor de la partición EventHub Consumergroup, cuyo trabajo es iniciar el receptor desde un checkpoint determinado (si está presente) y crear una instancia dedicada de la implementación de IEventProcessor y luego receive desde la partición EventHub designada desde el Offset especificado en el punto de control (si no está presente - EventProcessorOptions.initialOffsetProvider ) y finalmente invocar IEventProcessorImpl.ProcessEventsAsync . El objetivo del Checkpoint de Checkpoint es poder comenzar a procesar mensajes de manera confiable, cuando el proceso de EPH se apaga y la propiedad de la partición se mueve a otras instancias de EPH . Por lo tanto, el checkpoint se consumirá solo al encender la BOMBA y NO se leerá una vez que la bomba haya comenzado.

Mientras escribo esto, EPH está en la versión 2.2.10 ...