This bundle aims to provide a simple Kafka transport for Symfony Messenger.
Open a command console, enter your project directory and execute:
$ composer require koco/messenger-kafka
Open a command console, enter your project directory and execute the following command to download the latest stable version of this bundle:
$ composer require koco/messenger-kafka
This command requires you to have Composer installed globally, as explained in the installation chapter of the Composer documentation.
Then, enable the bundle by adding it to the list of registered bundles
in the config/bundles.php
file of your project:
// config/bundles.php
return [
// ...
Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];
Specify a DSN starting with either kafka://
or kafka+ssl://
. Multiple brokers are separated by ,
.
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
The configuration options for kafka_conf
and topic_conf
can be found here.
It is highly recommended to set enable.auto.offset.store
to false
for consumers. Otherwise, every message will be acknowledged, regardless of any error thrown by the message handlers.
framework:
messenger:
transports:
producer:
dsn: '%env(KAFKA_URL)%'
# serializer: App\Infrastructure\Messenger\MySerializer
options:
flushTimeout: 10000
topic:
name: 'events'
kafka_conf:
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
consumer:
dsn: '%env(KAFKA_URL)%'
# serializer: App\Infrastructure\Messenger\MySerializer
options:
commitAsync: true
receiveTimeout: 10000
topic:
name: "events"
kafka_conf:
enable.auto.offset.store: 'false'
group.id: 'my-group-id' # should be unique per consumer
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
max.poll.interval.ms: '45000'
topic_conf:
auto.offset.reset: 'earliest'
If you consume messages generated by other systems, you will most likely need to implement your own Serializer. Please see: https://symfony.com/doc/current/messenger.html#serializing-messages
<?php
namespace App\Infrastructure\Messenger;
use App\Messages\MyPHPClass;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
final class MySerializer extends Serializer
{
public function decode(array $encodedEnvelope): Envelope
{
$headers = [
'Content-Type' => 'application/json'
];
if ($encodedEnvelope['headers']['__TypeId__'] === 'MyJavaClass') {
$headers['type'] = MyPHPClass::class;
}
return parent::decode([
'headers' => $headers,
'body' => $encodedEnvelope['body']
]);
}
}