diff --git a/Command/CleanUpCommand.php b/Command/CleanUpCommand.php index d604e868..cc810731 100644 --- a/Command/CleanUpCommand.php +++ b/Command/CleanUpCommand.php @@ -7,12 +7,12 @@ use Doctrine\ORM\EntityManager; use JMS\JobQueueBundle\Entity\Job; use JMS\JobQueueBundle\Entity\Repository\JobManager; -use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; +use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -class CleanUpCommand extends ContainerAwareCommand +class CleanUpCommand extends Command { protected static $defaultName = 'jms-job-queue:clean-up'; @@ -148,9 +148,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input) ->setMaxResults(100) ->getResult(); }; - foreach ($this->whileResults($succeededJobs) as $job) { - yield $job; - } + yield from $this->whileResults( $succeededJobs ); $finishedJobs = function(array $excludedIds) use ($em, $input) { return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)") @@ -159,9 +157,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input) ->setMaxResults(100) ->getResult(); }; - foreach ($this->whileResults($finishedJobs) as $job) { - yield $job; - } + yield from $this->whileResults( $finishedJobs ); $canceledJobs = function(array $excludedIds) use ($em, $input) { return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)") @@ -171,9 +167,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input) ->setMaxResults(100) ->getResult(); }; - foreach ($this->whileResults($canceledJobs) as $job) { - yield $job; - } + yield from $this->whileResults( $canceledJobs ); } private function whileResults(callable $resultProducer) diff --git a/Command/MarkJobIncompleteCommand.php b/Command/MarkJobIncompleteCommand.php index 085dcea2..032b0cd8 100644 --- a/Command/MarkJobIncompleteCommand.php +++ b/Command/MarkJobIncompleteCommand.php @@ -5,13 +5,13 @@ use Doctrine\Common\Persistence\ManagerRegistry; use Doctrine\ORM\EntityManager; use JMS\JobQueueBundle\Entity\Job; +use Symfony\Component\Console\Command\Command; use JMS\JobQueueBundle\Entity\Repository\JobManager; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputArgument; -use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; -class MarkJobIncompleteCommand extends ContainerAwareCommand +class MarkJobIncompleteCommand extends Command { protected static $defaultName = 'jms-job-queue:mark-incomplete'; diff --git a/Command/RunCommand.php b/Command/RunCommand.php index fc6d5310..98898c1f 100644 --- a/Command/RunCommand.php +++ b/Command/RunCommand.php @@ -25,14 +25,15 @@ use JMS\JobQueueBundle\Event\StateChangeEvent; use JMS\JobQueueBundle\Exception\InvalidArgumentException; use Symfony\Bridge\Doctrine\ManagerRegistry; +use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Process\Exception\ProcessFailedException; use Symfony\Component\Process\Process; -class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand +class RunCommand extends Command { protected static $defaultName = 'jms-job-queue:run'; @@ -48,7 +49,10 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC /** @var ManagerRegistry */ private $registry; - /** @var EventDispatcher */ + /** @var JobManager */ + private $jobManager; + + /** @var EventDispatcherInterface */ private $dispatcher; /** @var array */ @@ -57,6 +61,23 @@ class RunCommand extends \Symfony\Bundle\FrameworkBundle\Command\ContainerAwareC /** @var bool */ private $shouldShutdown = false; + /** @var array */ + private $queueOptionsDefault; + + /** @var array */ + private $queueOptions; + + public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager, EventDispatcherInterface $dispatcher, array $queueOptionsDefault, array $queueOptions) + { + parent::__construct(); + + $this->registry = $managerRegistry; + $this->jobManager = $jobManager; + $this->dispatcher = $dispatcher; + $this->queueOptionsDefault = $queueOptionsDefault; + $this->queueOptions = $queueOptions; + } + protected function configure() { $this @@ -79,7 +100,7 @@ protected function execute(InputInterface $input, OutputInterface $output) } if ($maxRuntime > 600) { - $maxRuntime += mt_rand(-120, 120); + $maxRuntime += random_int(-120, 120); } $maxJobs = (integer) $input->getOption('max-concurrent-jobs'); @@ -110,8 +131,6 @@ protected function execute(InputInterface $input, OutputInterface $output) $this->env = $input->getOption('env'); $this->verbose = $input->getOption('verbose'); $this->output = $output; - $this->registry = $this->getContainer()->get('doctrine'); - $this->dispatcher = $this->getContainer()->get('event_dispatcher'); $this->getEntityManager()->getConnection()->getConfiguration()->setSQLLogger(null); if ($this->verbose) { @@ -127,8 +146,8 @@ protected function execute(InputInterface $input, OutputInterface $output) $idleTime, $maxJobs, $restrictedQueues, - $this->getContainer()->getParameter('jms_job_queue.queue_options_defaults'), - $this->getContainer()->getParameter('jms_job_queue.queue_options') + $this->queueOptionsDefault, + $this->queueOptions ); } @@ -161,7 +180,7 @@ private function runJobs($workerName, $startTime, $maxRuntime, $idleTime, $maxJo $this->checkRunningJobs(); $this->startJobs($workerName, $idleTime, $maxJobs, $restrictedQueues, $queueOptionsDefaults, $queueOptions); - $waitTimeInMs = mt_rand(500, 1000); + $waitTimeInMs = random_int(500, 1000); usleep($waitTimeInMs * 1E3); } @@ -194,7 +213,7 @@ private function startJobs($workerName, $idleTime, $maxJobs, array $restrictedQu { $excludedIds = array(); while (count($this->runningJobs) < $maxJobs) { - $pendingJob = $this->getJobManager()->findStartableJob( + $pendingJob = $this->jobManager->findStartableJob( $workerName, $excludedIds, $this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs), @@ -291,7 +310,7 @@ private function checkRunningJobs() $data['process']->stop(5); $this->output->writeln($data['job'].' terminated; maximum runtime exceeded.'); - $this->getJobManager()->closeJob($data['job'], Job::STATE_TERMINATED); + $this->jobManager->closeJob($data['job'], Job::STATE_TERMINATED); unset($this->runningJobs[$i]); continue; @@ -321,7 +340,7 @@ private function checkRunningJobs() $data['job']->setRuntime(time() - $data['start_time']); $newState = 0 === $data['process']->getExitCode() ? Job::STATE_FINISHED : Job::STATE_FAILED; - $this->getJobManager()->closeJob($data['job'], $newState); + $this->jobManager->closeJob($data['job'], $newState); unset($this->runningJobs[$i]); } @@ -335,7 +354,7 @@ private function startJob(Job $job) $newState = $event->getNewState(); if (Job::STATE_CANCELED === $newState) { - $this->getJobManager()->closeJob($job, Job::STATE_CANCELED); + $this->jobManager->closeJob($job, Job::STATE_CANCELED); return; } @@ -428,12 +447,4 @@ private function getEntityManager(): EntityManager { return /** @var EntityManager */ $this->registry->getManagerForClass('JMSJobQueueBundle:Job'); } - - /** - * @return JobManager - */ - private function getJobManager() - { - return $this->getContainer()->get('jms_job_queue.job_manager'); - } } diff --git a/Command/ScheduleCommand.php b/Command/ScheduleCommand.php index bc8d4ba4..27a99c95 100644 --- a/Command/ScheduleCommand.php +++ b/Command/ScheduleCommand.php @@ -45,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output) { $maxRuntime = $input->getOption('max-runtime'); if ($maxRuntime > 300) { - $maxRuntime += mt_rand(0, (integer)($input->getOption('max-runtime') * 0.05)); + $maxRuntime += random_int(0, (integer)($input->getOption('max-runtime') * 0.05)); } if ($maxRuntime <= 0) { throw new \RuntimeException('Max. runtime must be greater than zero.'); diff --git a/Resources/config/console.xml b/Resources/config/console.xml index 6ace5aa0..fed5a7ac 100644 --- a/Resources/config/console.xml +++ b/Resources/config/console.xml @@ -21,6 +21,11 @@ + + + + %jms_job_queue.queue_options_defaults% + %jms_job_queue.queue_options%