CQRS

The flow of simple CRUD (Create, Read, Update and Delete) applications can be described using the following steps:

  1. The controllers layer handles HTTP requests and delegates tasks to the services layer.
  2. The services layer is where most of the business logic lives.
  3. Services use repositories / DAOs to change / persist entities.
  4. Entities act as containers for the values, with setters and getters.

In most cases, for small and medium-sized applications, this pattern is sufficient. However, when our requirements become more complex, the CQRS model may be more appropriate and scalable. To facilitate that model, Nest provides a lightweight CQRS module. This chapter describes how to use it.

Installation

First install the required package:

    $ npm install --save @nestjs/cqrs

Commands

In this model, each action is called a Command. When a command is dispatched, the application reacts to it. Commands can be dispatched from the services layer, or directly from controllers/gateways. Commands are consumed by Command Handlers.

    @@filename(heroes-game.service)
    import { Injectable } from '@nestjs/common';
    import { CommandBus } from '@nestjs/cqrs';

    @Injectable()
    export class HeroesGameService {
      constructor(private commandBus: CommandBus) {}

      async killDragon(heroId: string, killDragonDto: KillDragonDto) {
        // Dispatch the command; the matching handler performs the work.
        return this.commandBus.execute(
          new KillDragonCommand(heroId, killDragonDto.dragonId)
        );
      }
    }
    @@switch
    import { Injectable, Dependencies } from '@nestjs/common';
    import { CommandBus } from '@nestjs/cqrs';

    @Injectable()
    @Dependencies(CommandBus)
    export class HeroesGameService {
      constructor(commandBus) {
        this.commandBus = commandBus;
      }

      async killDragon(heroId, killDragonDto) {
        // Dispatch the command; the matching handler performs the work.
        return this.commandBus.execute(
          new KillDragonCommand(heroId, killDragonDto.dragonId)
        );
      }
    }

Here’s a sample service that dispatches KillDragonCommand. Let’s see how the command looks:

    @@filename(kill-dragon.command)
    export class KillDragonCommand {
      constructor(
        public readonly heroId: string,
        public readonly dragonId: string,
      ) {}
    }
    @@switch
    export class KillDragonCommand {
      constructor(heroId, dragonId) {
        this.heroId = heroId;
        this.dragonId = dragonId;
      }
    }

The CommandBus is a stream of commands. It delegates each command to its matching handler. Each command must have a corresponding Command Handler:

    @@filename(kill-dragon.handler)
    import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';

    @CommandHandler(KillDragonCommand)
    export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
      constructor(private repository: HeroRepository) {}

      async execute(command: KillDragonCommand) {
        const { heroId, dragonId } = command;
        // Wait for the hero to load before calling methods on it.
        const hero = await this.repository.findOneById(+heroId);
        hero.killEnemy(dragonId);
        await this.repository.persist(hero);
      }
    }
    @@switch
    import { Dependencies } from '@nestjs/common';
    import { CommandHandler } from '@nestjs/cqrs';

    @CommandHandler(KillDragonCommand)
    @Dependencies(HeroRepository)
    export class KillDragonHandler {
      constructor(repository) {
        this.repository = repository;
      }

      async execute(command) {
        const { heroId, dragonId } = command;
        // Wait for the hero to load before calling methods on it.
        const hero = await this.repository.findOneById(+heroId);
        hero.killEnemy(dragonId);
        await this.repository.persist(hero);
      }
    }

With this approach, every application state change is driven by the occurrence of a Command, and the logic is encapsulated in handlers. This makes it easy to add behavior such as logging or persisting commands in the database (e.g., for diagnostic purposes).
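To illustrate how logging can be layered on top of command dispatching, here is a minimal, dependency-free sketch. It does not use the @nestjs/cqrs CommandBus: SimpleCommandBus, LoggingCommandBus, and the Handler interface are hypothetical names invented for this example, and KillDragonCommand is redefined so the snippet stands alone.

```typescript
// Conceptual sketch only: a tiny in-memory bus, NOT the @nestjs/cqrs CommandBus.
interface Handler<C extends object> {
  execute(command: C): Promise<unknown>;
}

class SimpleCommandBus {
  // Maps a command class (its constructor) to the single handler for it.
  private readonly handlers = new Map<Function, Handler<any>>();

  register<C extends object>(
    type: new (...args: any[]) => C,
    handler: Handler<C>,
  ): void {
    this.handlers.set(type, handler);
  }

  async execute(command: object): Promise<unknown> {
    const handler = this.handlers.get(command.constructor);
    if (!handler) {
      throw new Error(`No handler registered for ${command.constructor.name}`);
    }
    return handler.execute(command);
  }
}

// Wraps the bus and records each command before delegating to it.
class LoggingCommandBus {
  readonly log: string[] = [];
  private readonly inner: SimpleCommandBus;

  constructor(inner: SimpleCommandBus) {
    this.inner = inner;
  }

  async execute(command: object): Promise<unknown> {
    this.log.push(`${command.constructor.name} ${JSON.stringify(command)}`);
    return this.inner.execute(command);
  }
}

// Same shape as the KillDragonCommand above, redefined to keep the sketch standalone.
class KillDragonCommand {
  readonly heroId: string;
  readonly dragonId: string;

  constructor(heroId: string, dragonId: string) {
    this.heroId = heroId;
    this.dragonId = dragonId;
  }
}

const bus = new SimpleCommandBus();
bus.register(KillDragonCommand, {
  execute: async (command) =>
    `hero ${command.heroId} killed dragon ${command.dragonId}`,
});
const loggingBus = new LoggingCommandBus(bus);
```

Because every state change passes through execute(), the wrapper sees each command exactly once. The same idea could persist commands to a database instead of keeping them in an in-memory array.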

Events

Command handlers neatly encapsulate logic. That is beneficial, but the application structure is still neither flexible nor reactive enough. To remedy this, we also introduce events.

    @@filename(hero-killed-dragon.event)
    export class HeroKilledDragonEvent {
      constructor(
        public readonly heroId: string,
        public readonly dragonId: string,
      ) {}
    }
    @@switch
    export class HeroKilledDragonEvent {
      constructor(heroId, dragonId) {
        this.heroId = heroId;
        this.dragonId = dragonId;
      }
    }

Events are asynchronous. They are dispatched either by models or directly using EventBus. In order to dispatch events, models have to extend the AggregateRoot class.

    @@filename(hero.model)
    import { AggregateRoot } from '@nestjs/cqrs';

    export class Hero extends AggregateRoot {
      constructor(private id: string) {
        super();
      }

      killEnemy(enemyId: string) {
        // logic
        this.apply(new HeroKilledDragonEvent(this.id, enemyId));
      }
    }
    @@switch
    import { AggregateRoot } from '@nestjs/cqrs';

    export class Hero extends AggregateRoot {
      constructor(id) {
        super();
        this.id = id;
      }

      killEnemy(enemyId) {
        // logic
        this.apply(new HeroKilledDragonEvent(this.id, enemyId));
      }
    }

The apply() method does not dispatch events yet because there’s no relationship between the model and the EventPublisher class. How do we associate the model and the publisher? By calling the publisher’s mergeObjectContext() method inside our command handler.

    @@filename(kill-dragon.handler)
    import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';

    @CommandHandler(KillDragonCommand)
    export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
      constructor(
        private repository: HeroRepository,
        private publisher: EventPublisher,
      ) {}

      async execute(command: KillDragonCommand) {
        const { heroId, dragonId } = command;
        // Connect the loaded hero to the publisher so its events can be dispatched.
        const hero = this.publisher.mergeObjectContext(
          await this.repository.findOneById(+heroId),
        );
        hero.killEnemy(dragonId);
        // Dispatch the events collected by apply().
        hero.commit();
      }
    }
    @@switch
    import { Dependencies } from '@nestjs/common';
    import { CommandHandler, EventPublisher } from '@nestjs/cqrs';

    @CommandHandler(KillDragonCommand)
    @Dependencies(HeroRepository, EventPublisher)
    export class KillDragonHandler {
      constructor(repository, publisher) {
        this.repository = repository;
        this.publisher = publisher;
      }

      async execute(command) {
        const { heroId, dragonId } = command;
        // Connect the loaded hero to the publisher so its events can be dispatched.
        const hero = this.publisher.mergeObjectContext(
          await this.repository.findOneById(+heroId),
        );
        hero.killEnemy(dragonId);
        // Dispatch the events collected by apply().
        hero.commit();
      }
    }

Now everything works as expected. Note that we need to call commit(), since events are not dispatched immediately. The object also doesn’t have to exist up front; we can merge the class context as well:

    const HeroModel = this.publisher.mergeClassContext(Hero);
    new HeroModel('id');

Now the model has the ability to publish events. Additionally, we can emit events manually using EventBus:

    // heroId and dragonId are assumed to be in scope here.
    this.eventBus.publish(new HeroKilledDragonEvent(heroId, dragonId));

Hint: The EventBus is an injectable class.
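To make the difference between apply() and commit() more concrete, the following dependency-free sketch mimics the buffering behavior described earlier. It is not the AggregateRoot or EventPublisher implementation from @nestjs/cqrs: BufferingAggregate, attachPublisher(), and the Publish type are hypothetical names, and Hero and HeroKilledDragonEvent are redefined so the snippet stands alone.

```typescript
// Conceptual sketch only; the real AggregateRoot and EventPublisher live in @nestjs/cqrs.
type Publish = (event: object) => void;

class BufferingAggregate {
  private pending: object[] = [];
  // Until a publisher is attached, committed events have nowhere to go.
  private publish: Publish = () => {};

  // Hypothetical stand-in for what merging the publisher context achieves.
  attachPublisher(publish: Publish): this {
    this.publish = publish;
    return this;
  }

  protected apply(event: object): void {
    // Buffered only; nothing is dispatched yet.
    this.pending.push(event);
  }

  commit(): void {
    // Hand every buffered event to the publisher, then clear the buffer.
    const events = this.pending;
    this.pending = [];
    events.forEach((event) => this.publish(event));
  }
}

class HeroKilledDragonEvent {
  readonly heroId: string;
  readonly dragonId: string;

  constructor(heroId: string, dragonId: string) {
    this.heroId = heroId;
    this.dragonId = dragonId;
  }
}

class Hero extends BufferingAggregate {
  readonly id: string;

  constructor(id: string) {
    super();
    this.id = id;
  }

  killEnemy(enemyId: string): void {
    this.apply(new HeroKilledDragonEvent(this.id, enemyId));
  }
}
```

In this sketch, calling killEnemy() alone publishes nothing; the event reaches the publisher only once commit() is called. This matches the reason the handler above ends with hero.commit().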

Each event can have multiple Event Handlers.

    @@filename(hero-killed-dragon.handler)
    import { EventsHandler, IEventHandler } from '@nestjs/cqrs';

    @EventsHandler(HeroKilledDragonEvent)
    export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
      constructor(private repository: HeroRepository) {}

      handle(event: HeroKilledDragonEvent) {
        // logic
      }
    }

Now we can move the write logic into the event handlers.

Sagas

This type of Event-Driven Architecture improves application reactiveness and scalability. Now that we have events, we can react to them in various ways. Sagas are the final building block from an architectural point of view.

Sagas are an extremely powerful feature. A single saga may listen for one or more events. Using the RxJS library, it can combine, merge, filter, or apply other RxJS operators to the event stream. Each saga returns an Observable that emits commands, and each emitted command is dispatched asynchronously.

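    @@filename(heroes-game.saga)
    import { Injectable } from '@nestjs/common';
    import { ICommand, ofType, Saga } from '@nestjs/cqrs';
    import { Observable } from 'rxjs';
    import { map } from 'rxjs/operators';

    @Injectable()
    export class HeroesGameSagas {
      @Saga()
      dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
        return events$.pipe(
          ofType(HeroKilledDragonEvent),
          // fakeItemID is a placeholder value defined elsewhere.
          map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
        );
      }
    }
    @@switch
    import { Injectable } from '@nestjs/common';
    import { ofType, Saga } from '@nestjs/cqrs';
    import { map } from 'rxjs/operators';

    @Injectable()
    export class HeroesGameSagas {
      @Saga()
      dragonKilled = (events$) => {
        return events$.pipe(
          ofType(HeroKilledDragonEvent),
          // fakeItemID is a placeholder value defined elsewhere.
          map((event) => new DropAncientItemCommand(event.heroId, fakeItemID)),
        );
      }
    }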

Hint: The ofType operator is exported from the @nestjs/cqrs package.

We declared a rule: when any hero kills the dragon, the ancient item should be dropped. With this in place, DropAncientItemCommand will be dispatched and processed by the appropriate handler.
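The rule itself is just "filter by event type, then map to a command". As a rough illustration, here is the same shape applied to a plain array instead of an RxJS stream. This is an analogue, not a saga: dragonKilledRule, HeroFoundItemEvent, and ANCIENT_ITEM_ID are hypothetical names, and the event and command classes are redefined so the snippet has no dependencies.

```typescript
// Conceptual sketch only: an array stands in for the RxJS event stream a real saga receives.
class HeroKilledDragonEvent {
  readonly heroId: string;
  readonly dragonId: string;

  constructor(heroId: string, dragonId: string) {
    this.heroId = heroId;
    this.dragonId = dragonId;
  }
}

// Hypothetical second event type, present only to show that it is filtered out.
class HeroFoundItemEvent {
  readonly heroId: string;
  readonly itemId: string;

  constructor(heroId: string, itemId: string) {
    this.heroId = heroId;
    this.itemId = itemId;
  }
}

class DropAncientItemCommand {
  readonly heroId: string;
  readonly itemId: string;

  constructor(heroId: string, itemId: string) {
    this.heroId = heroId;
    this.itemId = itemId;
  }
}

// Hypothetical placeholder playing the role of fakeItemID above.
const ANCIENT_ITEM_ID = 'ancient-item';

// Keep only HeroKilledDragonEvent instances, then map each one to a command.
function dragonKilledRule(events: object[]): DropAncientItemCommand[] {
  return events
    .filter((e): e is HeroKilledDragonEvent => e instanceof HeroKilledDragonEvent)
    .map((e) => new DropAncientItemCommand(e.heroId, ANCIENT_ITEM_ID));
}
```

In the saga, ofType() plays the role of the filter step and map() is the RxJS operator of the same name; the difference is that the saga processes events as they arrive rather than as a finished list.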

Queries

The CqrsModule can also be used for handling queries. The QueryBus follows the same pattern as the CommandBus. Query handlers should implement the IQueryHandler interface and be marked with the @QueryHandler() decorator.
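As a rough sketch of what the query side might look like, the example below defines a query and a handler with the same execute() shape used by command handlers. GetHeroQuery, GetHeroHandler, and HeroView are hypothetical names, and an in-memory Map stands in for a repository. To keep the snippet dependency-free, the @QueryHandler() decorator and the IQueryHandler interface appear only in comments.

```typescript
// Hypothetical query: carries only the data needed to look something up.
class GetHeroQuery {
  readonly heroId: string;

  constructor(heroId: string) {
    this.heroId = heroId;
  }
}

// Hypothetical read model returned to the caller.
interface HeroView {
  id: string;
  name: string;
}

// In a Nest application this class would be decorated with
// @QueryHandler(GetHeroQuery) and declared as implementing
// IQueryHandler<GetHeroQuery>, both from @nestjs/cqrs. They are
// omitted here so the sketch runs without any packages.
class GetHeroHandler {
  private readonly heroes: Map<string, HeroView>;

  // The Map is an assumption standing in for a real repository.
  constructor(heroes: Map<string, HeroView>) {
    this.heroes = heroes;
  }

  // Reads state and returns it; a query handler should not change anything.
  async execute(query: GetHeroQuery): Promise<HeroView | undefined> {
    return this.heroes.get(query.heroId);
  }
}
```

With the handler registered as a provider, a service would typically inject QueryBus and call this.queryBus.execute(new GetHeroQuery(heroId)), mirroring how commands are dispatched through the CommandBus.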

Setup

Finally, let’s look at how to set up the whole CQRS mechanism.

    @@filename(heroes-game.module)
    import { Module } from '@nestjs/common';
    import { CqrsModule } from '@nestjs/cqrs';

    export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
    export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];

    @Module({
      imports: [CqrsModule],
      controllers: [HeroesGameController],
      providers: [
        HeroesGameService,
        HeroesGameSagas,
        ...CommandHandlers,
        ...EventHandlers,
        HeroRepository,
      ]
    })
    export class HeroesGameModule {}

Summary

CommandBus, QueryBus and EventBus are Observables. This means that you can easily subscribe to the whole stream and enrich your application with Event Sourcing.

Example

A working example is available here.