Skip to content

Commit

Permalink
Merge pull request #138 from enesaktay/keep-import-to-mq-running
Browse files Browse the repository at this point in the history
Add a timeout function to the import-from-message-queue command
  • Loading branch information
lsmith77 authored Aug 15, 2018
2 parents 066688b + c5b5e73 commit d9252ad
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 7 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ admin overview panel using the event hook system, ie. `admin/tax-categories/`.
$ bin/console sylius:import-from-message-queue country
```

- To make the importer wait 1s for messages to get into the message queue (default, does not wait)

```bash
$ bin/console sylius:import-from-message-queue country --timeout=1000
```

- Export data of resources to file using `country` exporter
```bash
$ bin/console sylius:export country my/countries/export/csv/file.csv --format=csv
Expand Down
13 changes: 9 additions & 4 deletions src/Command/ImportDataFromMessageQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

Expand Down Expand Up @@ -38,7 +39,9 @@ protected function configure(): void
->setDescription('Import data from message queue.')
->setDefinition([
new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'),
]);
new InputOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0),
])
;
}

/**
Expand All @@ -54,6 +57,8 @@ protected function execute(InputInterface $input, OutputInterface $output): void
return;
}

$timeout = $input->getOption('timeout');

// only accepts the format of json as messages
$name = ImporterRegistry::buildServiceName($importer, 'json');

Expand All @@ -69,7 +74,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
/** @var SingleDataArrayImporterInterface $service */
$service = $this->importerRegistry->get($name);

$this->getImporterJsonDataFromMessageQueue($importer, $service, $output);
$this->importJsonDataFromMessageQueue($importer, $service, $output, (int) $timeout);
$this->finishImport($name, $output);
}

Expand All @@ -82,11 +87,11 @@ private function finishImport(string $name, OutputInterface $output): void
$output->writeln($message);
}

private function getImporterJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void
private function importJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void
{
$mqItemReader = new MqItemReader(new RedisConnectionFactory(), $service);
$mqItemReader->initQueue('sylius.export.queue.' . $importer);
$mqItemReader->readAndImport();
$mqItemReader->readAndImport($timeout);
$output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount());
$output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Exporter/ItemReaderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface ItemReaderInterface
{
public function initQueue(string $queueName): void;

public function readAndImport(): void;
public function readAndImport(int $timeout): void;

public function getMessagesImportedCount(): int;

Expand Down
4 changes: 2 additions & 2 deletions src/Exporter/MqItemReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public function initQueue(string $queueName): void
$this->messagesSkippedCount = 0;
}

public function readAndImport(): void
public function readAndImport(int $timeout = 0): void
{
/** @var RedisConsumer $consumer */
$consumer = $this->redisContext->createConsumer($this->queue);

/** @var RedisMessage $message */
while ($message = $consumer->receive()) {
while ($message = $consumer->receive($timeout)) {
$dataArrayToImport = (array) json_decode($message->getBody());

$this->service->importSingleDataArrayWithoutResult($dataArrayToImport);
Expand Down

0 comments on commit d9252ad

Please sign in to comment.