Skip to content

Commit

Permalink
[Messenger] Extend SQS visibility timeout for messages that are still…
Browse files Browse the repository at this point in the history
… being processed
  • Loading branch information
valtzu committed Oct 14, 2024
1 parent d2123d5 commit cd8b6f9
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 5 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

7.2
---

* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying SQS that the job is still being processed, in order to avoid timeouts

6.4
---

Expand Down
17 changes: 16 additions & 1 deletion Tests/Transport/AmazonSqsIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ public function testConnectionSendAndGet()

private function execute(string $dsn): void
{
$connection = Connection::fromDsn($dsn, []);
$connection = Connection::fromDsn($dsn, ['visibility_timeout' => 1]);
$connection->setup();
$this->clearSqs($dsn);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
$messageSentAt = microtime(true);
$this->assertSame(1, $connection->getMessageCount());

$wait = 0;
Expand All @@ -55,6 +56,20 @@ private function execute(string $dsn): void

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);

$this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt);
$connection->keepalive($encoded['id']);
$this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt);
$this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended');
$connection->delete($encoded['id']);
}

private function waitUntilElapsed(float $seconds, float $since): void
{
$waitTime = $seconds - (microtime(true) - $since);
if ($waitTime > 0) {
usleep((int) ($waitTime * 1e6));
}
}

private function clearSqs(string $dsn): void
Expand Down
13 changes: 13 additions & 0 deletions Tests/Transport/AmazonSqsReceiverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
Expand Down Expand Up @@ -54,6 +56,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
iterator_to_array($receiver->get());
}

public function testKeepalive()
{
$serializer = $this->createSerializer();

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('keepalive')->with('123', 10);

$receiver = new AmazonSqsReceiver($connection, $serializer);
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10);
}

private function createSqsEnvelope()
{
return [
Expand Down
26 changes: 26 additions & 0 deletions Tests/Transport/AmazonSqsTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
Expand Down Expand Up @@ -151,6 +152,31 @@ public function testItConvertsHttpExceptionDuringResetIntoTransportException()
$this->transport->reset();
}

public function testKeepalive()
{
$transport = $this->getTransport(
null,
$connection = $this->createMock(Connection::class),
);

$connection->expects($this->once())->method('keepalive')->with('123', 10);
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10);
}

public function testKeepaliveWhenASqsExceptionOccurs()
{
$transport = $this->getTransport(
null,
$connection = $this->createMock(Connection::class),
);

$exception = $this->createHttpException();
$connection->expects($this->once())->method('keepalive')->with('123')->willThrowException($exception);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]));
}

private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null)
{
$serializer ??= $this->createMock(SerializerInterface::class);
Expand Down
28 changes: 28 additions & 0 deletions Tests/Transport/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Symfony\Component\HttpClient\MockHttpClient;
use Symfony\Component\HttpClient\Response\MockResponse;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;

class ConnectionTest extends TestCase
Expand Down Expand Up @@ -357,6 +358,33 @@ public function testLoggerWithDebugOption()
$connection->get();
}

public function testKeepalive()
{
$expectedParams = [
'QueueUrl' => $queueUrl = 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'ReceiptHandle' => $id = 'abc',
'VisibilityTimeout' => $visibilityTimeout = 30,
];

$client = $this->createMock(SqsClient::class);
$client->expects($this->once())->method('changeMessageVisibility')->with($expectedParams);

$connection = new Connection(['visibility_timeout' => $visibilityTimeout], $client, $queueUrl);
$connection->keepalive($id);
}

public function testKeepaliveWithTooSmallTtl()
{
$client = $this->createMock(SqsClient::class);
$client->expects($this->never())->method($this->anything());

$connection = new Connection(['visibility_timeout' => 1], $client, 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue');

$this->expectException(TransportException::class);
$this->expectExceptionMessage('SQS visibility_timeout (1s) cannot be smaller than the keepalive interval (2s).');
$connection->keepalive('123', 2);
}

private function getMockedQueueUrlResponse(): MockResponse
{
if ($this->isAsyncAwsSqsVersion2Installed()) {
Expand Down
13 changes: 11 additions & 2 deletions Transport/AmazonSqsReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;

Expand Down Expand Up @@ -78,6 +78,15 @@ public function reject(Envelope $envelope): void
}
}

public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
try {
$this->connection->keepalive($this->findSqsReceivedStamp($envelope)->getId(), $seconds);
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}

public function getMessageCount(): int
{
try {
Expand Down
11 changes: 10 additions & 1 deletion Transport/AmazonSqsTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
Expand All @@ -26,7 +27,7 @@
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
{
private SerializerInterface $serializer;

Expand Down Expand Up @@ -54,6 +55,14 @@ public function reject(Envelope $envelope): void
$this->getReceiver()->reject($envelope);
}

public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
$receiver = $this->getReceiver();
if ($receiver instanceof KeepaliveReceiverInterface) {
$receiver->keepalive($envelope, $seconds);
}
}

public function getMessageCount(): int
{
return $this->getReceiver()->getMessageCount();
Expand Down
17 changes: 17 additions & 0 deletions Transport/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,23 @@ public function delete(string $id): void
]);
}

/**
* @param int|null $seconds the minimum duration the message should be kept alive
*/
public function keepalive(string $id, ?int $seconds = null): void
{
$visibilityTimeout = $this->configuration['visibility_timeout'];
if (null !== $visibilityTimeout && null !== $seconds && $visibilityTimeout < $seconds) {
throw new TransportException(\sprintf('SQS visibility_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $visibilityTimeout, $seconds));
}

$this->client->changeMessageVisibility([
'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $id,
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
]);
}

public function getMessageCount(): int
{
$response = $this->client->getQueueAttributes([
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"php": ">=8.2",
"async-aws/core": "^1.7",
"async-aws/sqs": "^1.0|^2.0",
"symfony/messenger": "^6.4|^7.0",
"symfony/messenger": "^7.2",
"symfony/service-contracts": "^2.5|^3",
"psr/log": "^1|^2|^3"
},
Expand Down

0 comments on commit cd8b6f9

Please sign in to comment.