Skip to content

Commit

Permalink
fixed full dispatch issue
Browse files Browse the repository at this point in the history
  • Loading branch information
benwalch committed Mar 21, 2024
1 parent 8717673 commit 8948d47
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 69 deletions.
7 changes: 2 additions & 5 deletions src/EventSubscriber/DataProcessingEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
namespace DynamicSearchBundle\EventSubscriber;

use DynamicSearchBundle\Builder\ContextDefinitionBuilderInterface;
use DynamicSearchBundle\Context\ContextDefinitionInterface;
use DynamicSearchBundle\DynamicSearchEvents;
use DynamicSearchBundle\Event\NewDataEvent;
use DynamicSearchBundle\Logger\LoggerInterface;
use DynamicSearchBundle\Processor\ResourceModificationProcessorInterface;
use DynamicSearchBundle\Provider\DataProviderInterface;
use DynamicSearchBundle\Queue\DataCollectorInterface;
use DynamicSearchBundle\Queue\Message\ProcessResourceMessage;
use DynamicSearchBundle\Queue\Message\QueueResourceMessage;
use DynamicSearchBundle\Validator\ResourceValidatorInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class DataProcessingEventSubscriber implements EventSubscriberInterface
{
Expand All @@ -40,7 +37,7 @@ public function dispatchResourceModification(NewDataEvent $event): void
// data collector add to queue
$this->dataCollector->addToContextQueue(
$contextDefinition->getName(),
$contextDefinition->getContextDispatchType(),
ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_INDEX,
$event->getData(),
[
'resourceValidation' => [
Expand Down
9 changes: 5 additions & 4 deletions src/Manager/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ public function __construct(
public function clearQueue(): void
{
try {
$stmt = sprintf('SELECT COUNT(*) FROM %s', $this->tableName);
$affectedRows = $this->connection->executeQuery($stmt)->fetchFirstColumn();
$qb = $this->connection->createQueryBuilder();
$qb->select('COUNT(id)')->from($this->tableName);
$affectedRows = current($qb->executeQuery()->fetchFirstColumn());
$sql = $this->connection->getDatabasePlatform()->getTruncateTableSQL($this->tableName);
$this->connection->executeStatement($sql);
$this->logger->debug(sprintf('data queue cleared. Affected jobs: %d', $affectedRows), 'queue', 'maintenance');
$this->logger->debug(sprintf('data queue cleared. Affected jobs: %d', $affectedRows), 'queue', 'default');
} catch (\Throwable $e) {
$this->logger->error(sprintf('Error while clearing queue. Message was: %s', $e->getMessage()), 'queue', 'maintenance');
$this->logger->error(sprintf('Error while clearing queue. Message was: %s', $e->getMessage()), 'queue', 'default');
}
}
}
9 changes: 1 addition & 8 deletions src/Queue/MessageHandler/ProcessResourceHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace DynamicSearchBundle\Queue\MessageHandler;

use DynamicSearchBundle\Context\ContextDefinitionInterface;
use DynamicSearchBundle\Exception\SilentException;
use DynamicSearchBundle\Logger\LoggerInterface;
use DynamicSearchBundle\Queue\Message\ProcessResourceMessage;
Expand Down Expand Up @@ -61,13 +60,7 @@ private function process(array $jobs): void
}
try {

if (in_array($dispatchType, [ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_INSERT, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_INDEX])) {
$this->resourceRunner->runInsertStack($message->contextName, $resourceMetas);
} elseif ($dispatchType === ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_UPDATE) {
$this->resourceRunner->runUpdateStack($message->contextName, $resourceMetas);
} elseif ($dispatchType === ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_DELETE) {
$this->resourceRunner->runDeleteStack($message->contextName, $resourceMetas);
}
$this->resourceRunner->runResourceStack($contextName, $dispatchType, $resourceMetas);

} catch (SilentException $e) {
// do not raise errors in silent exception. this error has been logged already in the right channel.
Expand Down
49 changes: 9 additions & 40 deletions src/Runner/ResourceRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,21 @@
use DynamicSearchBundle\Normalizer\Resource\ResourceMetaInterface;
use DynamicSearchBundle\Processor\ResourceDeletionProcessorInterface;
use DynamicSearchBundle\Provider\DataProviderInterface;
use DynamicSearchBundle\Provider\IndexProviderInterface;

class ResourceRunner extends AbstractRunner implements ResourceRunnerInterface
{
public function __construct(protected ResourceDeletionProcessorInterface $resourceDeletionProcessor)
{
}

public function runInsert(string $contextName, ResourceMetaInterface $resourceMeta): void
public function runResourceStack(string $contextName, string $dispatchType, array $resourceMetaStack): void
{
$contextDefinition = $this->setupContextDefinition($contextName, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_INSERT);

$providers = $this->setupProviders($contextDefinition, DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH);

$this->warmUpProvider($contextDefinition, $providers);

/** @var DataProviderInterface $dataProvider */
$dataProvider = $providers['dataProvider'];

$this->callSaveMethod($contextDefinition, $dataProvider, 'provideSingle', [$contextDefinition, $resourceMeta], $providers);
$this->coolDownProvider($contextDefinition, $providers);
}
if ($dispatchType === ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_DELETE) {
$this->runDeleteStack($contextName, $resourceMetaStack);
return;
}

public function runInsertStack(string $contextName, array $resourceMetaStack): void
{
$contextDefinition = $this->setupContextDefinition($contextName, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_INSERT);
$contextDefinition = $this->setupContextDefinition($contextName, $dispatchType);

$providers = $this->setupProviders($contextDefinition, DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH);

Expand All @@ -45,26 +34,12 @@ public function runInsertStack(string $contextName, array $resourceMetaStack): v
}

$this->coolDownProvider($contextDefinition, $providers);
}

public function runUpdate(string $contextName, ResourceMetaInterface $resourceMeta): void
{
$contextDefinition = $this->setupContextDefinition($contextName, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_UPDATE);

$providers = $this->setupProviders($contextDefinition, DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH);

$this->warmUpProvider($contextDefinition, $providers);

/** @var DataProviderInterface $dataProvider */
$dataProvider = $providers['dataProvider'];

$this->callSaveMethod($contextDefinition, $dataProvider, 'provideSingle', [$contextDefinition, $resourceMeta], $providers);
$this->coolDownProvider($contextDefinition, $providers);
}

public function runUpdateStack(string $contextName, array $resourceMetaStack): void
public function runResource(string $contextName, string $dispatchType, ResourceMetaInterface $resourceMeta): void
{
$contextDefinition = $this->setupContextDefinition($contextName, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_UPDATE);
$contextDefinition = $this->setupContextDefinition($contextName, $dispatchType);

$providers = $this->setupProviders($contextDefinition, DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH);

Expand All @@ -73,10 +48,7 @@ public function runUpdateStack(string $contextName, array $resourceMetaStack): v
/** @var DataProviderInterface $dataProvider */
$dataProvider = $providers['dataProvider'];

foreach ($resourceMetaStack as $resourceMeta) {
$this->callSaveMethod($contextDefinition, $dataProvider, 'provideSingle', [$contextDefinition, $resourceMeta], $providers);
}

$this->callSaveMethod($contextDefinition, $dataProvider, 'provideSingle', [$contextDefinition, $resourceMeta], $providers);
$this->coolDownProvider($contextDefinition, $providers);
}

Expand All @@ -85,9 +57,6 @@ public function runDelete(string $contextName, ResourceMetaInterface $resourceMe
$contextDefinition = $this->setupContextDefinition($contextName, ContextDefinitionInterface::CONTEXT_DISPATCH_TYPE_DELETE);

$indexProvider = $this->setupIndexProvider($contextDefinition);
if (!$indexProvider instanceof IndexProviderInterface) {
return;
}

$this->warmUpProvider($contextDefinition, [$indexProvider]);
$this->callSaveMethod($contextDefinition, $this->resourceDeletionProcessor, 'processByResourceMeta', [$contextDefinition, $resourceMeta], [$indexProvider]);
Expand Down
14 changes: 2 additions & 12 deletions src/Runner/ResourceRunnerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,12 @@ interface ResourceRunnerInterface
/**
* @throws SilentException
*/
public function runInsert(string $contextName, ResourceMetaInterface $resourceMeta): void;
public function runResourceStack(string $contextName, string $dispatchType, array $resourceMetaStack): void;

/**
* @throws SilentException
*/
public function runInsertStack(string $contextName, array $resourceMetaStack): void;

/**
* @throws SilentException
*/
public function runUpdate(string $contextName, ResourceMetaInterface $resourceMeta): void;

/**
* @throws SilentException
*/
public function runUpdateStack(string $contextName, array $resourceMetaStack): void;
public function runResource(string $contextName, string $dispatchType, ResourceMetaInterface $resourceMeta): void;

/**
* @throws SilentException
Expand Down

0 comments on commit 8948d47

Please sign in to comment.