Skip to content

Commit

Permalink
Use native requeue mechanism instead of symfony/messenger resends
Browse files Browse the repository at this point in the history
  • Loading branch information
grachevko committed Jun 9, 2021
1 parent 5eea27c commit 70c4667
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/Messenger/NsqReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand Down Expand Up @@ -76,6 +77,7 @@ public function get(): iterable
$envelope->with(
new NsqReceivedStamp($message),
new TransportMessageIdStamp($message->id),
new RedeliveryStamp($message->attempts - 1),
),
];
}
Expand All @@ -97,6 +99,10 @@ public function reject(Envelope $envelope): void
{
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope);

if ($message->isProcessed()) {
return;
}

wait($message->finish());
}

Expand Down
22 changes: 17 additions & 5 deletions src/Messenger/NsqSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,26 @@ public function send(Envelope $envelope): Envelope
{
$producer = $this->getProducer();

$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);

/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? $delayStamp->getDelay() : null;
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;

$promise = null;

$promise = $producer->publish($this->topic, $encodedMessage, $delay);
if (null !== $envelope->last(NsqReceivedStamp::class)) {
$message = NsqReceivedStamp::getMessageFromEnvelope($envelope);

if (!$message->isProcessed()) {
$promise = $message->requeue($delay);
}
}

if (null === $promise) {
$encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class));
$encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);

$promise = $producer->publish($this->topic, $encodedMessage, $delay);
}

wait($promise);

Expand Down

0 comments on commit 70c4667

Please sign in to comment.