From 2ebdabeee2c68e95ad7260b792b1c5c7fe36e886 Mon Sep 17 00:00:00 2001 From: jyamin Date: Thu, 6 Sep 2018 15:02:40 +0200 Subject: [PATCH] persistent queue --- lib/Queue/Driver/Amqp.php | 4 ++-- tests/Queue/Driver/Amqp.php | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/Queue/Driver/Amqp.php b/lib/Queue/Driver/Amqp.php index 7b54f70..bacafa0 100644 --- a/lib/Queue/Driver/Amqp.php +++ b/lib/Queue/Driver/Amqp.php @@ -105,7 +105,7 @@ public function push(Channel $channel, $message, $messageType = null) $queue = $this->getQueue($channel); $serialize = $channel->getSettings()->getSerialize(); $msg = (!$message instanceof AMQPMessage) - ? new AMQPMessage($serialize ? serialize($message) : (string)$message) + ? new AMQPMessage($serialize ? serialize($message) : (string)$message, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)) : $message; $queue->basic_publish($msg, '', $channel->getName()); return true; @@ -134,7 +134,7 @@ public function pull(Channel $channel, $messageType = null) array($this, 'onPull') ); do { - $queue->wait(); + $queue->wait(null, false, 650); } while (is_null($this->currentMsg)); } else { $message = $queue->basic_get($name, true); diff --git a/tests/Queue/Driver/Amqp.php b/tests/Queue/Driver/Amqp.php index 2d1662a..4b4982f 100644 --- a/tests/Queue/Driver/Amqp.php +++ b/tests/Queue/Driver/Amqp.php @@ -187,7 +187,10 @@ public function testPush() public function testPushNotMessageAndSerialize() { - $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage(serialize('test')); + $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage( + serialize('test'), + array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) + ); $channelResource = $this->getMockBuilder('\Tests\Queue\Driver\AMQPChannelMockQueueDriverAmqp') ->setMethods(array('basic_publish')) ->getMock();