diff --git a/src/Queue/Transport/ExtendedDoctrineConnection.php b/src/Queue/Transport/ExtendedDoctrineConnection.php index 710eac3..d7e4587 100644 --- a/src/Queue/Transport/ExtendedDoctrineConnection.php +++ b/src/Queue/Transport/ExtendedDoctrineConnection.php @@ -12,8 +12,11 @@ use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use Doctrine\DBAL\Schema\AbstractSchemaManager; +use Doctrine\DBAL\Schema\Comparator; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\SchemaDiff; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; -use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Types; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; use Symfony\Component\Messenger\Exception\TransportException; @@ -21,11 +24,23 @@ final class ExtendedDoctrineConnection extends Connection { private bool $autoSetup; + private ?SchemaSynchronizer $schemaSynchronizer; public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null) { parent::__construct($configuration, $driverConnection, $schemaSynchronizer); $this->autoSetup = $this->configuration['auto_setup']; + $this->schemaSynchronizer = $schemaSynchronizer; + } + + public function setup(): void + { + $configuration = $this->driverConnection->getConfiguration(); + $assetFilter = $configuration->getSchemaAssetsFilter(); + $configuration->setSchemaAssetsFilter(static fn () => true); + $this->updateSchema(); + $configuration->setSchemaAssetsFilter($assetFilter); + $this->autoSetup = false; } public function get(): ?array @@ -159,7 +174,7 @@ public function send(string $body, array $headers, int $delay = 0): string Types::STRING, Types::DATETIME_IMMUTABLE, Types::DATETIME_IMMUTABLE, - Types::INTEGER + Types::BIGINT ]); } @@ -265,32 +280,118 @@ private function executeInsert(string $sql, array $parameters = [], array $types return $id; } - private function decodeEnvelopeHeaders(array $doctrineEnvelope): array + private function createSchemaManager(): AbstractSchemaManager { - $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true); + return method_exists($this->driverConnection, 'createSchemaManager') + ? $this->driverConnection->createSchemaManager() + : $this->driverConnection->getSchemaManager(); + } - return $doctrineEnvelope; + private function getSchema(): Schema + { + $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig()); + $this->addTableToSchema($schema); + + return $schema; } - public function getExtraSetupSqlForTable(Table $createdTable): array + private function addTableToSchema(Schema $schema): void { - if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) { - return []; + $table = $schema->createTable($this->configuration['table_name']); + // add an internal option to mark that we created this & the non-namespaced table name + $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']); + $table->addColumn('id', Types::BIGINT) + ->setAutoincrement(true) + ->setNotnull(true); + $table->addColumn('body', Types::TEXT) + ->setNotnull(true); + $table->addColumn('headers', Types::TEXT) + ->setNotnull(true); + $table->addColumn('queue_name', Types::STRING) + ->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode + ->setNotnull(true); + $table->addColumn('created_at', Types::DATETIME_IMMUTABLE) + ->setNotnull(true); + $table->addColumn('available_at', Types::DATETIME_IMMUTABLE) + ->setNotnull(true); + $table->addColumn('delivered_at', Types::DATETIME_IMMUTABLE) + ->setNotnull(false); + $table->addColumn('available_at_micro', Types::BIGINT) + ->setNotnull(true); + $table->setPrimaryKey(['id']); + $table->addIndex(['queue_name']); + $table->addIndex(['available_at']); + $table->addIndex(['delivered_at']); + $table->addIndex(['available_at_micro']); + } + + private function updateSchema(): void + { + if (null !== $this->schemaSynchronizer) { + $this->schemaSynchronizer->updateSchema($this->getSchema(), true); + + return; + } + + $schemaManager = $this->createSchemaManager(); + $comparator = $this->createComparator($schemaManager); + $schemaDiff = $this->compareSchemas($comparator, method_exists($schemaManager, 'introspectSchema') ? $schemaManager->introspectSchema() : $schemaManager->createSchema(), $this->getSchema()); + $platform = $this->driverConnection->getDatabasePlatform(); + + if (!method_exists(SchemaDiff::class, 'getCreatedSchemas')) { + foreach ($schemaDiff->toSaveSql($platform) as $sql) { + $this->driverConnection->executeStatement($sql); + } + + return; + } + + if ($platform->supportsSchemas()) { + foreach ($schemaDiff->getCreatedSchemas() as $schema) { + $this->driverConnection->executeStatement($platform->getCreateSchemaSQL($schema)); + } + } + + if ($platform->supportsSequences()) { + foreach ($schemaDiff->getAlteredSequences() as $sequence) { + $this->driverConnection->executeStatement($platform->getAlterSequenceSQL($sequence)); + } + + foreach ($schemaDiff->getCreatedSequences() as $sequence) { + $this->driverConnection->executeStatement($platform->getCreateSequenceSQL($sequence)); + } } - if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) { - return []; + foreach ($platform->getCreateTablesSQL($schemaDiff->getCreatedTables()) as $sql) { + $this->driverConnection->executeStatement($sql); } - return $this->getAdditionalColumnSql(); + foreach ($schemaDiff->getAlteredTables() as $tableDiff) { + foreach ($platform->getAlterTableSQL($tableDiff) as $sql) { + $this->driverConnection->executeStatement($sql); + } + } + } + + private function decodeEnvelopeHeaders(array $doctrineEnvelope): array + { + $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true); + + return $doctrineEnvelope; + } + + private function createComparator(AbstractSchemaManager $schemaManager): Comparator + { + return method_exists($schemaManager, 'createComparator') + ? $schemaManager->createComparator() + : new Comparator(); } - private function getAdditionalColumnSql(): array + private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff { - return [ - sprintf('ALTER TABLE %s ADD available_at_micro BIGINT UNSIGNED;', $this->configuration['table_name']), - sprintf('CREATE INDEX IDX_available_at_micro ON %s (available_at_micro);', $this->configuration['table_name']) - ]; + return method_exists($comparator, 'compareSchemas') || method_exists($comparator, 'doCompareSchemas') + ? $comparator->compareSchemas($from, $to) + : $comparator->compare($from, $to); } }