layout | parent | title | nav_order |
---|---|---|---|
default | Symfony bundle | Quick tour | 1 |
The EnqueueBundle integrates the Enqueue library into Symfony. It adds an easy-to-use configuration layer, registers services, and provides handy CLI commands.
$ composer require enqueue/enqueue-bundle enqueue/fs
Note: You could use various other transports.
Note: If you are looking for a way to migrate from php-amqplib/rabbitmq-bundle,
read this article.
Then, enable the bundle by adding new Enqueue\Bundle\EnqueueBundle()
to the bundles array of the registerBundles method in your project's kernel class (app/AppKernel.php in older Symfony layouts, src/Kernel.php in the example here):
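<?php
// src/Kernel.php
namespace App;

use Symfony\Component\HttpKernel\Kernel as BaseKernel;

class Kernel extends BaseKernel
{
    public function registerBundles()
    {
        $bundles = array(
            // ...
            // The leading backslash is required: inside "namespace App" an
            // unqualified name would resolve to App\Enqueue\Bundle\EnqueueBundle.
            new \Enqueue\Bundle\EnqueueBundle(),
        );

        // ...
    }

    // ...
}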
Next, you have to configure a transport layer. You can optionally configure multiple transports. One of them automatically becomes the default, chosen as follows:
- If there is a transport named default, it becomes the default.
- Otherwise, the first one specified becomes the default.
The default transport's services are available in the usual Symfony container under their respective interface names (see below).
# app/config/config.yml
enqueue:
    default:
        transport: "amqp:"
        client: ~
    some_other_transport:
        transport: "amqp:"
        client: ~
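The selection rule for the default transport can be sketched in a few lines of plain PHP. This is only an illustration of the documented rule, not the bundle's actual implementation; pickDefaultTransport is a hypothetical helper name, and the input is assumed to be the transport names in the order they appear in the configuration.

```php
<?php

/**
 * Hypothetical helper (not part of EnqueueBundle): returns the name of the
 * transport that would become the default under the documented rule.
 */
function pickDefaultTransport(array $transportNames): ?string
{
    // Normalize keys so that "first one specified" is always index 0.
    $transportNames = array_values($transportNames);

    if ($transportNames === []) {
        return null; // nothing configured
    }

    // Rule 1: a transport literally named "default" wins.
    if (in_array('default', $transportNames, true)) {
        return 'default';
    }

    // Rule 2: otherwise the first one specified becomes the default.
    return $transportNames[0];
}
```

With the configuration above, the names would be default and some_other_transport, so the first rule applies. If the transports were named foo and bar instead, foo would be picked by the second rule.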
Once you have configured everything, you can start producing messages.
As stated previously, the default transport's services are available in the container. Here we use ProducerInterface to produce a message to the default transport.
<?php
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;
/** @var ProducerInterface $producer **/
$producer = $container->get(ProducerInterface::class);
// To use a producer other than the default one (for example the second transport from the configuration sample), use:
// $producer = $container->get('enqueue.client.some_other_transport.producer');
// send event to many consumers
$producer->sendEvent('aFooTopic', 'Something has happened');
// You can also pass an instance of Enqueue\Client\Message as second argument if you need more flexibility.
$properties = [];
$headers = [];
$message = new Message('Message body', $properties, $headers);
$producer->sendEvent('aBarTopic', $message);
// send command to ONE consumer
$producer->sendCommand('aProcessorName', 'Something has happened');
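In application code you may prefer constructor injection over fetching the producer from the container by hand. The following is a minimal sketch, assuming that Symfony autowiring is enabled and resolves ProducerInterface to the default transport's producer (consistent with the container lookup shown above). OrderNotifier, notifyPlaced and the order_placed topic are hypothetical names used for illustration only.

```php
<?php

use Enqueue\Client\ProducerInterface;

// Hypothetical application service. It relies only on sendEvent(),
// which is the same call used in the producer example.
class OrderNotifier
{
    /** @var ProducerInterface */
    private $producer;

    // Assumption: the container injects the default transport's producer here.
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function notifyPlaced(string $orderId): void
    {
        // Plain string body; every processor subscribed to the topic receives it.
        $this->producer->sendEvent('order_placed', 'Order placed: '.$orderId);
    }
}
```

Keeping the producer behind a small service like this means the topic name lives in one place, and a test double can be passed to the constructor in unit tests.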
To consume messages, you first have to create a message processor.
The example below shows how to create a processor that receives messages from the aFooTopic topic (and only that one).
It assumes that you are using the default Symfony services configuration and that this class is autoconfigured. Otherwise, you have to tag it manually. This matters especially when you use multiple transports: if left autoconfigured, the processor is attached to the default transport only.
Note: A topic in Enqueue and a topic on some transports (for example Kafka) are two different things.
<?php
use Interop\Queue\Message;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use Enqueue\Client\TopicSubscriberInterface;

class FooProcessor implements Processor, TopicSubscriberInterface
{
    public function process(Message $message, Context $session)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}
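The choice between the three results can be kept in a small, testable function. The sketch below is an illustration under stated assumptions: the message body is assumed to be JSON (nothing in this guide requires that), decideResult and its $dependencyReady flag are hypothetical, and the returned strings are plain labels rather than the real constant values. Inside process() you would map them to self::ACK, self::REJECT and self::REQUEUE.

```php
<?php

/**
 * Hypothetical helper: decides how a processor should answer for a message.
 * Returns the label 'ACK', 'REJECT' or 'REQUEUE'.
 */
function decideResult(string $body, bool $dependencyReady): string
{
    // Assumption: a valid body is a JSON object or array.
    $data = json_decode($body, true);

    if (!is_array($data)) {
        return 'REJECT'; // the message is broken, retrying will not help
    }

    if (!$dependencyReady) {
        return 'REQUEUE'; // the message is fine, but processing is postponed
    }

    return 'ACK'; // processed successfully
}
```

For example, decideResult('not json', true) yields REJECT, while a well-formed body with $dependencyReady set to false yields REQUEUE.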
Register the processor as a container service. Subscribe it to the topic if you are not relying on autoconfiguration.
foo_message_processor:
    class: 'FooProcessor'
    tags:
        - { name: 'enqueue.topic_subscriber' }
        # Use the variant below to attach the processor to a specific client.
        # Note that unless you disable autoconfigure, the tag above is applied automatically for the default client.
        # - { name: 'enqueue.topic_subscriber', client: 'some_other_transport' }
Now you can start consuming messages:
$ ./bin/console enqueue:consume --setup-broker -vvv
You can select a specific client for consumption:
$ ./bin/console enqueue:consume --setup-broker --client="some_other_transport" -vvv
Note: Add -vvv to find out what is going on while you are consuming messages. There is a lot of valuable debug info there.