c# masstransit saga

c# - Cómo implementar una saga usando un patrón scatter/Gather en MassTransit 3.0



(3)

¿No podría "simplemente" pasar el objeto a lo largo de la cola, como un parámetro de evento? Cuando el oyente de la saga obtiene un evento de "orden completada", ¿contendría el objeto que se completa en el evento?

Me imagino que se envió a la cola a través de un método genérico, donde el objeto debe implementar IFoodOrdered

Luego, puedes implementar un método virtual que la saga pueda usar para hacer lo "genérico" cuando se recoja, y solo tienes que implementar sobrecargas para esos artículos especiales, que requieren que algo especial suceda.

Jimmy Boagard describe una cadena de comida rápida de McDonalds here comparándola con un patrón de recolección dispersa.

Imagen de flujo de trabajo robada del artículo anterior:

Pensamientos iniciales de implementación:

Tener una interfaz común para todos los tipos de eventos ordenados por alimentos que obtendrían todas las estaciones de alimentos y luego cada estación de alimentos podría consumir / crear su artículo respectivo y publicar un evento hecho común. Ej: las papas fritas y la estación de hamburguesas reciben un mensaje con respecto a una orden de papas fritas, la estación de papas fritas consume la orden anuncia un ItemDoneEvent que la saga está escuchando.

Preocupaciones iniciales:

Como a Saga no le importa el tipo de comida que se complete con el solo hecho de que toda la comida se haya completado, esta sería una buena solución. Sin embargo, después de leer las advertencias here sobre el uso compartido de colas y observar que Consumer.Conditional filtering se ha eliminado con MassTransit 3.0 , parece que el marco dice que "Bad Things (TM) sucederá" con este tipo de enfoque. Pero no estoy seguro de qué otra cosa lo haría sin crear un mensaje de solicitud y respuesta y un evento correlativo para cada artículo de comida en la cocina. Ej: papas fritas, hamburguesas, papas fritas, galletitas cocidas. ¿Esto sería muy tedioso si tuviera que hacer eso para cada artículo en la cocina?

Teniendo en cuenta las preocupaciones anteriores, ¿cómo sería un buen ejemplo de saga para este tipo de flujo de trabajo?


El problema con la devolución de los eventos terminados a la saga es que crea contención en un recurso compartido (es decir, el estado saga).

Jim tiene otra publicación que vino después de la que mencionaste que describe el problema y la solución. Por supuesto, él está hablando específicamente de NServiceBus, pero el problema y los conceptos son los mismos.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

Crea un almacenamiento externo. Ponga un registro para cada elemento de trabajo. Permita que cada uno de los trabajadores configure su propio trabajo para que se complete mientras la saga sondea de manera efectiva usando mensajes retrasados ​​para ver si todo el trabajo está hecho.

Entonces todavía está haciendo scatter-gather, pero el "agregador" ha sido reemplazado por el patrón del administrador de procesos para reducir la contención.


Entré en un problema similar: necesito publicar algunos docenas de comandos (la misma interfaz, IMyRequest ) y esperar todo.

En realidad, mi comando inicia otra saga, que publica IMyRequestDone al final del proceso sin completar la saga de marcado. (Necesito completarlos en algún momento más tarde). Entonces, en lugar de guardar el número de sagas anidadas completas en la saga padre, solo consulto las instancias del estado de la saga infantil.

Compruebe en cada mensaje de MyRequestDone :

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x => { // timeout for all requests x.Delay = TimeSpan.FromMinutes(10); x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); }); During(Active, When(Xxx) .ThenAsync(async context => { await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay; context.Instance.WaitingMyResponsesCount = 2; }) .TransitionTo(WaitingMyResponses) .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance)) ); During(WaitingMyResponses, When(MyRequestDone) .Then(context => { if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow) throw new TimeoutException(); }) .If(context => { var db = serviceProvider.GetRequiredService<DbContext>(); var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed return allDone; }, x => x .Unschedule(FailSagaOnRequestsTimeout) .TransitionTo(Active)) ) .Catch<TimeoutException>(x => x.TransitionTo(Failed)) ); During(WaitingMyResponses, When(FailSagaOnRequestsTimeout.Received) .TransitionTo(Failed)

Compruebe periódicamente que todas las solicitudes se hayan realizado (por "Reducción de la carga de NServiceBus Saga"):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x => { // check interval x.Delay = TimeSpan.FromSeconds(15); x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); }); During(Active, When(Xxx) .ThenAsync(async context => { await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10); context.Instance.WaitingMyResponsesCount = 2; }) .TransitionTo(WaitingMyResponses) .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)) ); During(WaitingMyResponses, When(CheckAllRequestsDone.Recieved) .Then(context => { var db = serviceProvider.GetRequiredService<DbContext>(); var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); if (!allDone) { if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay) throw new TimeoutException(); throw new NotAllDoneException(); } }) .TransitionTo(Active) .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))) .Catch<TimeoutException>(x => x.TransitionTo(Failed));