diff --git a/README.md b/README.md index 6631367c..4d3e81de 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,12 @@ admin overview panel using the event hook system, ie. `admin/tax-categories/`. $ bin/console sylius:import-from-message-queue country ``` + - To make the importer wait 1s for messages to get into the message queue (default, does not wait) + + ```bash + $ bin/console sylius:import-from-message-queue country --timeout=1000 + ``` + - Export data of resources to file using `country` exporter ```bash $ bin/console sylius:export country my/countries/export/csv/file.csv --format=csv diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index a7cdc766..a769275e 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -10,6 +10,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\DependencyInjection\ContainerAwareTrait; @@ -40,7 +41,9 @@ protected function configure(): void ->setDescription('Import data from message queue.') ->setDefinition([ new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'), - ]); + new InputOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0), + ]) + ; } /** @@ -56,6 +59,8 @@ protected function execute(InputInterface $input, OutputInterface $output): void return; } + $timeout = $input->getOption('timeout'); + // only accepts the format of json as messages $name = ImporterRegistry::buildServiceName($importer, 'json'); @@ -73,7 +78,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void /** @var ItemReaderInterface $mqItemReader */ $mqItemReader = $this->container->get('sylius.message_queue_reader'); - $this->getImporterJsonDataFromMessageQueue($mqItemReader, $importer, $service, $output); + $this->importJsonDataFromMessageQueue($mqItemReader, $importer, $service, $output, (int) $timeout); $this->finishImport($name, $output); } @@ -86,10 +91,10 @@ private function finishImport(string $name, OutputInterface $output): void $output->writeln($message); } - private function getImporterJsonDataFromMessageQueue(ItemReaderInterface $mqItemReader, $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void + private function importJsonDataFromMessageQueue(ItemReaderInterface $mqItemReader, $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void { $mqItemReader->initQueue('sylius.export.queue.' . $importer); - $mqItemReader->readAndImport($service); + $mqItemReader->readAndImport($service, $timeout); $output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount()); $output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount()); } diff --git a/src/Importer/ItemReaderInterface.php b/src/Importer/ItemReaderInterface.php index 2a350d5c..f7e02f6b 100644 --- a/src/Importer/ItemReaderInterface.php +++ b/src/Importer/ItemReaderInterface.php @@ -8,7 +8,7 @@ interface ItemReaderInterface { public function initQueue(string $queueName): void; - public function readAndImport(SingleDataArrayImporterInterface $service): void; + public function readAndImport(SingleDataArrayImporterInterface $service, int $timeout): void; public function getMessagesImportedCount(): int; diff --git a/src/Importer/MqItemReader.php b/src/Importer/MqItemReader.php index 0aa1dfd8..a0820da3 100644 --- a/src/Importer/MqItemReader.php +++ b/src/Importer/MqItemReader.php @@ -50,13 +50,13 @@ public function initQueue(string $queueName): void $this->messagesSkippedCount = 0; } - public function readAndImport(SingleDataArrayImporterInterface $service): void + public function readAndImport(SingleDataArrayImporterInterface $service, int $timeout = 0): void { /** @var PsrConsumer $consumer */ $consumer = $this->context->createConsumer($this->queue); /** @var PsrMessage $message */ - while ($message = $consumer->receive()) { + while ($message = $consumer->receive($timeout)) { $dataArrayToImport = (array) json_decode($message->getBody()); $service->importSingleDataArrayWithoutResult($dataArrayToImport);