RabbitMQ is an optional dependency used for async message queuing. The framework provides two classes — RabbitMQ for dispatching messages and RabbitMQConsumer for consuming them — both driven by QueueEnum which defines the available queues and ties them to the Symfony Messenger transport configuration.
RabbitMQ requires:
php-amqplib/php-amqplib Composer package (included in the template)ext-amqp PHP extensionMESSENGER_TRANSPORT_DSN set in .envconfig/packages/messenger.yamlIf you're not using async queues, none of this is needed and RabbitMQ can be left out entirely.
MESSENGER_TRANSPORT_DSN=amqp://admin:admin@127.0.0.1:7004/%2f
The DSN format is amqp://user:password@host:port/vhost. The default vhost in RabbitMQ is /, which URL-encodes to %2f.
Queues are defined as Symfony Messenger transports in config/packages/messenger.yaml:
framework:
messenger:
failure_transport: async
transports:
async: 'in-memory://'
default_queue: '%env(MESSENGER_TRANSPORT_DSN)%/default_queue'
routing:
'stdClass': default_queue
Each transport name under transports must have a matching case in QueueEnum. The transport name is the RabbitMQ queue name — default_queue in the example above.
QueueEnum is the central place to define and access queues. It's a backed enum where each case value is the queue name as defined in messenger.yaml:
// App/Enums/Amqp/QueueEnum.php
enum QueueEnum: string
{
case DEFAULT = 'default_queue';
public function getMessageBus(): RabbitMQ
{
return RabbitMQ::getInstance( $this );
}
public function consume( callable $callback ): void
{
( new RabbitMQConsumer() )
->consume( $this, $callback );
}
}
Adding a new queue means two steps — adding a case to QueueEnum and adding the matching transport to messenger.yaml. If the queue name in QueueEnum doesn't match a transport in messenger.yaml, RabbitMQ::getInstance() will throw InvalidRabbitMQConfigurationException on first use.
Use QueueEnum to get a RabbitMQ instance and dispatch:
// Dispatch a JSON string to the default queue
QueueEnum::DEFAULT
->getMessageBus()
->dispatch( json_encode( [
'type' => 'player_action',
'user_id' => $userId,
'action' => 'build',
'data' => $actionData,
] ) );
dispatch() accepts a string. Serialize your payload before passing it in — JSON is the recommended format.
RabbitMQ is a singleton per queue — RabbitMQ::getInstance( QueueEnum::DEFAULT ) always returns the same connection for that queue within a request. The AMQP connection and channel are opened once on first call and closed in __destruct().
Consumption is a blocking operation intended for long-running console commands, not HTTP requests. Create a Symfony console command and use QueueEnum::consume():
// App/Command/ProcessPlayerActionsCommand.php
#[AsCommand(
name: 'app:queue:process-player-actions',
description: 'Process incoming player action messages',
)]
final class ProcessPlayerActionsCommand extends Command
{
protected function execute( InputInterface $input, OutputInterface $output ): int
{
$io = new SymfonyStyle( $input, $output );
$io->info( 'Listening for player actions...' );
QueueEnum::DEFAULT->consume( function ( array $data ) use ( $io ): void {
$io->text( sprintf(
'Processing action "%s" for user %d',
$data['action'],
$data['user_id']
) );
// handle the message
} );
return Command::SUCCESS;
}
}
The callback receives the decoded JSON payload as an array. RabbitMQConsumer calls json_decode() on the raw message body before passing it to your callback, so you always receive an array rather than a raw string.
The consumer blocks indefinitely in while ( $this->channel->is_consuming() ) until the process is killed or the connection drops. Run it under a process supervisor (systemd, Supervisor, etc.) in production.
Step 1 — Add the transport to messenger.yaml:
transports:
async: 'in-memory://'
default_queue: '%env(MESSENGER_TRANSPORT_DSN)%/default_queue'
game_events: '%env(MESSENGER_TRANSPORT_DSN)%/game_events'
Step 2 — Add the case to QueueEnum:
enum QueueEnum: string
{
case DEFAULT = 'default_queue';
case GAME_EVENTS = 'game_events';
}
Step 3 — Use it:
QueueEnum::GAME_EVENTS
->getMessageBus()
->dispatch( json_encode( $eventPayload ) );
RabbitMQ is a singleton per queue. On first instantiation for a given QueueEnum case it:
config/packages/messenger.yaml to validate the queue existsMESSENGER_TRANSPORT_DSN to extract host, port, user, passwordAMQPStreamConnectiondurable: trueThe connection and channel are stored as instance properties and reused for all dispatch() calls on the same queue within a request. Both are closed cleanly in __destruct().
// Internal connection parsing — handled automatically
$dsn = parse_url( env( 'MESSENGER_TRANSPORT_DSN' ) );
// host, port, user, pass extracted from DSN
Unlike RabbitMQ, RabbitMQConsumer is not a singleton — instantiate it fresh each time you need to consume. It opens its own connection and channel, declares the same queue, and sets up a basic consumer with basic_consume().
The internal callback wraps your callable and handles JSON decoding:
$internalCallback = function ( AMQPMessage $msg ) use ( $callback ): void {
$data = json_decode( $msg->getBody(), true );
$callback( $data );
};
auto_ack is set to true — messages are acknowledged automatically on delivery. If your callback throws, the message will not be requeued. Add your own try/catch inside the callback if you need error handling or dead-letter routing.
Process supervision — consumer commands must be kept alive by a process manager. If the command exits (connection drop, OOM kill, deploy restart), messages will queue up in RabbitMQ until the consumer restarts. Use systemd or Supervisor with autorestart=true.
Message acknowledgement — the current implementation uses auto-ack. For critical workloads where message loss is unacceptable, you'll need to modify RabbitMQConsumer to use manual acknowledgement (auto_ack: false) and call $msg->ack() after successful processing.
Failure transport — messenger.yaml sets failure_transport: async which routes failed messages back to the async (in-memory) transport. For production you likely want a dedicated dead-letter queue instead.
Multiple consumers — you can run multiple instances of the same consumer command in parallel. RabbitMQ will round-robin messages between them automatically.
Queue name mismatch — the QueueEnum case value must exactly match the transport name in messenger.yaml. A mismatch throws InvalidRabbitMQConfigurationException with a clear message pointing to the yaml file.
Dispatching in a consumer callback — dispatching a message from inside a consumer callback on the same queue will work but can cause processing loops if the dispatch is unconditional. Always guard dispatch calls inside callbacks with a condition.
Running consumers in HTTP requests — consume() blocks indefinitely. Never call it in a controller or middleware. It belongs exclusively in console commands.
Missing MESSENGER_TRANSPORT_DSN — if the env var is not set, parse_url() will receive null and the connection will fail with a confusing error. The env var must be set even if you define it with a default in the class — the class default (amqp://guest:guest@localhost:7004) is a framework fallback and may not match your Docker setup (the template project uses admin:admin).