Skip to content

Commit

Permalink
Merge pull request #2 from tejerka/master
Browse files Browse the repository at this point in the history
fix PurgeQueues Command
  • Loading branch information
bobey committed Mar 1, 2016
2 parents 9334c87 + 306bded commit 8608ea1
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions Command/PurgeQueuesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ protected function configure()
->setDescription('Purge queues')
->addOption('file', 'f', InputOption::VALUE_REQUIRED, 'File to read')
->addOption('force', null, InputOption::VALUE_NONE, 'If set, the task will not ask for confirm purge')
->addOption('priority', 'p', InputOption::VALUE_OPTIONAL, 'Get messages from specific priority')
->addArgument('queues', InputArgument::IS_ARRAY, 'queues to purge')
->setHelp(<<<HELP
This command purges queues.
Expand All @@ -52,9 +53,10 @@ protected function configure()
/**
* @param QueueClientInterface $queueClient
* @param string $fileName
* @param string|null $priority
* @return int
*/
private function purgeFromFile($queueClient, $fileName)
private function purgeFromFile($queueClient, $fileName, $priority)
{
try {
$processor = new Processor();
Expand All @@ -67,17 +69,17 @@ private function purgeFromFile($queueClient, $fileName)
return 1;
}
array_walk_recursive($processedConfiguration, 'ReputationVIP\Bundle\QueueClientBundle\QueueClientFactory::resolveParameters', $this->getContainer());
$this->output->write('Start delete queue.', Output::INFO);
$this->output->write('Start purge queue.', Output::INFO);
foreach ($processedConfiguration[QueuesConfiguration::QUEUES_NODE] as $queue) {
$queueName = $queue[QueuesConfiguration::QUEUE_NAME_NODE];
try {
$queueClient->deleteQueue($queueName);
$this->output->write('Queue ' . $queueName . ' deleted.', Output::INFO);
$queueClient->purgeQueue($queueName, $priority);
$this->output->write('Queue ' . $queueName . ' purged.', Output::INFO);
} catch (\Exception $e) {
$this->output->write($e->getMessage(), Output::WARNING);
}
}
$this->output->write('End delete queue.', Output::INFO);
$this->output->write('End purge queue.', Output::INFO);

return 0;
}
Expand All @@ -91,6 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$helper = $this->getHelper('question');
$force = $input->getOption('force') ? true : false;

try {
/** @var LoggerInterface $logger */
$logger = $this->getContainer()->get('logger');
Expand All @@ -106,14 +109,21 @@ protected function execute(InputInterface $input, OutputInterface $output)

return 1;
}
$priority = null;
if ($input->getOption('priority')) {
$priority = $input->getOption('priority');
if (!in_array($priority, $queueClient->getPriorityHandler()->getAll())) {
throw new \InvalidArgumentException('Priority "' . $priority . '" not found.');
}
}
if ($input->getOption('file')) {
$fileName = $input->getOption('file');
if (!($force || $helper->ask($input, $output, new ConfirmationQuestion('Purge queues in file "' . $fileName . '"?', false)))) {

return 0;
}

return $this->purgeFromFile($queueClient, $fileName);
return $this->purgeFromFile($queueClient, $fileName, $priority);
} else {
$queues = $input->getArgument('queues');
if (count($queues)) {
Expand All @@ -123,7 +133,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
foreach ($queues as $queue) {
try {
$queueClient->purgeQueue($queue);
$queueClient->purgeQueue($queue, $priority);
$this->output->write('Queue ' . $queue . ' purged.', Output::INFO);
} catch (\Exception $e) {
$this->output->write($e->getMessage(), Output::WARNING);
Expand All @@ -139,7 +149,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
return 0;
}

return $this->purgeFromFile($queueClient, $fileName);
return $this->purgeFromFile($queueClient, $fileName, $priority);
} catch (InvalidArgumentException $e) {
$this->output->write('No queue_client.queues_file parameter found.', Output::CRITICAL);

Expand Down

0 comments on commit 8608ea1

Please sign in to comment.