Messenger: Sync & Queued Message Handling
Messenger provides a message bus that can send messages and then handle them immediately in your application or send them through transports (e.g. queues) to be handled later. For a deeper look, read the Messenger component docs.
Installation
In applications using Symfony Flex, run this command to install messenger:
$ composer require symfony/messenger
Creating a Message & Handler
Messenger centers around two different classes that you’ll create: (1) a message class that holds data and (2) one or more handler classes that will be called when that message is dispatched. The handler class will read the message class and perform some task.
There are no specific requirements for a message class, except that it can be serialized:
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
A message handler is a PHP callable. The recommended way to create one is to write a class that implements Symfony\Component\Messenger\Handler\MessageHandlerInterface and has an __invoke() method that’s type-hinted with the message class (or a message interface):
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ... do some work - like sending an SMS message!
    }
}
Thanks to autoconfiguration and the SmsNotification type-hint, Symfony knows that this handler should be called when an SmsNotification message is dispatched. Most of the time, this is all you need to do. But you can also manually configure message handlers. To see all the configured handlers, run:
$ php bin/console debug:messenger
Dispatching the Message
You’re ready! To dispatch the message (and call the handler), inject the messenger.default_bus service (via the MessageBusInterface), like in a controller:
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // will cause the SmsNotificationHandler to be called
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // or use the shortcut
        $this->dispatchMessage(new SmsNotification('Look! I created a message!'));

        // ...
    }
}
Transports: Async/Queued Messages
By default, messages are handled as soon as they are dispatched. If you want to handle a message asynchronously, you can configure a transport. A transport is capable of sending messages (e.g. to a queueing system) and then receiving them via a worker. Messenger supports multiple transports.
Note
If you want to use a transport that’s not supported, check out the Enqueue’s transport, which supports things like Kafka, Amazon SQS and Google Pub/Sub.
A transport is registered using a “DSN”. Thanks to Messenger’s Flex recipe, your .env file already has a few examples.
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
Uncomment whichever transport you want (or set it in .env.local). See Transport Configuration for more details.
Next, in config/packages/messenger.yaml, let’s define a transport called async that uses this configuration:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"
            # or expanded to configure more options
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>
<!-- or expanded to configure more options -->
<framework:transport name="async"
dsn="%env(MESSENGER_TRANSPORT_DSN)%"
>
<option key="...">...</option>
</framework:transport>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
    'messenger' => [
        'transports' => [
            'async' => '%env(MESSENGER_TRANSPORT_DSN)%',
            // or expanded to configure more options
            // 'async' => [
            //     'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
            //     'options' => [],
            // ],
        ],
    ],
]);
Routing Messages to a Transport
Now that you have a transport configured, you can route messages to it instead of handling them immediately:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"
        routing:
            # async is whatever name you gave your transport above
            'App\Message\SmsNotification': async
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:routing message-class="App\Message\SmsNotification">
<!-- async is whatever name you gave your transport above -->
<framework:sender service="async"/>
</framework:routing>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'routing' => [
// async is whatever name you gave your transport above
'App\Message\SmsNotification' => 'async',
],
],
]);
Thanks to this, the App\Message\SmsNotification will be sent to the async transport and its handler(s) will not be called immediately. Any messages not matched under routing will still be handled immediately.
You can also route classes by their parent class or interface. Or send messages to multiple transports:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # route all messages that extend this example base class or interface
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async
            'My\Message\ToBeSentToTwoSenders': [async, audit]
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<!-- route all messages that extend this example base class or interface -->
<framework:routing message-class="App\Message\AbstractAsyncMessage">
<framework:sender service="async"/>
</framework:routing>
<framework:routing message-class="App\Message\AsyncMessageInterface">
<framework:sender service="async"/>
</framework:routing>
<framework:routing message-class="My\Message\ToBeSentToTwoSenders">
<framework:sender service="async"/>
<framework:sender service="audit"/>
</framework:routing>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'routing' => [
// route all messages that extend this example base class or interface
'App\Message\AbstractAsyncMessage' => 'async',
'App\Message\AsyncMessageInterface' => 'async',
'My\Message\ToBeSentToTwoSenders' => ['async', 'audit'],
],
],
]);
Note
If you configure routing for both a child and parent class, both rules are used. E.g. if you have an SmsNotification object that extends from Notification, both the routing for Notification and SmsNotification will be used.
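How parent-class and interface rules combine can be sketched in plain PHP. This is only an illustration under stated assumptions, not Messenger's actual routing code: the resolveTransports() function and the DemoNotification classes are hypothetical names, and the $routing array simply imitates the routing config shown above.

```php
<?php

// Hypothetical message classes, used only for this illustration.
abstract class DemoNotification
{
}

final class DemoSmsNotification extends DemoNotification
{
}

/**
 * Collects transport names for a message by checking its own class,
 * its parent classes and its interfaces against a routing map.
 * Hypothetical helper; it only imitates the behavior described above.
 */
function resolveTransports(object $message, array $routing): array
{
    $types = array_merge(
        [get_class($message)],
        array_values(class_parents($message)),
        array_values(class_implements($message))
    );

    $names = [];
    foreach ($types as $type) {
        // a rule may name one transport or a list of transports
        foreach ((array) ($routing[$type] ?? []) as $name) {
            $names[$name] = true; // array keys de-duplicate transport names
        }
    }

    return array_keys($names);
}

$routing = [
    'DemoNotification' => 'audit',
    'DemoSmsNotification' => 'async',
];

// both the child rule and the parent rule apply
$transports = resolveTransports(new DemoSmsNotification(), $routing);
```

Here $transports contains both async and audit, while a message whose class matches no rule gets an empty list, which corresponds to being handled immediately.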
Doctrine Entities in Messages
If you need to pass a Doctrine entity in a message, it’s better to pass the entity’s primary key (or whatever relevant information the handler actually needs, like email, etc.) instead of the object:
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}
Then, in your handler, you can query for a fresh object:
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class NewUserWelcomeEmailHandler implements MessageHandlerInterface
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... send an email!
    }
}
This guarantees the entity contains fresh data.
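To see why passing only the identifier helps, here is a minimal, framework-free sketch. It assumes a hypothetical DemoWelcomeEmail message and uses a plain array as a stand-in for the database; no Doctrine or Messenger code is involved.

```php
<?php

// Hypothetical message that carries only the user's id.
final class DemoWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

// Stand-in for a database table: user id => email address.
$users = [42 => 'old@example.com'];

// Dispatch time: the message is serialized and waits in a queue.
$payload = serialize(new DemoWelcomeEmail(42));

// The user changes their email while the message is still queued.
$users[42] = 'new@example.com';

// Handling time: the worker unserializes the message and looks the user up again.
$message = unserialize($payload);
$email = $users[$message->getUserId()];
```

Because the lookup happens at handling time, $email holds the updated address. Had the whole user record been serialized into the message, the handler would have worked with the stale copy.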
Handling Messages Synchronously
If a message doesn’t match any routing rules, it won’t be sent to any transport and will be handled immediately. In some cases (like when binding handlers to different transports), it’s easier or more flexible to handle this explicitly by creating a sync transport and “sending” messages there to be handled immediately:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... other transports
            sync: 'sync://'
        routing:
            App\Message\SmsNotification: sync
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<!-- ... other transports -->
<framework:transport name="sync" dsn="sync://"/>
<framework:routing message-class="App\Message\SmsNotification">
<framework:sender service="sync"/>
</framework:routing>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
// ... other transports
'sync' => 'sync://',
],
'routing' => [
'App\Message\SmsNotification' => 'sync',
],
],
]);
Creating your Own Transport
You can also create your own transport if you need to send or receive messages from something that is not supported. See How to Create Your own Messenger Transport.
Consuming Messages (Running the Worker)
Once your messages have been routed, in most cases, you’ll need to “consume” them. You can do this with the messenger:consume command:
$ php bin/console messenger:consume async
# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv
New in version 4.3: The messenger:consume command was renamed in Symfony 4.3 (previously it was called messenger:consume-messages).
The first argument is the receiver’s name (or service id if you routed to a custom service). By default, the command will run forever: looking for new messages on your transport and handling them. This command is called your “worker”.
Deploying to Production
In production, there are a few important things to think about:
Use Supervisor to keep your worker(s) running
You’ll want one or more “workers” running at all times. To do that, use a process control system like Supervisor.
Don’t Let Workers Run Forever
Some services (like Doctrine’s EntityManager) will consume more memory over time. So, instead of allowing your worker to run forever, use a flag like messenger:consume --limit=10 to tell your worker to only handle 10 messages before exiting (then Supervisor will create a new process). There are also other options like --memory-limit=128M and --time-limit=3600.
Restart Workers on Deploy
Each time you deploy, you’ll need to restart all your worker processes so that they see the newly deployed code. To do this, run messenger:stop-workers on deploy. This will signal to each worker that it should finish the message it’s currently handling and shut down gracefully. Then, Supervisor will create new worker processes. The command uses the app cache internally, so make sure this is configured to use an adapter you like.
Use the Same Cache Between Deploys
If your deploy strategy involves the creation of new target directories, you should set a value for the cache.prefix.seed configuration option in order to use the same cache namespace between deployments. Otherwise, the cache.app pool will use the value of the kernel.project_dir parameter as base for the namespace, which will lead to different namespaces each time a new deployment is made.
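The idea behind limits such as --limit and --time-limit (see "Don’t Let Workers Run Forever" above) can be sketched as a loop that stops itself. This is a simplified illustration, not the real worker: runWorker() and its parameters are hypothetical, and the queue is a plain array.

```php
<?php

/**
 * Handles queued messages until a message limit or time limit is reached.
 * Hypothetical helper; returns the number of messages handled.
 */
function runWorker(array $queue, callable $handler, int $limit, int $timeLimitSeconds): int
{
    $startedAt = time();
    $handled = 0;

    foreach ($queue as $message) {
        $expired = (time() - $startedAt) >= $timeLimitSeconds;
        if ($handled >= $limit || $expired) {
            break; // exit so a process manager can start a fresh worker
        }

        $handler($message);
        ++$handled;
    }

    return $handled;
}

$log = [];
$handledCount = runWorker(range(1, 25), function ($message) use (&$log) {
    $log[] = $message;
}, 10, 3600);
```

With 25 queued messages and a limit of 10, the loop handles the first 10 and returns. In a real deployment a process manager such as Supervisor would then start a new process for the rest.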
Prioritized Transports
Sometimes certain types of messages should have a higher priority and be handled before others. To make this possible, you can create multiple transports and route different messages to them. For example:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name is specific to the doctrine transport
                    queue_name: high
                    # for AMQP, send to a separate exchange, then queue
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # for Redis, try the "group" option
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low
        routing:
            'App\Message\SmsNotification': async_priority_low
            'App\Message\NewUserWelcomeEmail': async_priority_high
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
<framework:options>
<framework:queue>
<framework:name>Queue</framework:name>
</framework:queue>
</framework:options>
</framework:transport>
<framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
<option key="queue_name">low</option>
</framework:transport>
<framework:routing message-class="App\Message\SmsNotification">
<framework:sender service="async_priority_low"/>
</framework:routing>
<framework:routing message-class="App\Message\NewUserWelcomeEmail">
<framework:sender service="async_priority_high"/>
</framework:routing>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async_priority_high' => [
'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
'options' => [
'queue_name' => 'high',
],
],
'async_priority_low' => [
'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
'options' => [
'queue_name' => 'low',
],
],
],
'routing' => [
'App\Message\SmsNotification' => 'async_priority_low',
'App\Message\NewUserWelcomeEmail' => 'async_priority_high',
],
],
]);
You can then run individual workers for each transport or instruct one worker to handle messages in a priority order:
$ php bin/console messenger:consume async_priority_high async_priority_low
The worker will always first look for messages waiting on async_priority_high. If there are none, then it will consume messages from async_priority_low.
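The priority order described above can be illustrated with a small plain-PHP sketch. It is not the worker's real implementation: nextMessage() is a hypothetical helper and each transport is modelled as a simple array of waiting messages.

```php
<?php

/**
 * Returns the next message, always preferring earlier transports in $order.
 * Hypothetical helper: $queues maps a transport name to a list of waiting messages.
 */
function nextMessage(array &$queues, array $order): ?string
{
    foreach ($order as $name) {
        if (!empty($queues[$name])) {
            return array_shift($queues[$name]);
        }
    }

    return null; // nothing is waiting on any transport
}

$queues = [
    'async_priority_high' => ['welcome-email'],
    'async_priority_low' => ['sms-1', 'sms-2'],
];
$order = ['async_priority_high', 'async_priority_low'];

$first = nextMessage($queues, $order);   // taken from the high-priority queue
$second = nextMessage($queues, $order);  // high is empty, so low is used

// a new high-priority message arrives while low still has work waiting
$queues['async_priority_high'][] = 'welcome-email-2';
$third = nextMessage($queues, $order);
$fourth = nextMessage($queues, $order);
$fifth = nextMessage($queues, $order);
```

Note that the high-priority queue is checked again before every message, so a newly arrived high-priority message jumps ahead of anything still waiting on the low-priority transport.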
Supervisor Configuration
Supervisor is a great tool to guarantee that your worker processes are always running (even if one closes due to failure, hitting a message limit or thanks to messenger:stop-workers). You can install it on Ubuntu, for example, via:
$ sudo apt-get install supervisor
Supervisor configuration files typically live in a /etc/supervisor/conf.d directory. For example, you can create a new messenger-worker.conf file there to make sure that 2 instances of messenger:consume are running at all times:
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
Change the async argument to use the name of your transport (or transports) and user to the Unix user on your server. Next, tell Supervisor to read your config and start your workers:
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
See the Supervisor docs for more details.
Graceful Shutdown
If you install the PCNTL PHP extension in your project, workers will handle the SIGTERM POSIX signal to finish processing their current message before exiting.
In some cases the SIGTERM signal is sent by Supervisor itself (e.g. stopping a Docker container having Supervisor as its entrypoint). In these cases you need to add a stopwaitsecs key to the program configuration (with a value of the desired grace period in seconds) in order to perform a graceful shutdown:
[program:x]
stopwaitsecs=20
Retries & Failures
If an exception is thrown while consuming a message from a transport, the message will automatically be re-sent to the transport to be tried again. By default, a message will be retried 3 times before being discarded or sent to the failure transport. Each retry will also be delayed, in case the failure was due to a temporary issue. All of this is configurable for each transport:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # default configuration
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0
                    # override all of this with a service that
                    # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority">
<framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0"/>
</framework:transport>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async_priority_high' => [
'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
// default configuration
'retry_strategy' => [
'max_retries' => 3,
// milliseconds delay
'delay' => 1000,
// causes the delay to be higher before each retry
// e.g. 1 second delay, 2 seconds, 4 seconds
'multiplier' => 2,
'max_delay' => 0,
// override all of this with a service that
// implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
// 'service' => null,
],
],
],
],
]);
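The "1 second, 2 seconds, 4 seconds" progression in the comments above follows from multiplying the base delay by the multiplier once per previous retry. A minimal sketch of that arithmetic, assuming a hypothetical retryDelayMs() helper and assuming that a max_delay of 0 means "no cap" (this is not Symfony's own retry strategy class):

```php
<?php

/**
 * Computes the wait before a retry, in milliseconds.
 * Hypothetical helper mirroring the retry_strategy options above.
 *
 * $retryCount is the number of retries already attempted (0 for the first retry).
 * Assumption: $maxDelay = 0 disables the cap.
 */
function retryDelayMs(int $retryCount, int $delay = 1000, float $multiplier = 2.0, int $maxDelay = 0): int
{
    $ms = (int) ($delay * $multiplier ** $retryCount);

    if ($maxDelay > 0 && $ms > $maxDelay) {
        return $maxDelay;
    }

    return $ms;
}

// delays for the three default retries
$delays = [retryDelayMs(0), retryDelayMs(1), retryDelayMs(2)];

// the same third retry, but capped at 3 seconds
$capped = retryDelayMs(2, 1000, 2.0, 3000);
```

With the default values this yields 1000, 2000 and 4000 milliseconds, and the capped call returns 3000.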
Avoiding Retrying
Sometimes handling a message might fail in a way that you know is permanent and should not be retried. If you throw Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException, the message will not be retried.
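The difference between a temporary and a permanent failure can be sketched with a simplified retry loop. Everything here is hypothetical: DemoPermanentFailure stands in for the unrecoverable exception, processWithRetries() is not part of Messenger, delays between attempts are omitted, and the sketch assumes that max_retries counts redeliveries after the first attempt.

```php
<?php

// Hypothetical stand-in for an "unrecoverable" exception.
class DemoPermanentFailure extends RuntimeException
{
}

/**
 * Calls $handler until it succeeds, fails permanently, or runs out of retries.
 * Returns [outcome, number of attempts]. Hypothetical helper for illustration.
 */
function processWithRetries(callable $handler, int $maxRetries = 3): array
{
    $attempts = 0;

    while (true) {
        ++$attempts;

        try {
            $handler();

            return ['handled', $attempts];
        } catch (DemoPermanentFailure $e) {
            // permanent failure: retrying would not help
            return ['rejected', $attempts];
        } catch (Throwable $e) {
            // temporary failure: retry unless the retries are used up
            if ($attempts > $maxRetries) {
                return ['failed', $attempts];
            }
        }
    }
}

$calls = 0;
$flaky = processWithRetries(function () use (&$calls) {
    if (++$calls < 3) {
        throw new RuntimeException('temporary problem');
    }
});

$alwaysFailing = processWithRetries(function () {
    throw new RuntimeException('still broken');
});

$permanent = processWithRetries(function () {
    throw new DemoPermanentFailure('invalid phone number');
});
```

The flaky handler succeeds on its third attempt, the always-failing one gives up after the first attempt plus three retries, and the permanent failure stops after a single attempt.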
Saving & Retrying Failed Messages
If a message fails, it is retried multiple times (max_retries) and is then discarded. To avoid this, you can instead configure a failure_transport:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        # after retrying, messages will be sent to the "failed" transport
        failure_transport: failed
        transports:
            # ... other transports
            failed: 'doctrine://default?queue_name=failed'
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<!-- after retrying, messages will be sent to the "failed" transport -->
<framework:messenger failure-transport="failed">
<!-- ... other transports -->
<framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
// after retrying, messages will be sent to the "failed" transport
'failure_transport' => 'failed',
'transports' => [
// ... other transports
'failed' => [
'dsn' => 'doctrine://default?queue_name=failed',
],
],
],
]);
In this example, if handling a message fails 3 times (default max_retries), it will then be sent to the failed transport. While you can use messenger:consume failed to consume this like a normal transport, you’ll usually want to manually view the messages in the failure transport and choose to retry them:
# see all messages in the failure transport
$ php bin/console messenger:failed:show
# see details about a specific failure
$ php bin/console messenger:failed:show 20 -vv
# view and retry messages one-by-one
$ php bin/console messenger:failed:retry -vv
# retry specific messages
$ php bin/console messenger:failed:retry 20 30 --force
# remove a message without retrying it
$ php bin/console messenger:failed:remove 20
If the message fails again, it will be sent back to the failure transport according to the normal retry rules. Once the maximum number of retries has been reached, the message will be discarded permanently.
Transport Configuration
Messenger supports a number of different transport types, each with their own options.
AMQP Transport
The amqp
transport configuration looks like this:
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
To use Symfony’s built-in AMQP transport, you need the AMQP PHP extension.
Note
By default, the transport will automatically create any exchanges, queues and binding keys that are needed. That can be disabled, but some functionality may not work correctly (like delayed queues).
The transport has a number of other options, including ways to configure the exchange, queues, binding keys and more. See the documentation on Symfony\Component\Messenger\Transport\AmqpExt\Connection.
You can also configure AMQP-specific settings on your message by adding Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp to your Envelope:
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
// ...

$attributes = [];
// SmsNotification requires its content as a constructor argument
$bus->dispatch(new SmsNotification('Look! I created a message!'), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
Caution
The consumers do not show up in an admin panel as this transport does not rely on \AmqpQueue::consume(), which is blocking. Having a blocking receiver makes the --time-limit/--memory-limit options of the messenger:consume command as well as the messenger:stop-workers command ineffective, as they all rely on the fact that the receiver returns immediately no matter if it finds a message or not. The consume worker is responsible for iterating until it receives a message to handle and/or until one of the stop conditions is reached. Thus, the worker’s stop logic cannot be reached if it is stuck in a blocking call.
Doctrine Transport
New in version 4.3: The Doctrine transport was introduced in Symfony 4.3.
The Doctrine transport can be used to store messages in a database table.
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
The format is doctrine://<connection_name>, in case you have multiple connections and want to use one other than the “default”. The transport will automatically create a table named messenger_messages (this is configurable) when the transport is first used. You can disable that with the auto_setup option and set the table up manually by calling the messenger:setup-transports command.
Tip
To prevent tools like Doctrine Migrations from trying to remove this table because it’s not part of your normal schema, you can set the schema_filter option:
YAML
# config/packages/doctrine.yaml
doctrine:
    dbal:
        schema_filter: '~^(?!messenger_messages)~'
XML
<!-- config/packages/doctrine.xml -->
<doctrine:dbal schema-filter="~^(?!messenger_messages)~"/>
PHP
// config/packages/doctrine.php
$container->loadFromExtension('doctrine', [
    'dbal' => [
        'schema_filter' => '~^(?!messenger_messages)~',
        // ...
    ],
    // ...
]);
The transport has a number of options:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high: "%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority"
            async_normal:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    queue_name: normal_priority
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority"/>
<framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
<framework:options>
<framework:queue>
<framework:name>normal_priority</framework:name>
</framework:queue>
</framework:options>
</framework:transport>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async_priority_high' => '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority',
'async_priority_low' => [
'dsn' => '%env(MESSENGER_TRANSPORT_DSN)%',
'options' => [
'queue_name' => 'normal_priority',
],
],
],
],
]);
Options defined under options take precedence over ones defined in the DSN.
Option | Description | Default |
---|---|---|
table_name | Name of the table | messenger_messages |
queue_name | Name of the queue (a column in the table, to use one table for multiple transports) | default |
redeliver_timeout | Timeout, in seconds, before retrying a message that’s in the queue but still in the “handling” state (this happens if a worker stopped for some reason; eventually the message should be retried). | 3600 |
auto_setup | Whether the table should be created automatically during send / get. | true |
Caution
The datetime property of the messages stored in the database uses the timezone of the current system. This may cause issues if multiple machines with different timezone configuration use the same storage.
Redis Transport
New in version 4.3: The Redis transport was introduced in Symfony 4.3.
The Redis transport uses streams to queue messages.
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Full DSN Example
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
New in version 4.4: The dbindex query parameter in Redis DSN was introduced in Symfony 4.4.
To use the Redis transport, you will need the Redis PHP extension (>=4.3) and a running Redis server (^5.0).
A number of options can be configured via the DSN or via the options key under the transport in messenger.yaml:
Option | Description | Default |
---|---|---|
stream | The Redis stream name | messages |
group | The Redis consumer group name | symfony |
consumer | Consumer name used in Redis | consumer |
auto_setup | Create the Redis group automatically? | true |
auth | The Redis password | |
serializer | How to serialize the final payload in Redis (the Redis::OPT_SERIALIZER option) | Redis::SERIALIZER_PHP |
stream_max_entries | The maximum number of entries which the stream will be trimmed to. Set it to a large enough number to avoid losing pending messages | 0 (which means “no trimming”) |
New in version 4.4: The stream_max_entries option was introduced in Symfony 4.4.
In Memory Transport
New in version 4.3: The in-memory transport was introduced in Symfony 4.3.
The in-memory transport does not actually deliver messages. Instead, it holds them in memory during the request, which can be useful for testing. For example, if you have an async_priority_normal transport, you could override it in the test environment to use this transport:
YAML
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'
XML
<!-- config/packages/test/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async_priority_normal" dsn="in-memory://"/>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/test/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async_priority_normal' => [
'dsn' => 'in-memory://',
],
],
],
]);
Then, while testing, messages will not be delivered to the real transport. Even better, in a test, you can check that exactly one message was sent during a request:
// tests/DefaultControllerTest.php
namespace App\Tests;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /** @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}
Note
All in-memory transports will be reset automatically after each test in test classes extending Symfony\Bundle\FrameworkBundle\Test\KernelTestCase or Symfony\Bundle\FrameworkBundle\Test\WebTestCase.
Serializing Messages
New in version 4.3: The default serializer changed in 4.3 from the Symfony serializer to the native PHP serializer. Existing applications should configure their transports to use the Symfony serializer to avoid losing already-queued messages after upgrading.
When messages are sent to (and received from) a transport, they’re serialized using PHP’s native serialize() and unserialize() functions. You can change this globally (or for each transport) to a service that implements Symfony\Component\Messenger\Transport\Serialization\SerializerInterface:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }
        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:serializer default-serializer="messenger.transport.symfony_serializer">
<framework:symfony-serializer format="json">
<framework:context/>
</framework:symfony-serializer>
</framework:serializer>
<framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'serializer' => [
'default_serializer' => 'messenger.transport.symfony_serializer',
'symfony_serializer' => [
'format' => 'json',
'context' => [],
],
],
'transports' => [
'async_priority_normal' => [
'dsn' => ...,
'serializer' => 'messenger.transport.symfony_serializer',
],
],
],
]);
The messenger.transport.symfony_serializer is a built-in service that uses the Serializer component and can be configured in a few ways. If you do choose to use the Symfony serializer, you can control the context on a case-by-case basis via the Symfony\Component\Messenger\Stamp\SerializerStamp (see Envelopes & Stamps).
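As a rough illustration of per-message context, the sketch below attaches a SerializerStamp to an envelope. The group name "sms_queue" is a made-up example, and \stdClass stands in for a real message class.

```php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;

// "sms_queue" is a hypothetical serialization group name
$stamp = new SerializerStamp(['groups' => ['sms_queue']]);

// any object can be a message; \stdClass keeps the sketch self-contained
$envelope = new Envelope(new \stdClass(), [$stamp]);

// the context travels with the envelope and can be read back from the stamp
$context = $envelope->last(SerializerStamp::class)->getContext();
```

Passing this envelope to $bus->dispatch() should make the Symfony serializer apply that context for this message only. The default PHP serializer does not use the stamp.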
Tip
When sending/receiving messages to/from another application, you may need more control over the serialization process. Using a custom serializer provides that control. See SymfonyCasts’ message serializer tutorial for details.
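A custom serializer could look roughly like the sketch below. It assumes the other application exchanges plain JSON bodies of the form {"content": "..."}. The class name JsonSmsSerializer and its location are hypothetical, and it only supports the SmsNotification message from earlier.

```php
// src/Messenger/JsonSmsSerializer.php (hypothetical class name and location)
namespace App\Messenger;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class JsonSmsSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $data = json_decode($encodedEnvelope['body'] ?? '', true);

        // reject anything that is not the expected JSON shape
        if (!is_array($data) || !isset($data['content']) || !is_string($data['content'])) {
            throw new MessageDecodingFailedException('Expected a JSON object with a "content" string.');
        }

        return new Envelope(new SmsNotification($data['content']));
    }

    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        if (!$message instanceof SmsNotification) {
            throw new \InvalidArgumentException('This serializer only supports SmsNotification messages.');
        }

        return [
            'body' => json_encode(['content' => $message->getContent()]),
            'headers' => ['Content-Type' => 'application/json'],
        ];
    }
}
```

Note that this sketch does not serialize stamps, so information such as retry counts would not survive a round trip through the transport. A production serializer would likely need to carry the relevant stamps in the headers.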
Customizing Handlers
Manually Configuring Handlers
Symfony will normally find and register your handler automatically. But you can also configure a handler manually, and pass it some extra config, by tagging the handler service with messenger.message_handler:
YAML
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]
        # or configure with options (use this form instead of the line above,
        # because a YAML mapping cannot define the "tags" key twice)
        # tags:
        #     -
        #         name: messenger.message_handler
        #         # only needed if can't be guessed by type-hint
        #         handles: App\Message\SmsNotification
XML
<!-- config/services.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd">
<services>
<service id="App\MessageHandler\SmsNotificationHandler">
<!-- handles is only needed if it can't be guessed by type-hint -->
<tag name="messenger.message_handler"
handles="App\Message\SmsNotification"/>
</service>
</services>
</container>
PHP
// config/services.php
use App\Message\SmsNotification;
use App\MessageHandler\SmsNotificationHandler;
$container->register(SmsNotificationHandler::class)
->addTag('messenger.message_handler', [
// only needed if can't be guessed by type-hint
'handles' => SmsNotification::class,
]);
Possible options to configure with tags are:
bus
from_transport
handles
method
priority
New in version 4.4: The ability to specify from_transport on the tag was added in Symfony 4.4.
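For illustration, a tag that sets all of these options at once might look like the following sketch. The values are examples only. The bus and transport names are the ones used elsewhere on this page and are assumed to be configured in your application.

```php
// config/services.php
use App\Message\SmsNotification;
use App\MessageHandler\SmsNotificationHandler;

$container->register(SmsNotificationHandler::class)
    ->addTag('messenger.message_handler', [
        // the message class this handler is registered for
        'handles' => SmsNotification::class,
        // the handler method to call (__invoke is the default)
        'method' => '__invoke',
        // only register the handler on this bus
        'bus' => 'messenger.bus.default',
        // only call the handler for messages received from this transport
        'from_transport' => 'async_priority_normal',
        // relative order when several handlers handle the same message
        'priority' => 10,
    ]);
```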
Handler Subscriber & Options
A handler class can handle multiple messages or configure itself by implementing Symfony\Component\Messenger\Handler\MessageSubscriberInterface:
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class SmsNotificationHandler implements MessageSubscriberInterface
{
public function __invoke(SmsNotification $message)
{
// ...
}
public function handleOtherSmsNotification(OtherSmsNotification $message)
{
// ...
}
public static function getHandledMessages(): iterable
{
// handle this message on __invoke
yield SmsNotification::class;
// also handle this message on handleOtherSmsNotification
yield OtherSmsNotification::class => [
'method' => 'handleOtherSmsNotification',
//'priority' => 0,
//'bus' => 'messenger.bus.default',
];
}
}
Binding Handlers to Different Transports
Each message can have multiple handlers, and when a message is consumed all of its handlers are called. But you can also configure a handler to only be called when it’s received from a specific transport. This allows you to have a single message where each handler is called by a different “worker” that’s consuming a different transport.
Suppose you have an UploadedImage message with two handlers:
- ThumbnailUploadedImageHandler: you want this to be handled by a transport called image_transport
- NotifyAboutNewUploadedImageHandler: you want this to be handled by a transport called async_priority_normal
To do this, add the from_transport option to each handler. For example:
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
public function __invoke(UploadedImage $uploadedImage)
{
// do some thumbnailing
}
public static function getHandledMessages(): iterable
{
yield UploadedImage::class => [
'from_transport' => 'image_transport',
];
}
}
And similarly:
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
// ...
public static function getHandledMessages(): iterable
{
yield UploadedImage::class => [
'from_transport' => 'async_priority_normal',
];
}
}
Then, make sure to “route” your message to both transports:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...
        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="async_priority_normal" dsn="..."/>
<framework:transport name="image_transport" dsn="..."/>
<framework:routing message-class="App\Message\UploadedImage">
<framework:sender service="image_transport"/>
<framework:sender service="async_priority_normal"/>
</framework:routing>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async_priority_normal' => '...',
'image_transport' => '...',
],
'routing' => [
'App\Message\UploadedImage' => ['image_transport', 'async_priority_normal'],
],
],
]);
That’s it! You can now consume each transport:
# will only call ThumbnailUploadedImageHandler when handling the message
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
Caution
If a handler does not have from_transport config, it will be executed on every transport that the message is received from.
Extending Messenger
Envelopes & Stamps
A message can be any PHP object. Sometimes, you may need to configure something extra about the message - like the way it should be handled inside AMQP or adding a delay before the message should be handled. You can do that by adding a “stamp” to your message:
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus)
{
$bus->dispatch(new SmsNotification('...'), [
// wait 5 seconds before processing
new DelayStamp(5000),
]);
// or explicitly create an Envelope
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
Internally, each message is wrapped in an Envelope, which holds the message and stamps. You can create this manually or allow the message bus to do it. There are a variety of different stamps for different purposes and they’re used internally to track information about a message - like the message bus that’s handling it or if it’s being retried after failure.
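You can also define stamps of your own. The sketch below is one possible shape: TenantStamp is a hypothetical stamp (not part of Messenger) that records which tenant a message belongs to, and \stdClass stands in for a real message class.

```php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;

// hypothetical stamp: any class implementing StampInterface can be attached
final class TenantStamp implements StampInterface
{
    private $tenantId;

    public function __construct(string $tenantId)
    {
        $this->tenantId = $tenantId;
    }

    public function getTenantId(): string
    {
        return $this->tenantId;
    }
}

$envelope = new Envelope(new \stdClass(), [new TenantStamp('acme')]);

// envelopes are immutable: with() returns a new envelope carrying the extra stamp
$delayed = $envelope->with(new DelayStamp(5000));

// last() returns the most recently added stamp of the given class, or null
$tenantId = $delayed->last(TenantStamp::class)->getTenantId();
$delay = $delayed->last(DelayStamp::class)->getDelay();
$originalHasDelay = null !== $envelope->last(DelayStamp::class);
```

A middleware or handler that has access to the envelope can read such a stamp back with last() in the same way.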
Middleware
What happens when you dispatch a message to a message bus depends on its collection of middleware and their order. By default, the middleware configured for each bus looks like this:
- add_bus_name_stamp_middleware - adds a stamp to record which bus this message was dispatched into;
- dispatch_after_current_bus - see Transactional Messages: Handle New Messages After Handling is Done;
- failed_message_processing_middleware - processes messages that are being retried via the failure transport to make them properly function as if they were being received from their original transport;
- Your own collection of middleware;
- send_message - if routing is configured for the transport, this sends messages to that transport and stops the middleware chain;
- handle_message - calls the message handler(s) for the given message.
Note
These middleware names are actually shortcut names. The real service ids are prefixed with messenger.middleware. (e.g. messenger.middleware.handle_message).
The middleware are executed when the message is dispatched but also again when a message is received via the worker (for messages that were sent to a transport to be handled asynchronously). Keep this in mind if you create your own middleware.
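As a sketch of what that means in practice, the middleware below distinguishes the two passes by checking for a ReceivedStamp, which is present only when the message was received from a transport. The class body is an illustrative assumption (it just records a line per pass in memory), not a built-in middleware.

```php
// src/Middleware/MyMiddleware.php (illustrative implementation)
namespace App\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

class MyMiddleware implements MiddlewareInterface
{
    /** @var string[] */
    private $log = [];

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // a ReceivedStamp means a worker received this message from a transport
        $phase = null !== $envelope->last(ReceivedStamp::class) ? 'received' : 'dispatched';
        $this->log[] = sprintf('%s: %s', $phase, get_class($envelope->getMessage()));

        // always pass the envelope on, otherwise the middleware chain stops here
        return $stack->next()->handle($envelope, $stack);
    }

    /** @return string[] */
    public function getLog(): array
    {
        return $this->log;
    }
}
```

A real middleware would typically log through a logger service or add stamps rather than keep state in memory, since workers are long-running processes.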
You can add your own middleware to this list, or completely disable the default middleware and only include your own:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # disable the default middleware
                default_middleware: false
                # and/or add your own
                middleware:
                    # service ids that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<!-- default-middleware: disable the default middleware -->
<framework:bus name="messenger.bus.default" default-middleware="false">
<!-- and/or add your own -->
<framework:middleware id="App\Middleware\MyMiddleware"/>
<framework:middleware id="App\Middleware\AnotherMiddleware"/>
</framework:bus>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'buses' => [
'messenger.bus.default' => [
// disable the default middleware
'default_middleware' => false,
// and/or add your own
'middleware' => [
'App\Middleware\MyMiddleware',
'App\Middleware\AnotherMiddleware',
],
],
],
],
]);
Note
If a middleware service is abstract, a different instance of the service will be created per bus.
Middleware for Doctrine
New in version 1.11: The following Doctrine middleware were introduced in DoctrineBundle 1.11.
If you use Doctrine in your app, a number of optional middleware exist that you may want to use:
YAML
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection
                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping them open forever
                    - doctrine_close_connection
                    # wraps all handlers in a single Doctrine transaction
                    # handlers do not need to call flush() and an error
                    # in any handler will cause a rollback
                    - doctrine_transaction
                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']
XML
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:bus name="command_bus">
<framework:middleware id="doctrine_transaction"/>
<framework:middleware id="doctrine_ping_connection"/>
<framework:middleware id="doctrine_close_connection"/>
<!-- or pass a different entity manager to any -->
<!--
<framework:middleware id="doctrine_transaction">
<framework:argument>custom</framework:argument>
</framework:middleware>
-->
</framework:bus>
</framework:messenger>
</framework:config>
</container>
PHP
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'buses' => [
'command_bus' => [
'middleware' => [
'doctrine_transaction',
'doctrine_ping_connection',
'doctrine_close_connection',
// Using another entity manager
['id' => 'doctrine_transaction', 'arguments' => ['custom']],
],
],
],
],
]);
Messenger Events
In addition to middleware, Messenger also dispatches several events. You can create an event listener to hook into various parts of the process. For each, the event class is the event name:
Symfony\Component\Messenger\Event\WorkerStartedEvent
Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent
Symfony\Component\Messenger\Event\SendMessageToTransportsEvent
Symfony\Component\Messenger\Event\WorkerMessageFailedEvent
Symfony\Component\Messenger\Event\WorkerMessageHandledEvent
Symfony\Component\Messenger\Event\WorkerRunningEvent
Symfony\Component\Messenger\Event\WorkerStoppedEvent
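For example, a subscriber for one of these events might look like the sketch below. The class name HandledMessageCounter and the in-memory counter are illustrative assumptions. It relies on the EventDispatcher component's EventSubscriberInterface being available, as it is in a standard Symfony application.

```php
// src/EventSubscriber/HandledMessageCounter.php (hypothetical class)
namespace App\EventSubscriber;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;

class HandledMessageCounter implements EventSubscriberInterface
{
    /** @var int[] */
    private $counts = [];

    public static function getSubscribedEvents(): array
    {
        // the event class doubles as the event name
        return [WorkerMessageHandledEvent::class => 'onMessageHandled'];
    }

    public function onMessageHandled(WorkerMessageHandledEvent $event): void
    {
        // key the counter by message class and the transport it was received from
        $key = sprintf(
            '%s via %s',
            get_class($event->getEnvelope()->getMessage()),
            $event->getReceiverName()
        );

        $this->counts[$key] = ($this->counts[$key] ?? 0) + 1;
    }

    /** @return int[] */
    public function getCounts(): array
    {
        return $this->counts;
    }
}
```

With autoconfiguration enabled, a class like this should be registered as an event subscriber automatically. The same pattern applies to the other events in the list.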
Multiple Buses, Command & Event Buses
Messenger gives you a single message bus service by default. But, you can configure as many as you want, creating “command”, “query” or “event” buses and controlling their middleware. See Multiple Buses.
Learn more
- How to Create Your own Messenger Transport
- Transactional Messages: Handle New Messages After Handling is Done
- Getting Results from your Handler
- Multiple Buses
This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.