Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting protobuf message support #1329

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ services:
- PREDIS_DSN=redis+predis://redis
- PHPREDIS_DSN=redis+phpredis://redis
- GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4576&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4575&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4575&sqs_endpoint=http://localstack:4576&version=latest
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4566&sqs_endpoint=http://localstack:4566&version=latest
- WAMP_DSN=wamp://thruway:9090
- REDIS_HOST=redis
- REDIS_PORT=6379
- AWS_SQS_KEY=key
- AWS_SQS_SECRET=secret
- AWS_SQS_REGION=us-east-1
- AWS_SQS_ENDPOINT=http://localstack:4576
- AWS_SQS_ENDPOINT=http://localstack:4566
- AWS_SQS_VERSION=latest
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
- GEARMAN_DSN=gearman://gearmand:4730
Expand Down Expand Up @@ -127,10 +127,9 @@ services:
- '9090:9090'

localstack:
image: 'localstack/localstack:0.8.10'
image: 'localstack/localstack:3.0.2'
ports:
- '4576:4576'
- '4575:4575'
- '4566:4566'
environment:
HOSTNAME_EXTERNAL: 'localstack'
SERVICES: 'sqs,sns'
Expand Down
2 changes: 1 addition & 1 deletion docker/bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50
waitForService thruway 9090 50
waitForService localstack 4576 50
waitForService localstack 4566 50

php docker/bin/refresh-mysql-database.php || exit 1
php docker/bin/refresh-postgres-database.php || exit 1
Expand Down
6 changes: 4 additions & 2 deletions pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ public function testConsumerReceiveMessageFromTopicDirectly()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(0);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
Expand All @@ -181,10 +182,11 @@ public function testConsumerReceiveMessageWithZeroTimeout()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(0);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
Expand Down
15 changes: 9 additions & 6 deletions pkg/gps/GpsConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,22 @@ public function createContext(): Context
if ($this->config['lazy']) {
return new GpsContext(function () {
return $this->establishConnection();
});
}, [
'serilalizeToJson' => $this->config['serilalizeToJson'],
]);
}

return new GpsContext($this->establishConnection());
return new GpsContext($this->establishConnection(), [
'serilalizeToJson' => $this->config['serilalizeToJson'],
]);
}

private function parseDsn(string $dsn): array
{
$dsn = Dsn::parseFirst($dsn);

if ('gps' !== $dsn->getSchemeProtocol()) {
throw new \LogicException(sprintf(
'The given scheme protocol "%s" is not supported. It must be "gps"',
$dsn->getSchemeProtocol()
));
throw new \LogicException(sprintf('The given scheme protocol "%s" is not supported. It must be "gps"', $dsn->getSchemeProtocol()));
}

$emulatorHost = $dsn->getString('emulatorHost');
Expand All @@ -105,6 +106,7 @@ private function parseDsn(string $dsn): array
'emulatorHost' => $emulatorHost,
'hasEmulator' => $hasEmulator,
'lazy' => $dsn->getBool('lazy'),
'serilalizeToJson' => $dsn->getBool('serilalizeToJson'),
]), function ($value) { return null !== $value; });
}

Expand All @@ -121,6 +123,7 @@ private function defaultConfig(): array
{
return [
'lazy' => true,
'serilalizeToJson' => true,
];
}
}
9 changes: 8 additions & 1 deletion pkg/gps/GpsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ private function getSubscription(): Subscription

private function convertMessage(GoogleMessage $message): GpsMessage
{
$gpsMessage = GpsMessage::jsonUnserialize($message->data());
makasim marked this conversation as resolved.
Show resolved Hide resolved
$options = $this->context->getOptions();

if ($options['serilalizeToJson']) {
$gpsMessage = GpsMessage::jsonUnserialize($message->data());
} else {
$gpsMessage = new GpsMessage($message->data(), $message->attributes());
}

$gpsMessage->setNativeMessage($message);

return $gpsMessage;
Expand Down
17 changes: 7 additions & 10 deletions pkg/gps/GpsContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ public function __construct($client, array $options = [])
} elseif (is_callable($client)) {
$this->clientFactory = $client;
} else {
throw new \InvalidArgumentException(sprintf(
'The $client argument must be either %s or callable that returns %s once called.',
PubSubClient::class,
PubSubClient::class
));
throw new \InvalidArgumentException(sprintf('The $client argument must be either %s or callable that returns %s once called.', PubSubClient::class, PubSubClient::class));
}
}

Expand Down Expand Up @@ -148,16 +144,17 @@ public function getClient(): PubSubClient
if (false == $this->client) {
$client = call_user_func($this->clientFactory);
if (false == $client instanceof PubSubClient) {
throw new \LogicException(sprintf(
'The factory must return instance of %s. It returned %s',
PubSubClient::class,
is_object($client) ? get_class($client) : gettype($client)
));
throw new \LogicException(sprintf('The factory must return instance of %s. It returned %s', PubSubClient::class, is_object($client) ? get_class($client) : gettype($client)));
}

$this->client = $client;
}

return $this->client;
}

public function getOptions(): array
{
return $this->options;
}
}
21 changes: 18 additions & 3 deletions pkg/gps/Tests/GpsConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ public function testThrowIfDsnCouldNotBeParsed()

