events - programacion - Relación entre controladores de comando, agregados, el repositorio y el almacén de eventos en CQRS
arquitectura software hexagonal (3)
Lo siguiente se basa en mi propia experiencia y en mis experimentos con varios frameworks como Lokad.CQRS, NCQRS, etc. Estoy seguro de que hay varias formas de manejar esto. Publicaré lo que tiene más sentido para mí.
1. Creación agregada:
Cada vez que un controlador de comando necesita un agregado, utiliza un repositorio. El repositorio recupera la respectiva lista de eventos del almacén de eventos y llama a un constructor sobrecargado, inyectando los eventos
var stream = eventStore.LoadStream(id)
var User = new User(stream)
Si el agregado no existía antes, la secuencia estará vacía y el objeto recién creado estará en su estado original. Es posible que desee asegurarse de que en este estado solo unos pocos comandos puedan dar vida al agregado, por ejemplo User.Create()
.
2. Almacenamiento de nuevos eventos
El manejo de comandos ocurre dentro de una Unidad de Trabajo . Durante la ejecución del comando, cada evento resultante se agregará a una lista dentro del agregado ( User.Changes
). Una vez que finalice la ejecución, los cambios se agregarán al almacén de eventos. En el siguiente ejemplo, esto sucede en la siguiente línea:
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
3. Orden de los eventos
Imagínese lo que sucedería si dos eventos posteriores de CustomerMoved
se reproducen en el orden incorrecto.
Un ejemplo
Trataré de ilustrarlo con un trozo de pseudocódigo (deliberadamente dejé las preocupaciones del repositorio dentro del controlador de comando para mostrar lo que sucedería detrás de las escenas):
Servicio de aplicación:
UserCommandHandler
Handle(CreateUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Create(cmd.UserName, ...)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
Handle(BlockUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Block(string reason)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
Agregar:
User
created = false
blocked = false
Changes = new List<Event>
ctor(eventStream)
foreach (event in eventStream)
this.Apply(event)
Create(userName, ...)
if (this.created) throw "User already exists"
this.Apply(new UserCreated(...))
Block(reason)
if (!this.created) throw "No such user"
if (this.blocked) throw "User is already blocked"
this.Apply(new UserBlocked(...))
Apply(userCreatedEvent)
this.created = true
this.Changes.Add(userCreatedEvent)
Apply(userBlockedEvent)
this.blocked = true
this.Changes.Add(userBlockedEvent)
Actualizar:
Como nota al margen: la respuesta de Yves me recordó un interesante artículo de Udi Dahan de hace un par de años:
Me gustaría conocer algunos detalles de las relaciones entre los controladores de comandos, los agregados, el repositorio y el almacén de eventos en los sistemas basados en CQRS.
Lo que he entendido hasta ahora:
- Los manejadores de comandos reciben comandos del bus. Son responsables de cargar el agregado apropiado del repositorio y llamar a la lógica del dominio en el agregado. Una vez terminado, eliminan el comando del bus.
- Un agregado proporciona comportamiento y un estado interno. El estado nunca es público. La única forma de cambiar el estado es mediante el uso del comportamiento. Los métodos que modelan este comportamiento crean eventos a partir de las propiedades del comando y aplican estos eventos al agregado, que a su vez llama a un controlador de eventos que establece el estado interno en consecuencia.
- El repositorio simplemente permite cargar agregados en una ID determinada y agregar nuevos agregados. Básicamente, el repositorio conecta el dominio al almacén de eventos.
- La tienda de eventos, por último, pero no menos importante, es responsable de almacenar eventos en una base de datos (o el almacenamiento que se use), y volver a cargar estos eventos como una llamada secuencia de eventos.
Hasta aquí todo bien. Ahora hay algunos problemas que aún no recibí:
- Si un manejador de comandos llama el comportamiento en un agregado aún existente, todo es bastante fácil. El controlador de comandos obtiene una referencia al repositorio, llama a su método loadById y se devuelve el agregado. Pero, ¿qué hace el controlador de comando cuando aún no hay agregado, pero se debe crear uno? Según entiendo, el agregado debería ser reconstruido usando los eventos. Esto significa que la creación del agregado se realiza en respuesta a un evento fooCreated. Pero para poder almacenar cualquier evento (incluido el creado por foo), necesito un agregado. Así que esto me parece un problema de gallina y huevo: no puedo crear el agregado sin el evento, pero el único componente que debe crear eventos es el agregado. Básicamente se trata de: ¿Cómo creo nuevos agregados? ¿Quién hace qué?
- Cuando un agregado desencadena un evento, un controlador de eventos interno responde a él (generalmente al ser llamado a través de un método de aplicación) y cambia el estado del agregado. ¿Cómo se entrega este evento al repositorio? ¿Quién origina la acción "por favor envíe los nuevos eventos al repositorio / tienda de eventos"? El agregado en sí? ¿El repositorio mirando el agregado? ¿Alguien más está suscrito a los eventos internos? ...?
- Por último, pero no por eso menos importante, tengo un problema para entender correctamente el concepto de una secuencia de eventos: en mi imaginación, es simplemente algo así como una lista ordenada de eventos. Lo que es importante es que está "ordenado". ¿Es esto correcto?
Una pequeña variación en la excelente respuesta de Dennis:
- Al tratar con casos de uso "creacionales" (es decir, que deberían derivar agregados nuevos), intente encontrar otro agregado o fábrica a donde pueda trasladar esa responsabilidad. Esto no entra en conflicto con tener un ctor que tome eventos para hidratarse (o cualquier otro mecanismo para rehidratarse). A veces la fábrica es solo un método estático (bueno para la captura de "contexto" / "intención"), a veces es un método de instancia de otro agregado (buen lugar para la herencia de "datos"), a veces es un objeto de fábrica explícito (buen lugar para " complejo "lógica de creación".
- Me gusta proporcionar un método GetChanges () explícito en mi agregado que devuelve la lista interna como una matriz. Si mi agregado debe permanecer en la memoria más allá de una ejecución, también agrego un método AcceptChanges () para indicar que la lista interna debe borrarse (normalmente se invoca después de que las cosas se hayan descargado al almacén de eventos). Aquí puede usar un modelo basado en extracción (GetChanges / Changes) o push (think .net event o IObservable). Mucho depende de la semántica transaccional, la tecnología, las necesidades, etc.
- Your eventstream es una lista enlazada. Cada revisión (evento / conjunto de cambios) apunta al anterior (también conocido como el padre). Su secuencia de eventos es una secuencia de eventos / cambios que le sucedió a un agregado específico. El orden solo debe garantizarse dentro del límite agregado.
Casi estoy de acuerdo con yves-reynhout y dennis-traub pero quiero mostrarte cómo lo hago. Quiero despojar a mis agregados de la responsabilidad de aplicar los eventos en sí mismos o de rehidratarse a sí mismos; de lo contrario, hay una gran cantidad de duplicación de código: cada constructor agregado tendrá el mismo aspecto:
UserAggregate:
ctor(eventStream)
foreach (event in eventStream)
this.Apply(event)
OrderAggregate:
ctor(eventStream)
foreach (event in eventStream)
this.Apply(event)
ProfileAggregate:
ctor(eventStream)
foreach (event in eventStream)
this.Apply(event)
Esas responsabilidades podrían dejarse al despachador de comandos. El comando es manejado directamente por el agregado.
Command dispatcher class
dispatchCommand(command) method:
newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
EventDispatcher.dispatchEvents(newEvents)
tryToDispatchCommand(command) method:
aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)
ConcurentProofFunctionCaller class
executeFunctionUntilSucceeds(pureFunction) method:
do this n times
try
call result=pureFunction()
return result
catch(ConcurentWriteException)
continue
throw TooManyRetries
AggregateRepository class
loadAggregate(aggregateClass, aggregateId) method:
aggregate = new aggregateClass
priorEvents = EventStore.loadEvents()
this.applyEventsOnAggregate(aggregate, priorEvents)
saveAggregate(aggregateId, aggregate, newEvents)
this.applyEventsOnAggregate(aggregate, newEvents)
EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)
SomeAggregate class
handleCommand1(command1) method:
return new SomeEvent or throw someException BUT don''t change state!
applySomeEvent(SomeEvent) method:
changeStateSomehow() and not throw any exception and don''t return anything!
Tenga en cuenta que esto es un pseudo código proyectado desde una aplicación PHP; el código real debería tener las cosas inyectadas y otras responsabilidades refactorizadas en otras clases. La idea es mantener los agregados lo más limpios posible y evitar la duplicación de códigos.
Algunos aspectos importantes sobre los agregados:
- los manejadores de comandos no deberían cambiar el estado; ceden eventos o lanzan excepciones
- el evento aplicado no debe arrojar ninguna excepción y no debe devolver nada; solo cambian el estado interno
here puede encontrar una implementación PHP de código abierto.