From c31de40c1574634ec4497aa3c169084689d70732 Mon Sep 17 00:00:00 2001 From: enesaktay <21998826+enesaktay@users.noreply.github.com> Date: Mon, 13 Aug 2018 15:55:47 +0200 Subject: [PATCH 1/2] Add the functionality to configurate any message queue --- README.md | 15 + composer.json | 10 +- composer.lock | 458 ++++-------------- .../ExportDataToMessageQueueCommand.php | 10 +- .../ImportDataFromMessageQueueCommand.php | 15 +- .../Compiler/MessageQueuePass.php | 28 ++ src/DependencyInjection/Configuration.php | 8 + .../FOSSyliusImportExportExtension.php | 7 + src/Exporter/MqItemReader.php | 86 ---- src/Exporter/MqItemWriter.php | 22 +- src/FOSSyliusImportExportPlugin.php | 2 + .../ItemReaderInterface.php | 4 +- src/Importer/MqItemReader.php | 76 +++ src/Resources/config/services.yml | 4 +- .../config/services_message_queue.yml | 10 + tests/Application/app/config/config.yml | 8 + 16 files changed, 281 insertions(+), 482 deletions(-) create mode 100644 src/DependencyInjection/Compiler/MessageQueuePass.php delete mode 100644 src/Exporter/MqItemReader.php rename src/{Exporter => Importer}/ItemReaderInterface.php (61%) create mode 100644 src/Importer/MqItemReader.php create mode 100644 src/Resources/config/services_message_queue.yml diff --git a/README.md b/README.md index 78fbfade..0156ff99 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,21 @@ sylius_import_export: prefix: /admin ``` +### Message queue configuration: +```yaml +# define a service which will be used as the queue +services: + redis_connection_factory: + class: Enqueue\Redis\RedisConnectionFactory +``` + +```yaml +# use the defined service +fos_sylius_import_export: + message_queue: + service_id: 'redis_connection_factory' +``` + ## Usage ### Available importer types diff --git a/composer.json b/composer.json index fef370c3..320b4ac0 100644 --- a/composer.json +++ b/composer.json @@ -7,15 +7,16 @@ "php": "^7.1", "sylius/sylius": "^1.0", "portphp/portphp": "^1.2", - "symfony/stopwatch": "^3.3" + "symfony/stopwatch": "^3.3", + "queue-interop/queue-interop": "^0.6.2" }, "suggest": { "portphp/excel": "To support importing excel files, use version ^1.1", "ext-zip": "To support writing of Excel files", "portphp/csv": "To support importing csv files, use version ^1.1", - "enqueue/enqueue-bundle" : "To support message queuing", - "enqueue/redis" : "To support message queuing", - "predis/predis" : "To support message queuing" + "enqueue/enqueue-bundle" : "To help defining message queuing services", + "enqueue/redis" : "To support message queuing via redis", + "enqueue/sqs" : "To support message queuing via sqs" }, "require-dev": { "behat/behat": "^3.3", @@ -23,7 +24,6 @@ "behat/mink-browserkit-driver": "^1.3", "behat/mink-extension": "^2.2", "behat/mink-selenium2-driver": "^1.3", - "enqueue/enqueue-bundle": "^0.8.31", "enqueue/redis": "^0.8.23", "friends-of-behat/context-service-extension": "^1.0", "friends-of-behat/cross-container-extension": "^1.0", diff --git a/composer.lock b/composer.lock index 249a12a1..43bac660 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "e85bc9b443a1039506c8ddff0e5f1634", + "content-hash": "88eaf70db106102e109a4bbb73d4e50e", "packages": [ { "name": "behat/transliterator", @@ -2887,17 +2887,6 @@ { "name": "league/uri", "version": "5.3.0", - "source": { - "type": "git", - "url": "https://github.com/thephpleague/uri.git", - "reference": "f2bceb755f1108758cf4cf925e4cd7699ce686aa" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/thephpleague/uri/zipball/f2bceb755f1108758cf4cf925e4cd7699ce686aa", - "reference": "f2bceb755f1108758cf4cf925e4cd7699ce686aa", - "shasum": "" - }, "require": { "ext-fileinfo": "*", "ext-intl": "*", @@ -3793,16 +3782,16 @@ }, { "name": "paragonie/random_compat", - "version": "v2.0.15", + "version": "v2.0.17", "source": { "type": "git", "url": "https://github.com/paragonie/random_compat.git", - "reference": "10bcb46e8f3d365170f6de9d05245aa066b81f09" + "reference": "29af24f25bab834fcbb38ad2a69fa93b867e070d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/paragonie/random_compat/zipball/10bcb46e8f3d365170f6de9d05245aa066b81f09", - "reference": "10bcb46e8f3d365170f6de9d05245aa066b81f09", + "url": "https://api.github.com/repos/paragonie/random_compat/zipball/29af24f25bab834fcbb38ad2a69fa93b867e070d", + "reference": "29af24f25bab834fcbb38ad2a69fa93b867e070d", "shasum": "" }, "require": { @@ -3838,7 +3827,7 @@ "pseudorandom", "random" ], - "time": "2018-06-08T15:26:40+00:00" + "time": "2018-07-04T16:31:37+00:00" }, { "name": "payum/iso4217", @@ -4919,23 +4908,67 @@ ], "time": "2017-10-23T01:57:42+00:00" }, + { + "name": "queue-interop/queue-interop", + "version": "0.6.2", + "source": { + "type": "git", + "url": "https://github.com/queue-interop/queue-interop.git", + "reference": "950c8083ff328751843080f25266dec0db624ddb" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/queue-interop/queue-interop/zipball/950c8083ff328751843080f25266dec0db624ddb", + "reference": "950c8083ff328751843080f25266dec0db624ddb", + "shasum": "" + }, + "require": { + "php": ">=5.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, + "autoload": { + "psr-4": { + "Interop\\Queue\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Promoting the interoperability of MQs objects. Based on Java JMS", + "homepage": "https://github.com/queue-interop/queue-interop", + "keywords": [ + "MQ", + "jms", + "message queue", + "messaging", + "queue" + ], + "time": "2018-08-05T12:16:04+00:00" + }, { "name": "ramsey/uuid", - "version": "3.7.3", + "version": "3.8.0", "source": { "type": "git", "url": "https://github.com/ramsey/uuid.git", - "reference": "44abcdad877d9a46685a3a4d221e3b2c4b87cb76" + "reference": "d09ea80159c1929d75b3f9c60504d613aeb4a1e3" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ramsey/uuid/zipball/44abcdad877d9a46685a3a4d221e3b2c4b87cb76", - "reference": "44abcdad877d9a46685a3a4d221e3b2c4b87cb76", + "url": "https://api.github.com/repos/ramsey/uuid/zipball/d09ea80159c1929d75b3f9c60504d613aeb4a1e3", + "reference": "d09ea80159c1929d75b3f9c60504d613aeb4a1e3", "shasum": "" }, "require": { - "paragonie/random_compat": "^1.0|^2.0", - "php": "^5.4 || ^7.0" + "paragonie/random_compat": "^1.0|^2.0|9.99.99", + "php": "^5.4 || ^7.0", + "symfony/polyfill-ctype": "^1.8" }, "replace": { "rhumsaa/uuid": "self.version" @@ -4943,16 +4976,17 @@ "require-dev": { "codeception/aspect-mock": "^1.0 | ~2.0.0", "doctrine/annotations": "~1.2.0", - "goaop/framework": "1.0.0-alpha.2 | ^1.0 | ^2.1", + "goaop/framework": "1.0.0-alpha.2 | ^1.0 | ~2.1.0", "ircmaxell/random-lib": "^1.1", "jakub-onderka/php-parallel-lint": "^0.9.0", "mockery/mockery": "^0.9.9", "moontoast/math": "^1.1", "php-mock/php-mock-phpunit": "^0.3|^1.1", - "phpunit/phpunit": "^4.7|^5.0", + "phpunit/phpunit": "^4.7|^5.0|^6.5", "squizlabs/php_codesniffer": "^2.3" }, "suggest": { + "ext-ctype": "Provides support for PHP Ctype functions", "ext-libsodium": "Provides the PECL libsodium extension for use with the SodiumRandomGenerator", "ext-uuid": "Provides the PECL UUID extension for use with the PeclUuidTimeGenerator and PeclUuidRandomGenerator", "ircmaxell/random-lib": "Provides RandomLib for use with the RandomLibAdapter", @@ -4997,7 +5031,7 @@ "identifier", "uuid" ], - "time": "2018-01-20T00:28:24+00:00" + "time": "2018-07-19T23:38:55+00:00" }, { "name": "sensiolabs/security-checker", @@ -5882,16 +5916,16 @@ }, { "name": "symfony/polyfill-apcu", - "version": "v1.8.0", + "version": "v1.9.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-apcu.git", - "reference": "9b83bd010112ec196410849e840d9b9fefcb15ad" + "reference": "19e1b73bf255265ad0b568f81766ae2a3266d8d2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-apcu/zipball/9b83bd010112ec196410849e840d9b9fefcb15ad", - "reference": "9b83bd010112ec196410849e840d9b9fefcb15ad", + "url": "https://api.github.com/repos/symfony/polyfill-apcu/zipball/19e1b73bf255265ad0b568f81766ae2a3266d8d2", + "reference": "19e1b73bf255265ad0b568f81766ae2a3266d8d2", "shasum": "" }, "require": { @@ -5900,7 +5934,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "1.8-dev" + "dev-master": "1.9-dev" } }, "autoload": { @@ -5934,29 +5968,32 @@ "portable", "shim" ], - "time": "2018-04-26T10:06:28+00:00" + "time": "2018-08-06T14:22:27+00:00" }, { "name": "symfony/polyfill-ctype", - "version": "v1.8.0", + "version": "v1.9.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-ctype.git", - "reference": "7cc359f1b7b80fc25ed7796be7d96adc9b354bae" + "reference": "e3d826245268269cd66f8326bd8bc066687b4a19" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/7cc359f1b7b80fc25ed7796be7d96adc9b354bae", - "reference": "7cc359f1b7b80fc25ed7796be7d96adc9b354bae", + "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/e3d826245268269cd66f8326bd8bc066687b4a19", + "reference": "e3d826245268269cd66f8326bd8bc066687b4a19", "shasum": "" }, "require": { "php": ">=5.3.3" }, + "suggest": { + "ext-ctype": "For best performance" + }, "type": "library", "extra": { "branch-alias": { - "dev-master": "1.8-dev" + "dev-master": "1.9-dev" } }, "autoload": { @@ -5989,7 +6026,7 @@ "polyfill", "portable" ], - "time": "2018-04-30T19:57:29+00:00" + "time": "2018-08-06T14:22:27+00:00" }, { "name": "symfony/polyfill-iconv", @@ -6052,16 +6089,16 @@ }, { "name": "symfony/polyfill-intl-icu", - "version": "v1.8.0", + "version": "v1.9.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-intl-icu.git", - "reference": "80ee17ae83c10cd513e5144f91a73607a21edb4e" + "reference": "f22a90256d577c7ef7efad8df1f0201663d57644" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-intl-icu/zipball/80ee17ae83c10cd513e5144f91a73607a21edb4e", - "reference": "80ee17ae83c10cd513e5144f91a73607a21edb4e", + "url": "https://api.github.com/repos/symfony/polyfill-intl-icu/zipball/f22a90256d577c7ef7efad8df1f0201663d57644", + "reference": "f22a90256d577c7ef7efad8df1f0201663d57644", "shasum": "" }, "require": { @@ -6074,7 +6111,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "1.8-dev" + "dev-master": "1.9-dev" } }, "autoload": { @@ -6106,20 +6143,20 @@ "portable", "shim" ], - "time": "2018-04-25T14:53:50+00:00" + "time": "2018-08-06T14:22:27+00:00" }, { "name": "symfony/polyfill-mbstring", - "version": "v1.8.0", + "version": "v1.9.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-mbstring.git", - "reference": "3296adf6a6454a050679cde90f95350ad604b171" + "reference": "d0cd638f4634c16d8df4508e847f14e9e43168b8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/3296adf6a6454a050679cde90f95350ad604b171", - "reference": "3296adf6a6454a050679cde90f95350ad604b171", + "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/d0cd638f4634c16d8df4508e847f14e9e43168b8", + "reference": "d0cd638f4634c16d8df4508e847f14e9e43168b8", "shasum": "" }, "require": { @@ -6131,7 +6168,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "1.8-dev" + "dev-master": "1.9-dev" } }, "autoload": { @@ -6165,7 +6202,7 @@ "portable", "shim" ], - "time": "2018-04-26T10:06:28+00:00" + "time": "2018-08-06T14:22:27+00:00" }, { "name": "symfony/swiftmailer-bundle", @@ -6231,16 +6268,16 @@ }, { "name": "symfony/symfony", - "version": "v3.4.13", + "version": "v3.4.14", "source": { "type": "git", "url": "https://github.com/symfony/symfony.git", - "reference": "c406c5a4f11fc00b60d3ec585125350418c4568d" + "reference": "f50e17fa4edd6216f7fe5b908f04e64ba1d19a9e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/symfony/zipball/c406c5a4f11fc00b60d3ec585125350418c4568d", - "reference": "c406c5a4f11fc00b60d3ec585125350418c4568d", + "url": "https://api.github.com/repos/symfony/symfony/zipball/f50e17fa4edd6216f7fe5b908f04e64ba1d19a9e", + "reference": "f50e17fa4edd6216f7fe5b908f04e64ba1d19a9e", "shasum": "" }, "require": { @@ -6382,7 +6419,7 @@ "keywords": [ "framework" ], - "time": "2018-07-23T16:37:45+00:00" + "time": "2018-08-01T14:48:04+00:00" }, { "name": "symfony/thanks", @@ -7700,274 +7737,6 @@ "homepage": "https://github.com/container-interop/container-interop", "time": "2017-02-14T19:40:03+00:00" }, - { - "name": "enqueue/async-event-dispatcher", - "version": "0.8.24", - "source": { - "type": "git", - "url": "https://github.com/php-enqueue/async-event-dispatcher.git", - "reference": "36902adf96e01a4db2311fd7896b9e5c96ed29de" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-enqueue/async-event-dispatcher/zipball/36902adf96e01a4db2311fd7896b9e5c96ed29de", - "reference": "36902adf96e01a4db2311fd7896b9e5c96ed29de", - "shasum": "" - }, - "require": { - "enqueue/enqueue": "^0.8@dev", - "php": ">=5.6", - "symfony/event-dispatcher": "^2.8|^3|^4" - }, - "require-dev": { - "enqueue/fs": "^0.8@dev", - "enqueue/null": "^0.8@dev", - "enqueue/test": "^0.8@dev", - "phpunit/phpunit": "~5.5", - "symfony/config": "^2.8|^3|^4", - "symfony/dependency-injection": "^2.8|^3|^4", - "symfony/filesystem": "^2.8|^3|^4", - "symfony/http-kernel": "^2.8|^3|^4" - }, - "suggest": { - "symfony/dependency-injection": "^2.8|^3|^4 If you'd like to use async event dispatcher container extension." - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "0.8.x-dev" - } - }, - "autoload": { - "psr-4": { - "Enqueue\\AsyncEventDispatcher\\": "" - }, - "exclude-from-classmap": [ - "/Tests/" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Symfony async event dispatcher", - "homepage": "https://enqueue.forma-pro.com/", - "keywords": [ - "async event", - "event dispatcher", - "messaging", - "queue" - ], - "time": "2018-03-23T10:52:55+00:00" - }, - { - "name": "enqueue/enqueue", - "version": "0.8.29", - "source": { - "type": "git", - "url": "https://github.com/php-enqueue/enqueue.git", - "reference": "d637e699d15b5343daa5c16d3b5b3641bc502c0b" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-enqueue/enqueue/zipball/d637e699d15b5343daa5c16d3b5b3641bc502c0b", - "reference": "d637e699d15b5343daa5c16d3b5b3641bc502c0b", - "shasum": "" - }, - "require": { - "enqueue/null": "^0.8@dev", - "php": ">=5.6", - "psr/log": "^1", - "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", - "ramsey/uuid": "^2|^3.5" - }, - "require-dev": { - "empi89/php-amqp-stubs": "*@dev", - "enqueue/amqp-bunny": "^0.8@dev", - "enqueue/amqp-ext": "^0.8@dev", - "enqueue/amqp-lib": "^0.8@dev", - "enqueue/dbal": "^0.8@dev", - "enqueue/fs": "^0.8@dev", - "enqueue/gearman": "^0.8@dev", - "enqueue/gps": "^0.8@dev", - "enqueue/mongodb": "^0.8@dev", - "enqueue/pheanstalk": "^0.8@dev", - "enqueue/rdkafka": "^0.8@dev", - "enqueue/redis": "^0.8@dev", - "enqueue/simple-client": "^0.8@dev", - "enqueue/sqs": "^0.8@dev", - "enqueue/stomp": "^0.8@dev", - "enqueue/test": "^0.8@dev", - "phpunit/phpunit": "~5.5", - "symfony/config": "^2.8|^3|^4", - "symfony/console": "^2.8|^3|^4", - "symfony/dependency-injection": "^2.8|^3|^4", - "symfony/event-dispatcher": "^2.8|^3|^4", - "symfony/http-kernel": "^2.8|^3|^4" - }, - "suggest": { - "enqueue/amqp-ext": "AMQP transport (based on php extension)", - "enqueue/dbal": "Doctrine DBAL transport", - "enqueue/fs": "Filesystem transport", - "enqueue/redis": "Redis transport", - "enqueue/sqs": "Amazon AWS SQS transport", - "enqueue/stomp": "STOMP transport", - "symfony/config": "^2.8|^3|^4", - "symfony/console": "^2.8|^3|^4 If you want to use li commands", - "symfony/dependency-injection": "^2.8|^3|^4" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "0.8.x-dev" - } - }, - "autoload": { - "psr-4": { - "Enqueue\\": "" - }, - "files": [ - "functions_include.php" - ], - "exclude-from-classmap": [ - "/Tests/" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Message Queue Library", - "homepage": "https://enqueue.forma-pro.com/", - "keywords": [ - "AMQP", - "messaging", - "queue", - "rabbitmq" - ], - "time": "2018-05-04T05:47:00+00:00" - }, - { - "name": "enqueue/enqueue-bundle", - "version": "0.8.31", - "source": { - "type": "git", - "url": "https://github.com/php-enqueue/enqueue-bundle.git", - "reference": "5d57224edb468731ab1d40e4eba8a07df444ea58" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-enqueue/enqueue-bundle/zipball/5d57224edb468731ab1d40e4eba8a07df444ea58", - "reference": "5d57224edb468731ab1d40e4eba8a07df444ea58", - "shasum": "" - }, - "require": { - "enqueue/async-event-dispatcher": "^0.8@dev", - "enqueue/enqueue": "^0.8@dev", - "enqueue/null": "^0.8@dev", - "php": ">=5.6", - "symfony/framework-bundle": "^2.8|^3|^4" - }, - "require-dev": { - "doctrine/doctrine-bundle": "~1.2", - "enqueue/amqp-bunny": "^0.8@dev", - "enqueue/amqp-ext": "^0.8@dev", - "enqueue/amqp-lib": "^0.8@dev", - "enqueue/dbal": "^0.8@dev", - "enqueue/fs": "^0.8@dev", - "enqueue/gps": "^0.8@dev", - "enqueue/job-queue": "^0.8@dev", - "enqueue/redis": "^0.8@dev", - "enqueue/sqs": "^0.8@dev", - "enqueue/stomp": "^0.8@dev", - "enqueue/test": "^0.8@dev", - "php-amqplib/php-amqplib": "^2.7@dev", - "phpunit/phpunit": "~5.5", - "symfony/browser-kit": "^2.8|^3|^4", - "symfony/expression-language": "^2.8|^3|^4", - "symfony/monolog-bundle": "^2.8|^3|^4" - }, - "type": "symfony-bundle", - "extra": { - "branch-alias": { - "dev-master": "0.8.x-dev" - } - }, - "autoload": { - "psr-4": { - "Enqueue\\Bundle\\": "" - }, - "exclude-from-classmap": [ - "/Tests/" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Message Queue Bundle", - "homepage": "https://enqueue.forma-pro.com/", - "keywords": [ - "AMQP", - "messaging", - "queue", - "rabbitmq" - ], - "time": "2018-05-10T13:40:08+00:00" - }, - { - "name": "enqueue/null", - "version": "0.8.24", - "source": { - "type": "git", - "url": "https://github.com/php-enqueue/null.git", - "reference": "e75270a99e6d65a2b7d739fa6c6131338d85acee" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-enqueue/null/zipball/e75270a99e6d65a2b7d739fa6c6131338d85acee", - "reference": "e75270a99e6d65a2b7d739fa6c6131338d85acee", - "shasum": "" - }, - "require": { - "php": ">=5.6", - "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1" - }, - "require-dev": { - "enqueue/enqueue": "^0.8@dev", - "enqueue/test": "^0.8@dev", - "phpunit/phpunit": "~5.5", - "queue-interop/queue-spec": "^0.5.3@dev", - "symfony/config": "^2.8|^3|^4", - "symfony/dependency-injection": "^2.8|^3|^4" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "0.8.x-dev" - } - }, - "autoload": { - "psr-4": { - "Enqueue\\Null\\": "" - }, - "exclude-from-classmap": [ - "/Tests/" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Enqueue Null transport", - "homepage": "https://enqueue.forma-pro.com/", - "keywords": [ - "messaging", - "queue", - "testing" - ], - "time": "2018-03-23T10:52:55+00:00" - }, { "name": "enqueue/redis", "version": "0.8.23", @@ -10397,49 +10166,6 @@ ], "time": "2016-06-16T16:22:20+00:00" }, - { - "name": "queue-interop/queue-interop", - "version": "0.6.1", - "source": { - "type": "git", - "url": "https://github.com/queue-interop/queue-interop.git", - "reference": "38579005c0492c0275bbae31170edf30a7e740fa" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/queue-interop/queue-interop/zipball/38579005c0492c0275bbae31170edf30a7e740fa", - "reference": "38579005c0492c0275bbae31170edf30a7e740fa", - "shasum": "" - }, - "require": { - "php": ">=5.5" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "0.6.x-dev" - } - }, - "autoload": { - "psr-4": { - "Interop\\Queue\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "Promoting the interoperability of MQs objects. Based on Java JMS", - "homepage": "https://github.com/queue-interop/queue-interop", - "keywords": [ - "MQ", - "jms", - "message queue", - "messaging", - "queue" - ], - "time": "2017-08-10T11:24:15+00:00" - }, { "name": "se/selenium-server-standalone", "version": "v2.53.1", diff --git a/src/Command/ExportDataToMessageQueueCommand.php b/src/Command/ExportDataToMessageQueueCommand.php index da81aed5..36d13da6 100644 --- a/src/Command/ExportDataToMessageQueueCommand.php +++ b/src/Command/ExportDataToMessageQueueCommand.php @@ -4,9 +4,8 @@ namespace FriendsOfSylius\SyliusImportExportPlugin\Command; -use Enqueue\Redis\RedisConnectionFactory; use FriendsOfSylius\SyliusImportExportPlugin\Exporter\ExporterRegistry; -use FriendsOfSylius\SyliusImportExportPlugin\Exporter\MqItemWriter; +use FriendsOfSylius\SyliusImportExportPlugin\Exporter\ItemWriterInterface; use FriendsOfSylius\SyliusImportExportPlugin\Exporter\ResourceExporterInterface; use Sylius\Component\Resource\Model\ResourceInterface; use Sylius\Component\Resource\Repository\RepositoryInterface; @@ -74,7 +73,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void $this->listExporters($input, $output, sprintf('There is no \'%s\' exporter.', $name)); } - $this->export($name, $idsToExport, $exporter); + /** @var ItemWriterInterface $mqItemWriter */ + $mqItemWriter = $this->container->get('sylius.message_queue_writer'); + $this->export($mqItemWriter, $name, $idsToExport, $exporter); $this->finishExport($items, 'message queue', $name, $output); } @@ -122,14 +123,13 @@ private function prepareExport(array $items): array /** * @param int[] $idsToExport */ - private function export(string $name, array $idsToExport, string $exporter): void + private function export(ItemWriterInterface $mqItemWriter, string $name, array $idsToExport, string $exporter): void { /** @var ResourceExporterInterface $service */ $service = $this->exporterRegistry->get($name); $service->export($idsToExport); $itemsToExport = $service->getExportedData(); - $mqItemWriter = new MqItemWriter(new RedisConnectionFactory()); $mqItemWriter->initQueue('sylius.export.queue.' . $exporter); $mqItemWriter->write(json_decode($itemsToExport)); } diff --git a/src/Command/ImportDataFromMessageQueueCommand.php b/src/Command/ImportDataFromMessageQueueCommand.php index aa34520d..a7cdc766 100644 --- a/src/Command/ImportDataFromMessageQueueCommand.php +++ b/src/Command/ImportDataFromMessageQueueCommand.php @@ -4,18 +4,20 @@ namespace FriendsOfSylius\SyliusImportExportPlugin\Command; -use Enqueue\Redis\RedisConnectionFactory; -use FriendsOfSylius\SyliusImportExportPlugin\Exporter\MqItemReader; use FriendsOfSylius\SyliusImportExportPlugin\Importer\ImporterRegistry; +use FriendsOfSylius\SyliusImportExportPlugin\Importer\ItemReaderInterface; use FriendsOfSylius\SyliusImportExportPlugin\Importer\SingleDataArrayImporterInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\DependencyInjection\ContainerAwareTrait; final class ImportDataFromMessageQueueCommand extends Command { + use ContainerAwareTrait; + /** * @var ImporterRegistry */ @@ -69,7 +71,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void /** @var SingleDataArrayImporterInterface $service */ $service = $this->importerRegistry->get($name); - $this->getImporterJsonDataFromMessageQueue($importer, $service, $output); + /** @var ItemReaderInterface $mqItemReader */ + $mqItemReader = $this->container->get('sylius.message_queue_reader'); + $this->getImporterJsonDataFromMessageQueue($mqItemReader, $importer, $service, $output); $this->finishImport($name, $output); } @@ -82,11 +86,10 @@ private function finishImport(string $name, OutputInterface $output): void $output->writeln($message); } - private function getImporterJsonDataFromMessageQueue(string $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void + private function getImporterJsonDataFromMessageQueue(ItemReaderInterface $mqItemReader, $importer, SingleDataArrayImporterInterface $service, OutputInterface $output): void { - $mqItemReader = new MqItemReader(new RedisConnectionFactory(), $service); $mqItemReader->initQueue('sylius.export.queue.' . $importer); - $mqItemReader->readAndImport(); + $mqItemReader->readAndImport($service); $output->writeln('Imported: ' . $mqItemReader->getMessagesImportedCount()); $output->writeln('Skipped: ' . $mqItemReader->getMessagesSkippedCount()); } diff --git a/src/DependencyInjection/Compiler/MessageQueuePass.php b/src/DependencyInjection/Compiler/MessageQueuePass.php new file mode 100644 index 00000000..a53f1f21 --- /dev/null +++ b/src/DependencyInjection/Compiler/MessageQueuePass.php @@ -0,0 +1,28 @@ +getParameter('sylius.message_queue'); + + $writerDefinition = $container->getDefinition('sylius.message_queue_writer'); + $writerDefinition->addArgument(new Reference($config['exporter_service_id'])); + $writerDefinition->setAbstract(false); + + $readerDefinition = $container->getDefinition('sylius.message_queue_reader'); + $readerDefinition->addArgument(new Reference($config['importer_service_id'])); + $readerDefinition->setAbstract(false); + } +} diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 4a6f405b..0a04a406 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -34,6 +34,14 @@ public function getConfigTreeBuilder(): TreeBuilder ->booleanNode('web_ui')->defaultTrue()->end() ->end() ->end() + ->arrayNode('message_queue') + ->canBeEnabled() + ->children() + ->scalarNode('service_id')->defaultNull()->end() + ->scalarNode('importer_service_id')->defaultNull()->end() + ->scalarNode('exporter_service_id')->defaultNull()->end() + ->end() + ->end() ->end() ->end(); diff --git a/src/DependencyInjection/FOSSyliusImportExportExtension.php b/src/DependencyInjection/FOSSyliusImportExportExtension.php index b5e3e860..f68be6a5 100644 --- a/src/DependencyInjection/FOSSyliusImportExportExtension.php +++ b/src/DependencyInjection/FOSSyliusImportExportExtension.php @@ -54,6 +54,13 @@ public function load(array $configs, ContainerBuilder $container) $loader->load('services_export_excel.yml'); } + if (isset($config['message_queue'])) { + $loader->load('services_message_queue.yml'); + $config['message_queue']['importer_service_id'] = $config['message_queue']['importer_service_id'] ?? $config['message_queue']['service_id']; + $config['message_queue']['exporter_service_id'] = $config['message_queue']['exporter_service_id'] ?? $config['message_queue']['service_id']; + $container->setParameter('sylius.message_queue', $config['message_queue']); + } + $loader->load('services_import_json.yml'); $loader->load('services_export_json.yml'); diff --git a/src/Exporter/MqItemReader.php b/src/Exporter/MqItemReader.php deleted file mode 100644 index e9dc0011..00000000 --- a/src/Exporter/MqItemReader.php +++ /dev/null @@ -1,86 +0,0 @@ -redisConnectionFactory = $redisConnectionFactory; - $this->service = $service; - } - - /** - * {@inheritdoc} - */ - public function initQueue(string $queueName): void - { - $this->redisContext = $this->redisConnectionFactory->createContext(); - $this->queue = $this->redisContext->createQueue($queueName); - $this->messagesImportedCount = 0; - $this->messagesSkippedCount = 0; - } - - public function readAndImport(): void - { - /** @var RedisConsumer $consumer */ - $consumer = $this->redisContext->createConsumer($this->queue); - - /** @var RedisMessage $message */ - while ($message = $consumer->receive()) { - $dataArrayToImport = (array) json_decode($message->getBody()); - - $this->service->importSingleDataArrayWithoutResult($dataArrayToImport); - ++$this->messagesImportedCount; - } - } - - public function getMessagesImportedCount(): int - { - return $this->messagesImportedCount; - } - - public function getMessagesSkippedCount(): int - { - return $this->messagesSkippedCount; - } -} diff --git a/src/Exporter/MqItemWriter.php b/src/Exporter/MqItemWriter.php index 2ce5e194..fd4384bc 100644 --- a/src/Exporter/MqItemWriter.php +++ b/src/Exporter/MqItemWriter.php @@ -4,7 +4,7 @@ namespace FriendsOfSylius\SyliusImportExportPlugin\Exporter; -use Enqueue\Redis\RedisConnectionFactory; +use Interop\Queue\PsrConnectionFactory; use Interop\Queue\PsrConsumer; use Interop\Queue\PsrContext; use Interop\Queue\PsrQueue; @@ -12,14 +12,14 @@ class MqItemWriter implements ItemWriterInterface { /** - * @var RedisConnectionFactory + * @var PsrConnectionFactory */ - private $redisConnectionFactory; + private $psrConnectionFactory; /** * @var PsrContext */ - private $redisContext; + private $context; /** * @var PsrQueue @@ -31,16 +31,16 @@ class MqItemWriter implements ItemWriterInterface */ private $consumer; - public function __construct(RedisConnectionFactory $redisConnectionFactory) + public function __construct(PsrConnectionFactory $psrConnectionFactory) { - $this->redisConnectionFactory = $redisConnectionFactory; + $this->psrConnectionFactory = $psrConnectionFactory; } public function initQueue(string $queueName): void { - $this->redisContext = $this->redisConnectionFactory->createContext(); - $this->queue = $this->redisContext->createQueue($queueName); - $this->consumer = $this->redisContext->createConsumer($this->queue); + $this->context = $this->psrConnectionFactory->createContext(); + $this->queue = $this->context->createQueue($queueName); + $this->consumer = $this->context->createConsumer($this->queue); } /** @@ -49,12 +49,12 @@ public function initQueue(string $queueName): void public function write(array $items): void { foreach ($items as $item) { - $message = $this->redisContext->createMessage( + $message = $this->context->createMessage( json_encode($item) ?: '', [], ['recordedOn' => (new \DateTime())->format('Y-m-d H:i:s')] ); - $this->redisContext->createProducer()->send($this->queue, $message); + $this->context->createProducer()->send($this->queue, $message); } } } diff --git a/src/FOSSyliusImportExportPlugin.php b/src/FOSSyliusImportExportPlugin.php index baa3e2fc..25b8111f 100644 --- a/src/FOSSyliusImportExportPlugin.php +++ b/src/FOSSyliusImportExportPlugin.php @@ -4,6 +4,7 @@ namespace FriendsOfSylius\SyliusImportExportPlugin; +use FriendsOfSylius\SyliusImportExportPlugin\DependencyInjection\Compiler\MessageQueuePass; use FriendsOfSylius\SyliusImportExportPlugin\DependencyInjection\Compiler\RegisterExporterPass; use FriendsOfSylius\SyliusImportExportPlugin\DependencyInjection\Compiler\RegisterImporterPass; use Sylius\Bundle\CoreBundle\Application\SyliusPluginTrait; @@ -22,5 +23,6 @@ public function build(ContainerBuilder $container) parent::build($container); $container->addCompilerPass(new RegisterImporterPass()); $container->addCompilerPass(new RegisterExporterPass()); + $container->addCompilerPass(new MessageQueuePass()); } } diff --git a/src/Exporter/ItemReaderInterface.php b/src/Importer/ItemReaderInterface.php similarity index 61% rename from src/Exporter/ItemReaderInterface.php rename to src/Importer/ItemReaderInterface.php index 5099dd2e..2a350d5c 100644 --- a/src/Exporter/ItemReaderInterface.php +++ b/src/Importer/ItemReaderInterface.php @@ -2,13 +2,13 @@ declare(strict_types=1); -namespace FriendsOfSylius\SyliusImportExportPlugin\Exporter; +namespace FriendsOfSylius\SyliusImportExportPlugin\Importer; interface ItemReaderInterface { public function initQueue(string $queueName): void; - public function readAndImport(): void; + public function readAndImport(SingleDataArrayImporterInterface $service): void; public function getMessagesImportedCount(): int; diff --git a/src/Importer/MqItemReader.php b/src/Importer/MqItemReader.php new file mode 100644 index 00000000..0aa1dfd8 --- /dev/null +++ b/src/Importer/MqItemReader.php @@ -0,0 +1,76 @@ +psrConnectionFactory = $psrConnectionFactory; + } + + public function initQueue(string $queueName): void + { + $this->context = $this->psrConnectionFactory->createContext(); + $this->queue = $this->context->createQueue($queueName); + $this->messagesImportedCount = 0; + $this->messagesSkippedCount = 0; + } + + public function readAndImport(SingleDataArrayImporterInterface $service): void + { + /** @var PsrConsumer $consumer */ + $consumer = $this->context->createConsumer($this->queue); + + /** @var PsrMessage $message */ + while ($message = $consumer->receive()) { + $dataArrayToImport = (array) json_decode($message->getBody()); + + $service->importSingleDataArrayWithoutResult($dataArrayToImport); + ++$this->messagesImportedCount; + } + } + + public function getMessagesImportedCount(): int + { + return $this->messagesImportedCount; + } + + public function getMessagesSkippedCount(): int + { + return $this->messagesSkippedCount; + } +} diff --git a/src/Resources/config/services.yml b/src/Resources/config/services.yml index cf967433..9f9137cc 100644 --- a/src/Resources/config/services.yml +++ b/src/Resources/config/services.yml @@ -54,10 +54,12 @@ services: tags: - { name: 'console.command' } - sylius.command.import_data_data_from_message_queue: + sylius.command.import_data_from_message_queue: class: FriendsOfSylius\SyliusImportExportPlugin\Command\ImportDataFromMessageQueueCommand arguments: - "@sylius.importers_registry" + calls: + - [ setContainer, ["@service_container"]] tags: - { name: 'console.command' } diff --git a/src/Resources/config/services_message_queue.yml b/src/Resources/config/services_message_queue.yml new file mode 100644 index 00000000..7e0362e4 --- /dev/null +++ b/src/Resources/config/services_message_queue.yml @@ -0,0 +1,10 @@ +services: + sylius.message_queue_writer: + class: FriendsOfSylius\SyliusImportExportPlugin\Exporter\MqItemWriter + abstract: true + public: true + + sylius.message_queue_reader: + class: FriendsOfSylius\SyliusImportExportPlugin\Importer\MqItemReader + abstract: true + public: true diff --git a/tests/Application/app/config/config.yml b/tests/Application/app/config/config.yml index 5fcf36c8..356c8595 100644 --- a/tests/Application/app/config/config.yml +++ b/tests/Application/app/config/config.yml @@ -64,3 +64,11 @@ fos_rest: sylius_theme: sources: test: ~ + +fos_sylius_import_export: + message_queue: + service_id: 'redis_connection_factory' + +services: + redis_connection_factory: + class: Enqueue\Redis\RedisConnectionFactory From a00e76a838460ed2bc85b42f12f130dc804f2127 Mon Sep 17 00:00:00 2001 From: enesaktay <21998826+enesaktay@users.noreply.github.com> Date: Wed, 15 Aug 2018 13:30:50 +0200 Subject: [PATCH 2/2] update readme.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 0156ff99..6631367c 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ sylius_import_export: ``` ### Message queue configuration: +Any library implementing the **"queue-interop/queue-interop"** can be used as the message queue. Following is the "enqueue/redis" library shown as an example usage. ```yaml # define a service which will be used as the queue services: