Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…ortExportPlugin into add-configuration

# Conflicts:
#	src/Command/ImportDataFromMessageQueueCommand.php
#	src/Exporter/MqItemReader.php
#	src/Importer/ItemReaderInterface.php
  • Loading branch information
enesaktay committed Aug 15, 2018
2 parents a00e76a + d9252ad commit 5020f28
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 7 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ admin overview panel using the event hook system, ie. `admin/tax-categories/`.
$ bin/console sylius:import-from-message-queue country
```

- To make the importer wait 1s for messages to get into the message queue (default, does not wait)

```bash
$ bin/console sylius:import-from-message-queue country --timeout=1000
```

- Export data of resources to file using `country` exporter
```bash
$ bin/console sylius:export country my/countries/export/csv/file.csv --format=csv
Expand Down
13 changes: 9 additions & 4 deletions src/Command/ImportDataFromMessageQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
Expand Down Expand Up @@ -40,7 +41,9 @@ protected function configure(): void
->setDescription('Import data from message queue.')
->setDefinition([
new InputArgument('importer', InputArgument::OPTIONAL, 'The importer to use.'),
]);
new InputOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'The time in ms the importer will wait for some input.', 0),
])
;
}

/**
Expand All @@ -56,6 +59,8 @@ protected function execute(InputInterface $input, OutputInterface $output): void
return;
}

$timeout = $input->getOption('timeout');

// only accepts the format of json as messages
$name = ImporterRegistry::buildServiceName($importer, 'json');

Expand All @@ -73,7 +78,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void

/** @var ItemReaderInterface $mqItemReader */
$mqItemReader = $this->container->get('sylius.message_queue_reader');
$this->getImporterJsonDataFromMessageQueue($mqItemReader, $importer, $service, $output);
$this->importJsonDataFromMessageQueue($mqItemReader, $importer, $service, $output, (int) $timeout);
$this->finishImport($name, $output);
}

Expand All @@ -86,10 +91,10 @@ private function finishImport(string $name, OutputInterface $output): void
$output->writeln($message);
}

private function getImporterJsonDataFromMessageQueue(ItemReaderInterface $mqItemReader, $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void
private function importJsonDataFromMessageQueue(ItemReaderInterface $mqItemReader, $importer, SingleDataArrayImporterInterface $service, OutputInterface $output, int $timeout): void
{
$mqItemReader->initQueue('sylius.export.queue.' . $importer);
$mqItemReader->readAndImport($service);
$mqItemReader->readAndImport($service, $timeout);
$output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount());
$output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Importer/ItemReaderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface ItemReaderInterface
{
public function initQueue(string $queueName): void;

public function readAndImport(SingleDataArrayImporterInterface $service): void;
public function readAndImport(SingleDataArrayImporterInterface $service, int $timeout): void;

public function getMessagesImportedCount(): int;

Expand Down
4 changes: 2 additions & 2 deletions src/Importer/MqItemReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public function initQueue(string $queueName): void
$this->messagesSkippedCount = 0;
}

public function readAndImport(SingleDataArrayImporterInterface $service): void
public function readAndImport(SingleDataArrayImporterInterface $service, int $timeout = 0): void
{
/** @var PsrConsumer $consumer */
$consumer = $this->context->createConsumer($this->queue);

/** @var PsrMessage $message */
while ($message = $consumer->receive()) {
while ($message = $consumer->receive($timeout)) {
$dataArrayToImport = (array) json_decode($message->getBody());

$service->importSingleDataArrayWithoutResult($dataArrayToImport);
Expand Down

0 comments on commit 5020f28

Please sign in to comment.