(En)queue Symfony console commands
[BLOG] Run and schedule Symfony console commands with Enqueue.

What and why Enqueue?
At Yappa, we have always used Johannes' JMSJobQueueBundle to run and schedule Symfony console commands for background jobs.
However, we've stumbled upon a much more elegant solution called Enqueue, created by Forma-Pro. In their words:
"Enqueue is production ready, battle-tested messaging solution for PHP. Provides a common way for programs to create, send, read messages."
It's packed with features, supports major brokers such as RabbitMQ, Kafka, Amazon SQS, Google PubSub, Redis etc. and has a bundle ready to be used with Symfony.
Check their quick tour page on github to learn more!
One downside is that the Enqueue Symfony bundle doesn't provide an out of the box solution to queue Symfony console commands and there's no 100% straight forward way to implement this.
In this post I'll cover the basics in setting up the Enqueue Symfony bundle so we can easily queue Symfony console commands!
Installing the Enqueue Symfony bundle
Nothing special here, we'll install the bundle as described in the quick tour. However, we'll use the file transport instead of AMPQ transport layer. Let's get to it!
Require the packages with composer and load the bundle in your AppKernel.
composer require enqueue/enqueue-bundle enqueue/fs
The Filesystem transport solution is simple and easy to get you started. When your application needs to scale, you might want to take a look at Amazon' SQS or one of the other +10 MQ brokers. <?php
// app/AppKernel.php
// ...
class AppKernel extends Kernel
public function registerBundles()
$bundles = [
// ...
new Enqueue\Bundle\EnqueueBundle(),
// ...
// ...
Configuring the bundle
Configure the basic Filesystem transport settings in config.yml (or the configuration file/format you're using). This is all configuration required before you can benefit from the Enqueue bundle.
parameters: | |
persistent_cache_dir: /tmp | |
app_name: ProjectX | |
enqueue: | |
transport: | |
default: 'fs' | |
fs: | |
dsn: "file://%persistent_cache_dir%/" | |
path: "./enqueue" // The directory where all queue\topic files remain. | |
pre_fetch_count: 1 // Defines how many messages to fetch from the file. | |
chmod: 600 // Defines a mode the files are created with | |
polling_interval: 100 // How often query for new messages, default 100 (milliseconds) | |
client: | |
app_name: '%app_name%_%kernel.environment%' | |
default_processor_queue: 'default' | |
traceable_producer: true | |
async_events: false | |
job: false |
Creating the processor
When a command is pushed into the queue, this command needs to be processed. This means that our Symfony application should pick up a message from the queue and execute it.
We'll make a new processor called RunCommandProcessor which is subscribed to the run_command topic, so it can only process messages for that topic.
The functionality of this RunCommandProcessor is fairly easy, it just creates a Process and runs it.
<?php | |
namespace AppBundle\Service\Queue\Processor; | |
use Enqueue\Client\CommandSubscriberInterface; | |
use Enqueue\Consumption\Result; | |
use Interop\Queue\PsrContext; | |
use Interop\Queue\PsrMessage; | |
use Interop\Queue\PsrProcessor; | |
use Symfony\Component\Process\Exception\ProcessFailedException; | |
use Symfony\Component\Process\Process; | |
class RunCommandProcessor implements PsrProcessor, CommandSubscriberInterface | |
{ | |
/** | |
* @var string | |
*/ | |
private $projectDir; | |
public function __construct(string $projectDir) | |
{ | |
$this->projectDir = $projectDir; | |
} | |
public function process(PsrMessage $message, PsrContext $context) | |
{ | |
$commandline = $message->getBody(); | |
$process = new Process('./bin/console '.$commandline, $this->projectDir); | |
try { | |
$process->mustRun(); | |
return Result::ACK; | |
} catch (ProcessFailedException $e) { | |
return Result::reject(sprintf('The process failed with exception: "%s" in %s at %s', $e->getMessage(), $e->getFile(), $e->getLine())); | |
} | |
} | |
public static function getSubscribedCommand() | |
{ | |
return 'run_command'; | |
} | |
} |
Register the RunCommandProcessor as a service and tag it as a enqueue.client.processor. This way the BuildProcessorRegistryPass will pick it up and add it to the ContainerAwareProcessorRegistry.
This registry will take care of all processors and look them up when needed.
- "%kernel.project_dir%"
- { name: 'enqueue.client.processor', topicName: 'run_command' }
Creating a QueuedCommand value object
For simplicity and ease of use in our application, I've created a QueuedCommand value object.This will hold the Console Command name as a string and parameters as a assoc array.
<?php | |
namespace AppBundle\Model\Queue; | |
class QueuedCommand | |
{ | |
/** | |
* @var string | |
*/ | |
private $name; | |
/** | |
* @var array | |
*/ | |
private $parameters; | |
public function __construct(string $name, array $parameters) | |
{ | |
$this->name = $name; | |
$this->parameters = $parameters; | |
} | |
/** | |
* @return string | |
*/ | |
public function getName(): string | |
{ | |
return $this->name; | |
} | |
/** | |
* @return array | |
*/ | |
public function getParameters(): array | |
{ | |
return $this->parameters; | |
} | |
} |
Adding console commands to the queue
We'll need to add QueuedCommand objects to the correct queue and topic.A QueuedCommandHandler will handle this for us. This is a simple implementation for a simple use case.
In this implementation the createArgumentString is a bit naive and could use some finetuning.
<?php | |
namespace AppBundle\Service\Queue; | |
use AppBundle\Model\Queue\QueuedCommand; | |
use Enqueue\Client\ProducerInterface; | |
class QueuedCommandHandler | |
{ | |
/** | |
* @var ProducerInterface | |
*/ | |
protected $producer; | |
/** | |
* @var string | |
*/ | |
protected $env; | |
/** | |
* @param ProducerInterface $producer | |
* @param string $env | |
*/ | |
public function __construct(ProducerInterface $producer, $env) | |
{ | |
$this->producer = $producer; | |
$this->env = $env; | |
} | |
public function handle(QueuedCommand $command): void | |
{ | |
$argumentString = $this->createArgumentString($command->getParameters()); | |
$fullCommand = sprintf('%s %s', $command->getName(), $argumentString); | |
$this->producer->sendCommand('run_command', $fullCommand); | |
} | |
public function handleString(string $fullCommand): void | |
{ | |
$this->producer->sendCommand('run_command', $fullCommand); | |
} | |
private function createArgumentString(array $arguments) | |
{ | |
$optionList = []; | |
foreach ($arguments as $key => $value) { | |
if (!is_int($key)) { | |
$optionList[] = sprintf('--%s=%s', $key, $value); | |
continue; | |
} | |
$optionList[] = sprintf('%s', $value); | |
} | |
$optionList[] = sprintf('--env=%s', $this->env); | |
return implode(' ', $optionList); | |
} | |
} |
services: | |
AppBundle\Service\Queue\QueuedCommandHandler: | |
arguments: | |
$env: "%kernel.environment%" | |
AppBundle\Service\Queue\Processor\RunCommandProcessor: | |
arguments: | |
- "%kernel.project_dir%" | |
tags: | |
- { name: 'enqueue.client.processor', topicName: 'run_command' } |
Adding (test) commands to the queue
Use a simple controller action to add a console command to the queue.
If all goes well, the Symfony Profiler will show information about the sent messages in the Message Queue. This is an addition of the EnqueueBundle which makes it easy to debug messages.
<?php | |
namespace AppBundle\Controller; | |
use AppBundle\Model\Queue\QueuedCommand; | |
use AppBundle\Service\Queue\QueuedCommandHandler; | |
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route; | |
use Symfony\Bundle\FrameworkBundle\Controller\Controller; | |
use Symfony\Component\HttpFoundation\Response; | |
class QueueController extends Controller | |
{ | |
/** | |
* @Route("/enqueue") | |
*/ | |
public function addCommandsAction(QueuedCommandHandler $handler) | |
{ | |
$command = new QueuedCommand('cache:clear', ['--no-warmup']); | |
$handler->handle($command); | |
return new Response('OK'); | |
} | |
} |
There is also a file created in file://%persistent_cache_dir%/enqueue/ with all messages serialized and concatenated. This is part of the FileSystem transport solution.
|{"body":"cache:clear no-warmup --env=dev","properties":{"enqueue.topic_name":"__command__","enqueue.command_name":"run_command","enqueue.processor_name":"Enqueue\\Client\\RouterProcessor","enqueue.processor_queue_name":"default"},"headers":{"content_type":"text\/plain","message_id":"d7786e29-4479-4f0e-a052-ce2a514a63d9","timestamp":1518598153,"reply_to":null,"correlation_id":""}}
Consume the queue!
Last but not least, we need to start a client's worker that can process the messages. It connects to the queue and selects the appropriate message processor based on a message headers.
bin/console enqueue:consume --env=dev --setup-broker --time-limit=now+5min -vvv
In production environments you want this consume command to run at all times. You can read about it in the Enqueue docs.
Side notes
This is just an example on how you can use the Enqueue package. A lot of the above code is good for prototyping and getting you started.
- https://martinfowler.com/eaaCatalog/registry.html
- https://github.com/php-enqueue/enqueue-dev
- https://enqueue.forma-pro.com/
- https://aws.amazon.com/sqs/
- https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport
- https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md
- https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/production_settings.md
- http://supervisord.org/
- https://symfony.com/