From 0563083697af9eb788db359a142549f9833aa90a Mon Sep 17 00:00:00 2001 From: enesaktay <21998826+enesaktay@users.noreply.github.com> Date: Wed, 25 Jul 2018 11:30:58 +0200 Subject: [PATCH 1/3] add a timeout function to the import --- README.md | 6 ++++++ src/Command/ImportDataFromMessageQueueCommand.php | 14 ++++++++++---- src/Exporter/ItemReaderInterface.php | 2 +- src/Exporter/MqItemReader.php | 4 ++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 78fbfade..3aa82db1 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,12 @@ admin overview panel using the event hook system, ie. `admin/tax-categories/`. $ bin/console sylius:import-from-message-queue country ``` + - To make the importer wait 1s for messages to get into the message queue (default, does not wait) + + ```bash + $ bin/console sylius:import-from-message-queue country --timeout=1000 + ``` + - Export data of resources to file using `country` exporter ```bash $ bin/console sylius:export country my/countries/export/csv/file.csv --format=csv diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index 76affbcc..85ab2b7d 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -11,6 +11,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; @@ -41,7 +42,9 @@ protected function configure(): void ->setDescription('Import data from message queue.') ->setDefinition([ new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'), - ]); + new InputOption('timeout', null, InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0), + ]) + ; } /** @@ -57,6 +60,8 @@ protected function execute(InputInterface $input, OutputInterface $output): void return; } + $timeout = $input->getOption('timeout'); + // only accepts the format of json as messages $name = ImporterRegistry::buildServiceName($importer, 'json'); @@ -72,7 +77,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void /** @var SingleDataArrayImporterInterface $service */ $service = $this->importerRegistry->get($name); - $this->getImporterJsonDataFromMessageQueue($importer, $service, $output); + $this->importJsonDataFromMessageQueue($importer, $service, $output, (int) $timeout); $this->finishImport($name, $output); } @@ -93,12 +98,13 @@ private function finishImport(string $name, OutputInterface $output): void * @param string $importer * @param SingleDataArrayImporterInterface $service * @param OutputInterface $output + * @param int $timeout */ - private function getImporterJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void + private function importJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void { $mqItemReader = new MqItemReader(new RedisConnectionFactory(), $service); $mqItemReader->initQueue('sylius.export.queue.' . $importer); - $mqItemReader->readAndImport(); + $mqItemReader->readAndImport($timeout); $output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount()); $output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount()); } diff --git a/src/Exporter/ItemReaderInterface.php b/src/Exporter/ItemReaderInterface.php index bf0f0d82..fcfb2bb5 100644 --- a/src/Exporter/ItemReaderInterface.php +++ b/src/Exporter/ItemReaderInterface.php @@ -11,7 +11,7 @@ interface ItemReaderInterface */ public function initQueue(string $queueName): void; - public function readAndImport(): void; + public function readAndImport(int $timeout): void; /** * @return int diff --git a/src/Exporter/MqItemReader.php b/src/Exporter/MqItemReader.php index c8a79195..4b17fc62 100644 --- a/src/Exporter/MqItemReader.php +++ b/src/Exporter/MqItemReader.php @@ -66,7 +66,7 @@ public function initQueue(string $queueName): void $this->messagesSkippedCount = 0; } - public function readAndImport(): void + public function readAndImport(int $timeout = 0): void { /** @var RedisConsumer $consumer */ $consumer = $this->redisContext->createConsumer($this->queue); @@ -74,7 +74,7 @@ public function readAndImport(): void $dataTimestampArray = []; /** @var RedisMessage $message */ - while ($message = $consumer->receive()) { + while ($message = $consumer->receive($timeout)) { $dataArrayToImport = (array) json_decode($message->getBody()); $dataTimestamp = strtotime($message->getHeader('recordedOn')); From 83f2cb0dee03c04a141a129108645b9776be370a Mon Sep 17 00:00:00 2001 From: enesaktay <21998826+enesaktay@users.noreply.github.com> Date: Thu, 26 Jul 2018 10:00:21 +0200 Subject: [PATCH 2/3] cs fix --- src/Command/ImportDataFromMessageQueueCommand.php | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index 2b6535cd..c98c7627 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -87,12 +87,6 @@ private function finishImport(string $name, OutputInterface $output): void $output->writeln($message); } - /** - * @param string $importer - * @param SingleDataArrayImporterInterface $service - * @param OutputInterface $output - * @param int $timeout - */ private function importJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void { $mqItemReader = new MqItemReader(new RedisConnectionFactory(), $service); From c5b5e7382dcbab880972a280e4fc1451bd111c50 Mon Sep 17 00:00:00 2001 From: enesaktay <21998826+enesaktay@users.noreply.github.com> Date: Wed, 15 Aug 2018 13:20:04 +0200 Subject: [PATCH 3/3] add shortcut to option --- src/Command/ImportDataFromMessageQueueCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index c98c7627..493480cb 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -39,7 +39,7 @@ protected function configure(): void ->setDescription('Import data from message queue.') ->setDefinition([ new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'), - new InputOption('timeout', null, InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0), + new InputOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0), ]) ; }