Skip to content

Commit

Permalink
FEATURE: Hand message over via cache
Browse files Browse the repository at this point in the history
  • Loading branch information
daniellienert committed Feb 8, 2018
1 parent a89cb33 commit 97eff7a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
18 changes: 15 additions & 3 deletions Classes/Command/JobCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueManager;
use Neos\Cache\Frontend\VariableFrontend;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;

Expand All @@ -35,6 +36,12 @@ class JobCommandController extends CommandController
*/
protected $queueManager;

/**
* @Flow\Inject
* @var VariableFrontend
*/
protected $messageCache;

/**
* Work on a queue and execute jobs
*
Expand Down Expand Up @@ -135,14 +142,19 @@ public function listCommand($queue, $limit = 1)
* Execute one job
*
* @param string $queue
* @param string $serializedMessage An instance of Message serialized and base64-encoded
* @param string $messageCacheIdentifier An identifier to receive the message from the cache
* @return void
* @internal This command is mainly used by the JobManager and FakeQueue in order to execute commands in sub requests
* @throws JobQueueException
*/
public function executeCommand($queue, $serializedMessage)
public function executeCommand($queue, $messageCacheIdentifier)
{
if(!$this->messageCache->has($messageCacheIdentifier)) {
throw new JobQueueException(sprintf('No message with identifier %s was found in the message cache.', $messageCacheIdentifier), 1517868903);
}

/** @var Message $message */
$message = unserialize(base64_decode($serializedMessage));
$message = unserialize($this->messageCache->get($messageCacheIdentifier));
$queue = $this->queueManager->getQueue($queue);
$this->jobManager->executeJobForMessage($queue, $message);
}
Expand Down
14 changes: 12 additions & 2 deletions Classes/Job/JobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/

use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Cache\Frontend\VariableFrontend;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Core\Booting\Scripts;
use Neos\Flow\Property\PropertyMapper;
Expand All @@ -38,6 +39,12 @@ class JobManager
*/
protected $propertyMapper;

/**
* @Flow\Inject
* @var VariableFrontend
*/
protected $messageCache;

/**
* @Flow\InjectConfiguration
* @var array
Expand Down Expand Up @@ -92,7 +99,10 @@ public function waitAndExecute($queueName, $timeout = null)
$queueSettings = $this->queueManager->getQueueSettings($queueName);
try {
if (isset($queueSettings['executeIsolated']) && $queueSettings['executeIsolated'] === true) {
Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), base64_encode(serialize($message))]);
$messageCacheIdentifier = sha1(serialize($message));
$this->messageCache->set($messageCacheIdentifier, serialize($message));
Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), $messageCacheIdentifier]);
$this->messageCache->remove($messageCacheIdentifier);
} else {
$this->executeJobForMessage($queue, $message);
}
Expand Down Expand Up @@ -235,4 +245,4 @@ protected function emitMessageFailed(QueueInterface $queue, Message $message, \E
{
}

}
}
3 changes: 3 additions & 0 deletions Configuration/Caches.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FlowPackJobQueueCommon_MessageCache:
frontend: Neos\Cache\Frontend\VariableFrontend
persistent: true
19 changes: 19 additions & 0 deletions Configuration/Objects.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Flowpack\JobQueue\Common\Job\JobManager:
properties:
messageCache:
object:
factoryObjectName: Neos\Flow\Cache\CacheManager
factoryMethodName: getCache
arguments:
1:
value: FlowPackJobQueueCommon_MessageCache

Flowpack\JobQueue\Common\Command\JobCommandController:
properties:
messageCache:
object:
factoryObjectName: Neos\Flow\Cache\CacheManager
factoryMethodName: getCache
arguments:
1:
value: FlowPackJobQueueCommon_MessageCache

0 comments on commit 97eff7a

Please sign in to comment.