(En)queue Symfony console commands

[BLOG] Run and schedule Symfony console commands with Enqueue.

What and why Enqueue?

At Yappa, we have always used Johannes' JMSJobQueueBundle to run and schedule Symfony console commands for background jobs.

However, we've stumbled upon a much more elegant solution called Enqueue, created by Forma-Pro. In their words:

"Enqueue is production ready, battle-tested messaging solution for PHP. Provides a common way for programs to create, send, read messages."

It's packed with features, supports major brokers such as RabbitMQ, Kafka, Amazon SQS, Google Pub/Sub, and Redis, and has a bundle ready to be used with Symfony.

Check their quick tour page on GitHub to learn more!

One downside is that the Enqueue Symfony bundle doesn't provide an out-of-the-box solution to queue Symfony console commands, and there's no completely straightforward way to implement this.

In this post I'll cover the basics of setting up the Enqueue Symfony bundle so we can easily queue Symfony console commands!

Installing the Enqueue Symfony bundle

Nothing special here: we'll install the bundle as described in the quick tour. However, we'll use the Filesystem transport instead of the AMQP transport layer. Let's get to it!

Require the packages with composer and load the bundle in your AppKernel.

composer require enqueue/enqueue-bundle enqueue/fs

The Filesystem transport is simple and an easy way to get started. When your application needs to scale, you might want to take a look at Amazon SQS or one of the 10+ other MQ brokers.

<?php

// app/AppKernel.php

// ...
class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = [
            // ...

            new Enqueue\Bundle\EnqueueBundle(),
        ];

        // ...
    }

    // ...
}

Configuring the bundle

Configure the basic Filesystem transport settings in config.yml (or the configuration file/format you're using). This is all the configuration required before you can use the Enqueue bundle.

Creating the processor

When a command is pushed into the queue, this command needs to be processed. This means that our Symfony application should pick up a message from the queue and execute it.

We'll make a new processor called RunCommandProcessor which is subscribed to the run_command topic, so it can only process messages for that topic.

The functionality of this RunCommandProcessor is fairly simple: it creates a Process for the console command and runs it.
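To make that a bit more concrete, here's a minimal sketch of the part of such a processor that turns a message body into a command line. It's plain PHP so it can run on its own; the function name buildConsoleCommandLine and the '/srv/app' path are assumptions for illustration, not part of Enqueue or of the original processor. In a real processor, the resulting string would presumably be handed to a Symfony Process inside the process() method, and the exit status would decide whether the message is acknowledged or rejected.

```php
<?php

declare(strict_types=1);

/**
 * Builds the command line for a queued console command (hypothetical helper).
 *
 * $projectDir  is the value injected from %kernel.project_dir%.
 * $messageBody is the plain-text message body, e.g. "cache:clear --env=dev".
 */
function buildConsoleCommandLine(string $projectDir, string $messageBody): string
{
    $messageBody = trim($messageBody);

    if ($messageBody === '') {
        throw new InvalidArgumentException('The message body does not contain a command.');
    }

    // Quote the console path so project directories with spaces still work.
    $console = escapeshellarg(rtrim($projectDir, '/') . '/bin/console');

    // The body is appended as-is because it already is a complete argument
    // string. Escaping has to happen where that string is built, and only
    // trusted code should be able to publish to this topic.
    return sprintf('php %s %s', $console, $messageBody);
}

// Example: the project directory here is made up.
echo buildConsoleCommandLine('/srv/app', 'cache:clear --env=dev'), PHP_EOL;
```

Keeping this step in a small pure function makes it easy to unit test without a broker or a running worker.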

Register the RunCommandProcessor as a service and tag it with enqueue.client.processor. This way the BuildProcessorRegistryPass will pick it up and add it to the ContainerAwareProcessorRegistry.

This registry will take care of all processors and look them up when needed.

services:
    AppBundle\Service\Queue\Processor\RunCommandProcessor:
        arguments:
            - "%kernel.project_dir%"
        tags:
            - { name: 'enqueue.client.processor', topicName: 'run_command' }

Creating a QueuedCommand value object

For simplicity and ease of use in our application, I've created a QueuedCommand value object.
This holds the console command name as a string and its parameters as an associative array.
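As a rough idea of what such a value object could look like, here's a minimal sketch. Only the class name and the two pieces of data (name and parameters) come from the description above; the getter names, the validation, and the parameter key format are my assumptions.

```php
<?php

declare(strict_types=1);

/**
 * Immutable description of a console command that should be queued.
 */
final class QueuedCommand
{
    /** @var string Console command name, e.g. "cache:clear" */
    private $name;

    /** @var array Parameters keyed by option name, e.g. ['--env' => 'dev'] */
    private $parameters;

    public function __construct(string $name, array $parameters = [])
    {
        if (trim($name) === '') {
            throw new InvalidArgumentException('A command name is required.');
        }

        $this->name = $name;
        $this->parameters = $parameters;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getParameters(): array
    {
        return $this->parameters;
    }
}

// Example usage with a command that ships with Symfony.
$command = new QueuedCommand('cache:clear', ['--env' => 'dev', '--no-warmup' => true]);
```

There are no setters on purpose: once a command has been created, nothing between the controller and the queue can change it.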

Adding console commands to the queue

We'll need to add QueuedCommand objects to the correct queue and topic.
A QueuedCommandHandler will handle this for us. This is a simple implementation for a simple use case.
In this implementation, createArgumentString is a bit naive and could use some fine-tuning.
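One possible way to fine-tune it is to distinguish between flags, options with a value, and positional arguments, and to shell-escape every value. The sketch below is an assumption about how that could look, not the original method: the signature, the parameter array format, and the handling of true/false/null are all choices I made for illustration. It only covers long options (--name) and does not handle array values.

```php
<?php

declare(strict_types=1);

/**
 * Converts parameters such as
 *   ['--env' => 'dev', '--no-warmup' => true, 'some argument']
 * into a command-line argument string.
 */
function createArgumentString(array $parameters): string
{
    $parts = [];

    foreach ($parameters as $key => $value) {
        $isOption = is_string($key) && strpos($key, '--') === 0;

        if ($isOption && $value === true) {
            // Flag without a value, e.g. --no-warmup
            $parts[] = $key;
        } elseif ($isOption && ($value === false || $value === null)) {
            // A disabled flag is simply left out.
            continue;
        } elseif ($isOption) {
            // Option with a value, e.g. --env='dev'
            $parts[] = $key . '=' . escapeshellarg((string) $value);
        } else {
            // Anything else is treated as a positional argument.
            $parts[] = escapeshellarg((string) $value);
        }
    }

    return implode(' ', $parts);
}

echo createArgumentString(['--env' => 'dev', '--no-warmup' => true, 'some argument']), PHP_EOL;
```

The exact quoting in the output depends on the platform, because escapeshellarg adapts to the shell it expects.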

Register the QueuedCommandHandler as a service and pass the additional parameters in the constructor.

Adding (test) commands to the queue

Use a simple controller action to add a console command to the queue.
If all goes well, the Symfony Profiler will show information about the sent messages in the Message Queue panel. This is a feature of the EnqueueBundle that makes it easy to debug messages.

A file is also created in file://%persistent_cache_dir%/enqueue/ containing all messages, serialized and concatenated. This is part of the Filesystem transport.

|{"body":"cache:clear no-warmup --env=dev","properties":{"enqueue.topic_name":"__command__","enqueue.command_name":"run_command","enqueue.processor_name":"Enqueue\\Client\\RouterProcessor","enqueue.processor_queue_name":"default"},"headers":{"content_type":"text\/plain","message_id":"d7786e29-4479-4f0e-a052-ce2a514a63d9","timestamp":1518598153,"reply_to":null,"correlation_id":""}}
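If you want to inspect such a message by hand, the JSON can be decoded with plain PHP. The snippet below is only a debugging sketch: it assumes the leading "|" is a delimiter written by the transport and strips it before decoding. The message itself is copied from the example above.

```php
<?php

declare(strict_types=1);

// One stored message, including the leading "|".
$line = <<<'JSON'
|{"body":"cache:clear no-warmup --env=dev","properties":{"enqueue.topic_name":"__command__","enqueue.command_name":"run_command","enqueue.processor_name":"Enqueue\\Client\\RouterProcessor","enqueue.processor_queue_name":"default"},"headers":{"content_type":"text\/plain","message_id":"d7786e29-4479-4f0e-a052-ce2a514a63d9","timestamp":1518598153,"reply_to":null,"correlation_id":""}}
JSON;

// Strip the assumed delimiter and decode into an associative array.
$message = json_decode(ltrim($line, '|'), true);

if (!is_array($message)) {
    throw new RuntimeException('Could not decode message: ' . json_last_error_msg());
}

// The body is the argument string that the processor will run.
printf("body:      %s\n", $message['body']);
// The properties tell the client which command and processor are involved.
printf("command:   %s\n", $message['properties']['enqueue.command_name']);
printf("processor: %s\n", $message['properties']['enqueue.processor_name']);
printf("queue:     %s\n", $message['properties']['enqueue.processor_queue_name']);
```

The body is what ends up being executed, so this is a quick way to check that the argument string was built the way you expected.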

Consume the queue!

Last but not least, we need to start a client worker that can process the messages. It connects to the queue and selects the appropriate message processor based on the message headers.

bin/console enqueue:consume --env=dev --setup-broker --time-limit=now+5min -vvv

In production environments you want this consume command to run at all times. You can read about it in the Enqueue docs.

Side notes

This is just an example of how you can use the Enqueue package. A lot of the above code is good for prototyping and getting you started.

References

  • https://martinfowler.com/eaaCatalog/registry.html
  • https://github.com/php-enqueue/enqueue-dev
  • https://enqueue.forma-pro.com/
  • https://aws.amazon.com/sqs/
  • https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport
  • https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md
  • https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/production_settings.md
  • http://supervisord.org/
  • https://symfony.com/