Skip to content

Commit

Permalink
AKM-32: Move data from messages/RMQ to jobs/Database (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
dxops authored Mar 6, 2022
1 parent ad78ab8 commit b582797
Show file tree
Hide file tree
Showing 18 changed files with 332 additions and 174 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/php.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: PHP Composer

on:
push:
branches: [ master 4.2 4.1 3.1 1.6 ]
pull_request:

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- name: Install dependencies
run: php -r "copy('https://cs.symfony.com/download/php-cs-fixer-v3.phar', 'php-cs-fixer.phar');"

- name: Run php-cs-fixer
run: php php-cs-fixer.phar fix --dry-run --config=.php_cs.php --cache-file=.php_cs.cache --verbose --show-progress=dots --diff --allow-risky=yes
22 changes: 13 additions & 9 deletions .php_cs.php
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
<?php

$finder = PhpCsFixer\Finder::create()
$finder = \PhpCsFixer\Finder::create()
->in(__DIR__);

return PhpCsFixer\Config::create()
return (new \PhpCsFixer\Config())
->setFinder($finder)
->setRules(
[
// generic PSRs
'@PSR1' => true,
'@PSR2' => true,
'psr0' => true,
'psr4' => true,
'@PSR12' => true,
'@PSR12:risky' => true,
'psr_autoloading' => true,

// imports
'ordered_imports' => true,
Expand Down Expand Up @@ -39,11 +40,13 @@
'escape_implicit_backslashes' => false,

// PHP
'@PHP71Migration' => true,
'@PHP71Migration:risky' => true,
'@PHP73Migration' => true,
'@PHP74Migration' => true,
'@PHP74Migration:risky' => true,

'use_arrow_functions' => false,
'get_class_to_class_keyword' => false,
'void_return' => false,
'visibility_required' => false,
'list_syntax' => ['syntax' => 'long'],
'declare_strict_types' => false,

Expand All @@ -64,11 +67,11 @@
'@Symfony:risky' => true,
'phpdoc_types_order' => false,
'phpdoc_separation' => false,
'phpdoc_inline_tag' => false,
'visibility_required' => ['elements' => ['property', 'method']],
'types_spaces' => false,
'native_function_invocation' => false,
'concat_space' => ['spacing' => 'one'],
'single_space_after_construct' => false,
'trailing_comma_in_multiline_array' => false,
'self_accessor' => false,
'yoda_style' => false,
'phpdoc_summary' => false,
Expand All @@ -87,6 +90,7 @@
'ternary_operator_spaces' => false,
'phpdoc_no_useless_inheritdoc' => false,
'class_definition' => false,
'string_length_to_empty' => false,
]
)
->setRiskyAllowed(true);
13 changes: 0 additions & 13 deletions .travis.yml

This file was deleted.

24 changes: 23 additions & 1 deletion Async/ImportProductProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
namespace Oro\Bundle\AkeneoBundle\Async;

use Doctrine\ORM\EntityManagerInterface;
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\IntegrationBundle\Authentication\Token\IntegrationTokenAwareTrait;
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration;
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\IntegrationBundle\Provider\SyncProcessorRegistry;
use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Oro\Component\MessageQueue\Client\TopicSubscriberInterface;
use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
use Oro\Component\MessageQueue\Job\Job;
use Oro\Component\MessageQueue\Job\JobRunner;
use Oro\Component\MessageQueue\Transport\MessageInterface;
use Oro\Component\MessageQueue\Transport\SessionInterface;
Expand All @@ -19,6 +21,7 @@

class ImportProductProcessor implements MessageProcessorInterface, TopicSubscriberInterface
{
use CacheProviderTrait;
use IntegrationTokenAwareTrait;

/** @var DoctrineHelper */
Expand Down Expand Up @@ -97,12 +100,31 @@ public function process(MessageInterface $message, SessionInterface $session)
function (JobRunner $jobRunner, Job $child) use ($integration, $body) {
$this->doctrineHelper->refreshIncludingUnitializedRelations($integration);
$processor = $this->syncProcessorRegistry->getProcessorForIntegration($integration);

$em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
/** @var FieldsChanges $fieldsChanges */
$fieldsChanges = $em
->getRepository(FieldsChanges::class)
->findOneBy(['entityId' => $child->getId(), 'entityClass' => Job::class]);

if (!$fieldsChanges) {
$this->logger->error(
sprintf('Source data from Akeneo not found for job: %s', $child->getId())
);

return false;
}

$this->cacheProvider->save('akeneo', $fieldsChanges->getChangedFields());

$status = $processor->process(
$integration,
$body['connector'] ?? null,
$body['connector_parameters'] ?? []
);

$em->clear(FieldsChanges::class);

return $status;
}
);
Expand Down
104 changes: 104 additions & 0 deletions Command/CleanupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php

namespace Oro\Bundle\AkeneoBundle\Command;

use Oro\Bundle\BatchBundle\ORM\Query\BufferedIdentityQueryResultIterator;
use Oro\Bundle\CronBundle\Command\CronCommandInterface;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Clears old records from oro_integration_fields_changes table before repack.
*/
class CleanupCommand extends Command implements CronCommandInterface
{
/** @var string */
protected static $defaultName = 'oro:akeneo:cleanup';

/** @var DoctrineHelper */
private $doctrineHelper;

public function __construct(DoctrineHelper $doctrineHelper)
{
$this->doctrineHelper = $doctrineHelper;
parent::__construct();
}

public function isActive()
{
return true;
}

public function getDefaultDefinition()
{
return '0 2 * * 6';
}

public function configure()
{
$this
->setDescription('Clears old records from oro_integration_fields_changes table.')
->setHelp(
<<<'HELP'
The <info>%command.name%</info> command clears fields changes for complete job records
from <comment>oro_integration_fields_changes</comment> table.
<info>php %command.full_name%</info>
HELP
);
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$output->writeln(sprintf(
'<comment>Number of fields changes that has been deleted:</comment> %d',
$this->deleteRecords()
));

$output->writeln('<info>Fields changes cleanup complete</info>');
}

private function deleteRecords(): int
{
$qb = $this->doctrineHelper
->getEntityManagerForClass(FieldsChanges::class)
->getRepository(FieldsChanges::class)
->createQueryBuilder('fc');

$qb
->delete(FieldsChanges::class, 'fc')
->where($qb->expr()->eq('fc.entityClass', ':class'))
->setParameter('class', Job::class)
->andWhere($qb->expr()->in('fc.entityId', ':ids'));

$jqb = $this->doctrineHelper
->getEntityManagerForClass(Job::class)
->getRepository(Job::class)
->createQueryBuilder('j');

$jqb
->select('j.id')
->where($jqb->expr()->in('j.status', ':statuses'))
->setParameter('statuses', [Job::STATUS_SUCCESS, Job::STATUS_CANCELLED, Job::STATUS_FAILED, Job::STATUS_STALE])
->orderBy($jqb->expr()->desc('j.id'));

$iterator = new BufferedIdentityQueryResultIterator($jqb->getQuery());

$result = 0;
$iterator->setPageLoadedCallback(function (array $rows) use ($qb, &$result): array {
$ids = array_column($rows, 'id');

$result = $result + $qb->setParameter('ids', $ids)->getQuery()->execute();

return $ids;
});

iterator_to_array($iterator);

return $result;
}
}
1 change: 1 addition & 0 deletions DependencyInjection/OroAkeneoExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class OroAkeneoExtension extends Extension
public function load(array $configs, ContainerBuilder $container)
{
$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('commands.yml');
$loader->load('integration.yml');
$loader->load('importexport.yml');
$loader->load('services.yml');
Expand Down
7 changes: 3 additions & 4 deletions ImportExport/Reader/ProductImageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

use Oro\Bundle\AkeneoBundle\ImportExport\AkeneoIntegrationTrait;
use Oro\Bundle\AkeneoBundle\Integration\AkeneoFileManager;
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;

class ProductImageReader extends IteratorBasedReader
{
use AkeneoIntegrationTrait;
use CacheProviderTrait;

/** @var array */
private $attributesImageFilter = [];
Expand Down Expand Up @@ -45,10 +47,7 @@ protected function initializeFromContext(ContextInterface $context)

$this->initAttributesImageList();

$items = $this->stepExecution
->getJobExecution()
->getExecutionContext()
->get('items') ?? [];
$items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];

if (!empty($items)) {
$this->processImagesDownload($items, $context);
Expand Down
8 changes: 4 additions & 4 deletions ImportExport/Reader/ProductPriceReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;

use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;

class ProductPriceReader extends IteratorBasedReader
{
use CacheProviderTrait;

protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);

$items = $this->stepExecution
->getJobExecution()
->getExecutionContext()
->get('items') ?? [];
$items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];

$prices = [];

Expand Down
8 changes: 4 additions & 4 deletions ImportExport/Reader/ProductReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;

use Oro\Bundle\AkeneoBundle\Integration\AkeneoFileManager;
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;

class ProductReader extends IteratorBasedReader
{
use CacheProviderTrait;

/** @var AkeneoFileManager */
private $akeneoFileManager;

Expand All @@ -19,10 +22,7 @@ protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);

$items = $this->stepExecution
->getJobExecution()
->getExecutionContext()
->get('items') ?? [];
$items = $this->cacheProvider->fetch('akeneo')['items'] ?? [];

if (!empty($items)) {
$this->processFileTypeDownload($items, $context);
Expand Down
8 changes: 4 additions & 4 deletions ImportExport/Reader/ProductVariantReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

namespace Oro\Bundle\AkeneoBundle\ImportExport\Reader;

use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\ImportExportBundle\Context\ContextInterface;

class ProductVariantReader extends IteratorBasedReader
{
use CacheProviderTrait;

protected function initializeFromContext(ContextInterface $context)
{
parent::initializeFromContext($context);

$variants = $this->stepExecution
->getJobExecution()
->getExecutionContext()
->get('variants') ?? [];
$variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];

$this->stepExecution->setReadCount(count($variants));

Expand Down
Loading

0 comments on commit b582797

Please sign in to comment.