diff --git a/Classes/Command/JobCommandController.php b/Classes/Command/JobCommandController.php index ee10638..f3dc811 100644 --- a/Classes/Command/JobCommandController.php +++ b/Classes/Command/JobCommandController.php @@ -15,6 +15,7 @@ use Flowpack\JobQueue\Common\Job\JobManager; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueManager; +use Neos\Cache\Frontend\VariableFrontend; use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; @@ -35,6 +36,12 @@ class JobCommandController extends CommandController */ protected $queueManager; + /** + * @Flow\Inject + * @var VariableFrontend + */ + protected $messageCache; + /** * Work on a queue and execute jobs * @@ -135,14 +142,19 @@ public function listCommand($queue, $limit = 1) * Execute one job * * @param string $queue - * @param string $serializedMessage An instance of Message serialized and base64-encoded + * @param string $messageCacheIdentifier An identifier to receive the message from the cache * @return void * @internal This command is mainly used by the JobManager and FakeQueue in order to execute commands in sub requests + * @throws JobQueueException */ - public function executeCommand($queue, $serializedMessage) + public function executeCommand($queue, $messageCacheIdentifier) { + if(!$this->messageCache->has($messageCacheIdentifier)) { + throw new JobQueueException(sprintf('No message with identifier %s was found in the message cache.', $messageCacheIdentifier), 1517868903); + } + /** @var Message $message */ - $message = unserialize(base64_decode($serializedMessage)); + $message = unserialize($this->messageCache->get($messageCacheIdentifier)); $queue = $this->queueManager->getQueue($queue); $this->jobManager->executeJobForMessage($queue, $message); } diff --git a/Classes/Job/JobManager.php b/Classes/Job/JobManager.php index 3af7c0d..14027b0 100644 --- a/Classes/Job/JobManager.php +++ b/Classes/Job/JobManager.php @@ -12,6 +12,7 @@ */ use Flowpack\JobQueue\Common\Queue\QueueInterface; +use Neos\Cache\Frontend\VariableFrontend; use Neos\Flow\Annotations as Flow; use Neos\Flow\Core\Booting\Scripts; use Neos\Flow\Property\PropertyMapper; @@ -38,6 +39,12 @@ class JobManager */ protected $propertyMapper; + /** + * @Flow\Inject + * @var VariableFrontend + */ + protected $messageCache; + /** * @Flow\InjectConfiguration * @var array @@ -92,7 +99,10 @@ public function waitAndExecute($queueName, $timeout = null) $queueSettings = $this->queueManager->getQueueSettings($queueName); try { if (isset($queueSettings['executeIsolated']) && $queueSettings['executeIsolated'] === true) { - Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), base64_encode(serialize($message))]); + $messageCacheIdentifier = sha1(serialize($message)); + $this->messageCache->set($messageCacheIdentifier, serialize($message)); + Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), $messageCacheIdentifier]); + $this->messageCache->remove($messageCacheIdentifier); } else { $this->executeJobForMessage($queue, $message); } @@ -235,4 +245,4 @@ protected function emitMessageFailed(QueueInterface $queue, Message $message, \E { } -} \ No newline at end of file +} diff --git a/Configuration/Caches.yaml b/Configuration/Caches.yaml new file mode 100644 index 0000000..d231cfd --- /dev/null +++ b/Configuration/Caches.yaml @@ -0,0 +1,3 @@ +FlowPackJobQueueCommon_MessageCache: + frontend: Neos\Cache\Frontend\VariableFrontend + persistent: true diff --git a/Configuration/Objects.yaml b/Configuration/Objects.yaml new file mode 100644 index 0000000..8fb3856 --- /dev/null +++ b/Configuration/Objects.yaml @@ -0,0 +1,19 @@ +Flowpack\JobQueue\Common\Job\JobManager: + properties: + messageCache: + object: + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: FlowPackJobQueueCommon_MessageCache + +Flowpack\JobQueue\Common\Command\JobCommandController: + properties: + messageCache: + object: + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: FlowPackJobQueueCommon_MessageCache