diff --git a/README.md b/README.md index 78fbfade..3aa82db1 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,12 @@ admin overview panel using the event hook system, ie. `admin/tax-categories/`. $ bin/console sylius:import-from-message-queue country ``` + - To make the importer wait 1s for messages to get into the message queue (default, does not wait) + + ```bash + $ bin/console sylius:import-from-message-queue country --timeout=1000 + ``` + - Export data of resources to file using `country` exporter ```bash $ bin/console sylius:export country my/countries/export/csv/file.csv --format=csv diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index aa34520d..493480cb 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -11,6 +11,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; @@ -38,7 +39,9 @@ protected function configure(): void ->setDescription('Import data from message queue.') ->setDefinition([ new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'), - ]); + new InputOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0), + ]) + ; } /** @@ -54,6 +57,8 @@ protected function execute(InputInterface $input, OutputInterface $output): void return; } + $timeout = $input->getOption('timeout'); + // only accepts the format of json as messages $name = ImporterRegistry::buildServiceName($importer, 'json'); @@ -69,7 +74,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void /** @var SingleDataArrayImporterInterface $service */ $service = $this->importerRegistry->get($name); - $this->getImporterJsonDataFromMessageQueue($importer, $service, $output); + $this->importJsonDataFromMessageQueue($importer, $service, $output, (int) $timeout); $this->finishImport($name, $output); } @@ -82,11 +87,11 @@ private function finishImport(string $name, OutputInterface $output): void $output->writeln($message); } - private function getImporterJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void + private function importJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void { $mqItemReader = new MqItemReader(new RedisConnectionFactory(), $service); $mqItemReader->initQueue('sylius.export.queue.' . $importer); - $mqItemReader->readAndImport(); + $mqItemReader->readAndImport($timeout); $output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount()); $output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount()); } diff --git a/src/Exporter/ItemReaderInterface.php b/src/Exporter/ItemReaderInterface.php index 5099dd2e..9440ae11 100644 --- a/src/Exporter/ItemReaderInterface.php +++ b/src/Exporter/ItemReaderInterface.php @@ -8,7 +8,7 @@ interface ItemReaderInterface { public function initQueue(string $queueName): void; - public function readAndImport(): void; + public function readAndImport(int $timeout): void; public function getMessagesImportedCount(): int; diff --git a/src/Exporter/MqItemReader.php b/src/Exporter/MqItemReader.php index e9dc0011..8a883674 100644 --- a/src/Exporter/MqItemReader.php +++ b/src/Exporter/MqItemReader.php @@ -60,13 +60,13 @@ public function initQueue(string $queueName): void $this->messagesSkippedCount = 0; } - public function readAndImport(): void + public function readAndImport(int $timeout = 0): void { /** @var RedisConsumer $consumer */ $consumer = $this->redisContext->createConsumer($this->queue); /** @var RedisMessage $message */ - while ($message = $consumer->receive()) { + while ($message = $consumer->receive($timeout)) { $dataArrayToImport = (array) json_decode($message->getBody()); $this->service->importSingleDataArrayWithoutResult($dataArrayToImport);