diff --git a/src/Messenger/NsqReceiver.php b/src/Messenger/NsqReceiver.php index dc461b5..9990a54 100644 --- a/src/Messenger/NsqReceiver.php +++ b/src/Messenger/NsqReceiver.php @@ -11,6 +11,7 @@ use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -76,6 +77,7 @@ public function get(): iterable $envelope->with( new NsqReceivedStamp($message), new TransportMessageIdStamp($message->id), + new RedeliveryStamp($message->attempts - 1), ), ]; } @@ -97,6 +99,10 @@ public function reject(Envelope $envelope): void { $message = NsqReceivedStamp::getMessageFromEnvelope($envelope); + if ($message->isProcessed()) { + return; + } + wait($message->finish()); } diff --git a/src/Messenger/NsqSender.php b/src/Messenger/NsqSender.php index 751e482..4e29b9c 100644 --- a/src/Messenger/NsqSender.php +++ b/src/Messenger/NsqSender.php @@ -35,14 +35,26 @@ public function send(Envelope $envelope): Envelope { $producer = $this->getProducer(); - $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); - $encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE); - /** @var DelayStamp|null $delayStamp */ $delayStamp = $envelope->last(DelayStamp::class); - $delay = null !== $delayStamp ? $delayStamp->getDelay() : null; + $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0; + + $promise = null; - $promise = $producer->publish($this->topic, $encodedMessage, $delay); + if (null !== $envelope->last(NsqReceivedStamp::class)) { + $message = NsqReceivedStamp::getMessageFromEnvelope($envelope); + + if (!$message->isProcessed()) { + $promise = $message->requeue($delay); + } + } + + if (null === $promise) { + $encodedMessage = $this->serializer->encode($envelope->withoutAll(NsqReceivedStamp::class)); + $encodedMessage = json_encode($encodedMessage, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE); + + $promise = $producer->publish($this->topic, $encodedMessage, $delay); + } wait($promise);