Multiple Buses

A common architecture when building applications is to separate commands from queries: commands are actions that change state, and queries fetch data. This is called CQRS (Command Query Responsibility Segregation). See Martin Fowler’s article about CQRS to learn more. You can use this architecture with the Messenger component by defining multiple buses.

A command bus is a little different from a query bus. For example, command buses usually don’t provide any results and query buses are rarely asynchronous. You can configure these buses and their rules by using middleware.

It might also be a good idea to separate actions from reactions by introducing an event bus. The event bus could have zero or more subscribers.

  • YAML

    framework:
        messenger:
            # The bus that is going to be injected when injecting MessageBusInterface
            default_bus: command.bus
            buses:
                command.bus:
                    middleware:
                        - validation
                        - doctrine_transaction
                query.bus:
                    middleware:
                        - validation
                event.bus:
                    # the 'allow_no_handlers' middleware lets this bus have no
                    # handler configured without throwing an exception
                    default_middleware: allow_no_handlers
                    middleware:
                        - validation

  • XML

    <?xml version="1.0" encoding="UTF-8" ?>
    <!-- config/packages/messenger.xml -->
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

        <framework:config>
            <!-- The bus that is going to be injected when injecting MessageBusInterface -->
            <framework:messenger default-bus="command.bus">
                <framework:bus name="command.bus">
                    <framework:middleware id="validation"/>
                    <framework:middleware id="doctrine_transaction"/>
                </framework:bus>
                <framework:bus name="query.bus">
                    <framework:middleware id="validation"/>
                </framework:bus>
                <!-- the 'allow_no_handlers' middleware lets this bus have no
                     handler configured without throwing an exception -->
                <framework:bus name="event.bus" default-middleware="allow_no_handlers">
                    <framework:middleware id="validation"/>
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>

  • PHP

    // config/packages/messenger.php
    $container->loadFromExtension('framework', [
        'messenger' => [
            // The bus that is going to be injected when injecting MessageBusInterface
            'default_bus' => 'command.bus',
            'buses' => [
                'command.bus' => [
                    'middleware' => [
                        'validation',
                        'doctrine_transaction',
                    ],
                ],
                'query.bus' => [
                    'middleware' => [
                        'validation',
                    ],
                ],
                'event.bus' => [
                    // the 'allow_no_handlers' middleware lets this bus have no
                    // handler configured without throwing an exception
                    'default_middleware' => 'allow_no_handlers',
                    'middleware' => [
                        'validation',
                    ],
                ],
            ],
        ],
    ]);

This will create three new services:

  • command.bus: autowireable with the Symfony\Component\Messenger\MessageBusInterface type-hint (because this is the default_bus);
  • query.bus: autowireable with MessageBusInterface $queryBus;
  • event.bus: autowireable with MessageBusInterface $eventBus.
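
As a rough sketch of how these three services might be consumed, the class below asks for each bus through its constructor and relies on the argument names listed above. The class name BusGateway and its method names are hypothetical and not part of the Messenger component. The sketch assumes autowiring is enabled and symfony/messenger is installed.

```php
<?php

namespace App\Service;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical service: the class and method names are illustrative only.
final class BusGateway
{
    private $commandBus;
    private $queryBus;
    private $eventBus;

    public function __construct(
        MessageBusInterface $commandBus, // resolved by type alone, because command.bus is the default_bus
        MessageBusInterface $queryBus,   // the argument name $queryBus selects query.bus
        MessageBusInterface $eventBus    // the argument name $eventBus selects event.bus
    ) {
        $this->commandBus = $commandBus;
        $this->queryBus = $queryBus;
        $this->eventBus = $eventBus;
    }

    // Commands change state, so the returned envelope is ignored here.
    public function execute(object $command): void
    {
        $this->commandBus->dispatch($command);
    }

    // The envelope is returned so the caller can inspect its stamps.
    public function ask(object $query): Envelope
    {
        return $this->queryBus->dispatch($query);
    }

    // Events may have zero or more subscribers on event.bus.
    public function announce(object $event): void
    {
        $this->eventBus->dispatch($event);
    }
}
```

Keeping the three buses behind separate methods makes it harder to send a message to the wrong bus by accident, which the next section addresses at the handler level.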

Restrict Handlers per Bus

By default, every handler is available to handle messages on all of your buses, so a message dispatched to the wrong bus is handled without any error. To prevent this, you can restrict each handler to a specific bus using the messenger.message_handler tag:

  • YAML

    # config/services.yaml
    services:
        App\MessageHandler\SomeCommandHandler:
            tags: [{ name: messenger.message_handler, bus: command.bus }]
            # prevent handlers from being registered twice (or you can remove
            # the MessageHandlerInterface that autoconfigure uses to find handlers)
            autoconfigure: false

  • XML

    <?xml version="1.0" encoding="UTF-8" ?>
    <!-- config/services.xml -->
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">

        <services>
            <service id="App\MessageHandler\SomeCommandHandler">
                <tag name="messenger.message_handler" bus="command.bus"/>
            </service>
        </services>
    </container>

  • PHP

    // config/services.php
    $container->services()
        ->set(App\MessageHandler\SomeCommandHandler::class)
        ->tag('messenger.message_handler', ['bus' => 'command.bus']);

This way, the App\MessageHandler\SomeCommandHandler handler will only be known by the command.bus bus.

You can also automatically add this tag to a number of classes by using the _instanceof service configuration. Using this, you can determine the message bus based on an implemented interface:

  • YAML

    # config/services.yaml
    services:
        # ...

        _instanceof:
            # all services implementing the CommandHandlerInterface
            # will be registered on the command.bus bus
            App\MessageHandler\CommandHandlerInterface:
                tags:
                    - { name: messenger.message_handler, bus: command.bus }

            # while those implementing QueryHandlerInterface will be
            # registered on the query.bus bus
            App\MessageHandler\QueryHandlerInterface:
                tags:
                    - { name: messenger.message_handler, bus: query.bus }

  • XML

    <?xml version="1.0" encoding="UTF-8" ?>
    <!-- config/services.xml -->
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">

        <services>
            <!-- ... -->

            <!-- all services implementing the CommandHandlerInterface
                 will be registered on the command.bus bus -->
            <instanceof id="App\MessageHandler\CommandHandlerInterface">
                <tag name="messenger.message_handler" bus="command.bus"/>
            </instanceof>

            <!-- while those implementing QueryHandlerInterface will be
                 registered on the query.bus bus -->
            <instanceof id="App\MessageHandler\QueryHandlerInterface">
                <tag name="messenger.message_handler" bus="query.bus"/>
            </instanceof>
        </services>
    </container>

  • PHP

    // config/services.php
    namespace Symfony\Component\DependencyInjection\Loader\Configurator;

    use App\MessageHandler\CommandHandlerInterface;
    use App\MessageHandler\QueryHandlerInterface;

    return function(ContainerConfigurator $configurator) {
        $services = $configurator->services();

        // ...

        // all services implementing the CommandHandlerInterface
        // will be registered on the command.bus bus
        $services->instanceof(CommandHandlerInterface::class)
            ->tag('messenger.message_handler', ['bus' => 'command.bus']);

        // while those implementing QueryHandlerInterface will be
        // registered on the query.bus bus
        $services->instanceof(QueryHandlerInterface::class)
            ->tag('messenger.message_handler', ['bus' => 'query.bus']);
    };
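
The configuration above refers to two interfaces that the application has to provide itself. The following is a minimal sketch, assuming both are empty marker interfaces and that handlers are invokable classes whose message type is read from the __invoke() type-hint. The message and handler classes (RenameUser, FindUserLabel and their handlers) are hypothetical and exist only for illustration.

```php
<?php

// Everything shares one namespace only to keep this sketch in a single file;
// message classes would normally live elsewhere (for example in App\Message).
namespace App\MessageHandler;

// Marker interfaces, assumed to be empty: they exist only so that
// _instanceof can attach the matching messenger.message_handler tag.
interface CommandHandlerInterface
{
}

interface QueryHandlerInterface
{
}

// Hypothetical command: asks the application to change state.
final class RenameUser
{
    public $userId;
    public $newName;

    public function __construct(int $userId, string $newName)
    {
        $this->userId = $userId;
        $this->newName = $newName;
    }
}

// Would be tagged for command.bus through CommandHandlerInterface.
final class RenameUserHandler implements CommandHandlerInterface
{
    // Stand-in for real persistence so the sketch has no dependencies.
    public $renamed = [];

    // A command handler changes state and returns nothing.
    public function __invoke(RenameUser $command): void
    {
        $this->renamed[$command->userId] = $command->newName;
    }
}

// Hypothetical query: asks for data without changing anything.
final class FindUserLabel
{
    public $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }
}

// Would be tagged for query.bus through QueryHandlerInterface.
final class FindUserLabelHandler implements QueryHandlerInterface
{
    // A query handler returns a result; the value here is a placeholder.
    public function __invoke(FindUserLabel $query): string
    {
        return sprintf('user-%d', $query->userId);
    }
}
```

Because these hypothetical handlers implement only the marker interfaces and not MessageHandlerInterface, autoconfiguration should not register them a second time on every bus.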

Debugging the Buses

The debug:messenger command lists the available messages and handlers for each bus. You can also restrict the list to a specific bus by providing its name as an argument.

    $ php bin/console debug:messenger

      Messenger
      =========

      command.bus
      -----------

       The following messages can be dispatched:

       ---------------------------------------------------------------------------------------
        App\Message\DummyCommand
            handled by App\MessageHandler\DummyCommandHandler
        App\Message\MultipleBusesMessage
            handled by App\MessageHandler\MultipleBusesMessageHandler
       ---------------------------------------------------------------------------------------

      query.bus
      ---------

       The following messages can be dispatched:

       ---------------------------------------------------------------------------------------
        App\Message\DummyQuery
            handled by App\MessageHandler\DummyQueryHandler
        App\Message\MultipleBusesMessage
            handled by App\MessageHandler\MultipleBusesMessageHandler
       ---------------------------------------------------------------------------------------

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.