/**
* @dataProvider provideConfigs
*
* @param mixed $config
* @param mixed $expectedConfig
*/
public function testShouldParseConfigurationAsExpected($config, $expectedConfig)
{
Expand All @@ -58,20 +55,23 @@ public static function provideConfigs()
null,
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

yield [
'gps:',
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

yield [
[],
[
'lazy' => true,
'serilalizeToJson' => true,
],
];

Expand All @@ -83,6 +83,7 @@ public static function provideConfigs()
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => true,
],
];

Expand All @@ -94,6 +95,7 @@ public static function provideConfigs()
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => true,
],
];

Expand All @@ -104,6 +106,19 @@ public static function provideConfigs()
'projectId' => 'mqdev',
'emulatorHost' => 'http://Fgoogle-pubsub:8085',
'lazy' => false,
'serilalizeToJson' => true,
],
];

yield [
['dsn' => 'gps:?foo=fooVal&projectId=mqdev&emulatorHost=http%3A%2F%2Fgoogle-pubsub%3A8085&serilalizeToJson=false'],
[
'foo' => 'fooVal',
'projectId' => 'mqdev',
'emulatorHost' => 'http://google-pubsub:8085',
'hasEmulator' => true,
'lazy' => true,
'serilalizeToJson' => false,
],
];
}
Expand Down
98 changes: 98 additions & 0 deletions pkg/gps/Tests/GpsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public function testShouldReceiveMessageNoWait()
->willReturn($client)
;

$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => true]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receiveNoWait();
Expand Down Expand Up @@ -171,6 +176,11 @@ public function testShouldReceiveMessage()
->willReturn($client)
;

$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => true]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);
Expand All @@ -179,6 +189,94 @@ public function testShouldReceiveMessage()
$this->assertSame('the body', $message->getBody());
}

public function testShouldReceiveMessageUnSerialize()
{
$message = new GpsMessage('the body');
$nativeMessage = new Message([
'data' => json_encode($message),
], []);

$subscription = $this->createSubscriptionMock();
$subscription
->expects($this->once())
->method('pull')
->with($this->identicalTo([
'maxMessages' => 1,
'requestTimeout' => 12.345,
]))
->willReturn([$nativeMessage]);

$client = $this->createPubSubClientMock();
$client
->expects($this->once())
->method('subscription')
->willReturn($subscription);

$context = $this->createContextMock();
$context
->expects($this->once())
->method('getClient')
->willReturn($client);
$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => false]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);

$this->assertInstanceOf(GpsMessage::class, $message);
$this->assertSame($nativeMessage->data(), $message->getBody());
$this->assertSame($nativeMessage->attributes(), $message->getProperties());
}

public function testShouldReceiveMessageUnSerializeWithAttributes()
{
$message = new GpsMessage('the body');
$nativeMessage = new Message([
'data' => json_encode($message),
'attributes' => [
'foo' => 'fooVal',
'bar' => 'barVal',
],
], []);

$subscription = $this->createSubscriptionMock();
$subscription
->expects($this->once())
->method('pull')
->with($this->identicalTo([
'maxMessages' => 1,
'requestTimeout' => 12.345,
]))
->willReturn([$nativeMessage]);

$client = $this->createPubSubClientMock();
$client
->expects($this->once())
->method('subscription')
->willReturn($subscription);

$context = $this->createContextMock();
$context
->expects($this->once())
->method('getClient')
->willReturn($client);
$context
->expects($this->once())
->method('getOptions')
->willReturn(['serilalizeToJson' => false]);

$consumer = new GpsConsumer($context, new GpsQueue('queue-name'));

$message = $consumer->receive(12345);

$this->assertInstanceOf(GpsMessage::class, $message);
$this->assertSame($nativeMessage->data(), $message->getBody());
$this->assertSame($nativeMessage->attributes(), $message->getProperties());
}

/**
* @return \PHPUnit\Framework\MockObject\MockObject|GpsContext
*/
Expand Down
7 changes: 7 additions & 0 deletions pkg/gps/Tests/GpsContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ public function testCreateConsumerShouldThrowExceptionIfInvalidDestination()
$context->createConsumer(new GpsTopic(''));
}

public function testShouldReturnOptions()
{
$context = new GpsContext($this->createPubSubClientMock(), ['foo' => 'fooVal']);

$this->assertSame(['ackDeadlineSeconds' => 10, 'foo' => 'fooVal'], $context->getOptions());
}

/**
* @return PubSubClient|\PHPUnit\Framework\MockObject\MockObject|PubSubClient
*/
Expand Down
2 changes: 1 addition & 1 deletion pkg/gps/Tests/GpsMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function testCouldBeUnserializedFromJson()

$json = json_encode($message);

//guard
// guard
$this->assertNotEmpty($json);

$unserializedMessage = GpsMessage::jsonUnserialize($json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ protected function createContext()

/**
* @param GpsContext $context
* @param mixed $queueName
*/
protected function createQueue(Context $context, $queueName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ protected function createContext()

/**
* @param GpsContext $context
* @param mixed $queueName
*/
protected function createQueue(Context $context, $queueName)
{
Expand Down
Loading