How to Create Your own Messenger Transport

How to Create Your own Messenger Transport

Once you have written your transport’s sender and receiver, you can register your transport factory to be able to use it via a DSN in the Symfony application.

Create your Transport Factory

You need to give FrameworkBundle the opportunity to create your transport from a DSN. You will need a transport factory:

  1. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  2. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  3. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  4. use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
  5. use Symfony\Component\Messenger\Transport\TransportInterface;
  6. class YourTransportFactory implements TransportFactoryInterface
  7. {
  8. public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
  9. {
  10. return new YourTransport(/* ... */);
  11. }
  12. public function supports(string $dsn, array $options): bool
  13. {
  14. return 0 === strpos($dsn, 'my-transport://');
  15. }
  16. }

The transport object needs to implement the Symfony\Component\Messenger\Transport\TransportInterface (which combines the Symfony\Component\Messenger\Transport\Sender\SenderInterface and Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface). Here is a simplified example of a database transport:

  1. use Ramsey\Uuid\Uuid;
  2. use Symfony\Component\Messenger\Envelope;
  3. use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
  4. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  5. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  6. use Symfony\Component\Messenger\Transport\TransportInterface;
  7. class YourTransport implements TransportInterface
  8. {
  9. private $db;
  10. private $serializer;
  11. /**
  12. * @param FakeDatabase $db is used for demo purposes. It is not a real class.
  13. */
  14. public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
  15. {
  16. $this->db = $db;
  17. $this->serializer = $serializer ?? new PhpSerializer();
  18. }
  19. public function get(): iterable
  20. {
  21. // Get a message from "my_queue"
  22. $row = $this->db->createQuery(
  23. 'SELECT *
  24. FROM my_queue
  25. WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
  26. AND handled = FALSE'
  27. )
  28. ->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
  29. ->getOneOrNullResult();
  30. if (null === $row) {
  31. return [];
  32. }
  33. $envelope = $this->serializer->decode([
  34. 'body' => $row['envelope'],
  35. ]);
  36. return [$envelope->with(new TransportMessageIdStamp($row['id']))];
  37. }
  38. public function ack(Envelope $envelope): void
  39. {
  40. $stamp = $envelope->last(TransportMessageIdStamp::class);
  41. if (!$stamp instanceof TransportMessageIdStamp) {
  42. throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
  43. }
  44. // Mark the message as "handled"
  45. $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
  46. ->setParameter('id', $stamp->getId())
  47. ->execute();
  48. }
  49. public function reject(Envelope $envelope): void
  50. {
  51. $stamp = $envelope->last(TransportMessageIdStamp::class);
  52. if (!$stamp instanceof TransportMessageIdStamp) {
  53. throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
  54. }
  55. // Delete the message from the "my_queue" table
  56. $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
  57. ->setParameter('id', $stamp->getId())
  58. ->execute();
  59. }
  60. public function send(Envelope $envelope): Envelope
  61. {
  62. $encodedMessage = $this->serializer->encode($envelope);
  63. $uuid = Uuid::uuid4()->toString();
  64. // Add a message to the "my_queue" table
  65. $this->db->createQuery(
  66. 'INSERT INTO my_queue (id, envelope, delivered_at, handled)
  67. VALUES (:id, :envelope, NULL, FALSE)'
  68. )
  69. ->setParameters([
  70. 'id' => $uuid,
  71. 'envelope' => $encodedMessage['body'],
  72. ])
  73. ->execute();
  74. return $envelope->with(new TransportMessageIdStamp($uuid));
  75. }
  76. }

The implementation above is not runnable code but illustrates how a Symfony\Component\Messenger\Transport\TransportInterface could be implemented. For real implementations see Symfony\Component\Messenger\Transport\InMemoryTransport and Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver.

Register your Factory

  • YAML

    1. # config/services.yaml
    2. services:
    3. Your\Transport\YourTransportFactory:
    4. tags: [messenger.transport_factory]
  • XML

    1. <!-- config/services.xml -->
    2. <?xml version="1.0" encoding="UTF-8" ?>
    3. <container xmlns="http://symfony.com/schema/dic/services"
    4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    5. xsi:schemaLocation="http://symfony.com/schema/dic/services
    6. https://symfony.com/schema/dic/services/services-1.0.xsd">
    7. <services>
    8. <service id="Your\Transport\YourTransportFactory">
    9. <tag name="messenger.transport_factory"/>
    10. </service>
    11. </services>
    12. </container>
  • PHP

    1. // config/services.php
    2. use Your\Transport\YourTransportFactory;
    3. $container->register(YourTransportFactory::class)
    4. ->setTags(['messenger.transport_factory']);

Use your Transport

Within the framework.messenger.transports.* configuration, create your named transport using your own DSN:

  • YAML

    1. # config/packages/messenger.yaml
    2. framework:
    3. messenger:
    4. transports:
    5. yours: 'my-transport://...'
  • XML

    1. <!-- config/packages/messenger.xml -->
    2. <?xml version="1.0" encoding="UTF-8" ?>
    3. <container xmlns="http://symfony.com/schema/dic/services"
    4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    5. xmlns:framework="http://symfony.com/schema/dic/symfony"
    6. xsi:schemaLocation="http://symfony.com/schema/dic/services
    7. https://symfony.com/schema/dic/services/services-1.0.xsd
    8. http://symfony.com/schema/dic/symfony
    9. https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    10. <framework:config>
    11. <framework:messenger>
    12. <framework:transport name="yours" dsn="my-transport://..."/>
    13. </framework:messenger>
    14. </framework:config>
    15. </container>
  • PHP

    1. // config/packages/messenger.php
    2. $container->loadFromExtension('framework', [
    3. 'messenger' => [
    4. 'transports' => [
    5. 'yours' => 'my-transport://...',
    6. ],
    7. ],
    8. ]);

In addition of being able to route your messages to the yours sender, this will give you access to the following services:

  1. messenger.sender.yours: the sender;
  2. messenger.receiver.yours: the receiver.

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.