Skip to content

Commit

Permalink
fix custom transport auto-setup
Browse files Browse the repository at this point in the history
  • Loading branch information
benwalch committed Apr 3, 2024
1 parent 6f14d80 commit 26ea90e
Showing 1 changed file with 117 additions and 16 deletions.
133 changes: 117 additions & 16 deletions src/Queue/Transport/ExtendedDoctrineConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,35 @@
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaDiff;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Types;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;

final class ExtendedDoctrineConnection extends Connection
{
private bool $autoSetup;
private ?SchemaSynchronizer $schemaSynchronizer;

public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
{
parent::__construct($configuration, $driverConnection, $schemaSynchronizer);
$this->autoSetup = $this->configuration['auto_setup'];
$this->schemaSynchronizer = $schemaSynchronizer;
}

public function setup(): void
{
$configuration = $this->driverConnection->getConfiguration();
$assetFilter = $configuration->getSchemaAssetsFilter();
$configuration->setSchemaAssetsFilter(static fn () => true);
$this->updateSchema();
$configuration->setSchemaAssetsFilter($assetFilter);
$this->autoSetup = false;
}

public function get(): ?array
Expand Down Expand Up @@ -159,7 +174,7 @@ public function send(string $body, array $headers, int $delay = 0): string
Types::STRING,
Types::DATETIME_IMMUTABLE,
Types::DATETIME_IMMUTABLE,
Types::INTEGER
Types::BIGINT
]);
}

Expand Down Expand Up @@ -265,32 +280,118 @@ private function executeInsert(string $sql, array $parameters = [], array $types
return $id;
}

private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
private function createSchemaManager(): AbstractSchemaManager
{
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
return method_exists($this->driverConnection, 'createSchemaManager')
? $this->driverConnection->createSchemaManager()
: $this->driverConnection->getSchemaManager();
}

return $doctrineEnvelope;
private function getSchema(): Schema
{
$schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());
$this->addTableToSchema($schema);

return $schema;
}

public function getExtraSetupSqlForTable(Table $createdTable): array
private function addTableToSchema(Schema $schema): void
{
if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
return [];
$table = $schema->createTable($this->configuration['table_name']);
// add an internal option to mark that we created this & the non-namespaced table name
$table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
$table->addColumn('id', Types::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Types::TEXT)
->setNotnull(true);
$table->addColumn('headers', Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', Types::DATETIME_IMMUTABLE)
->setNotnull(true);
$table->addColumn('available_at', Types::DATETIME_IMMUTABLE)
->setNotnull(true);
$table->addColumn('delivered_at', Types::DATETIME_IMMUTABLE)
->setNotnull(false);
$table->addColumn('available_at_micro', Types::BIGINT)
->setNotnull(true);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);
$table->addIndex(['available_at_micro']);
}

private function updateSchema(): void
{
if (null !== $this->schemaSynchronizer) {
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);

return;
}

$schemaManager = $this->createSchemaManager();
$comparator = $this->createComparator($schemaManager);
$schemaDiff = $this->compareSchemas($comparator, method_exists($schemaManager, 'introspectSchema') ? $schemaManager->introspectSchema() : $schemaManager->createSchema(), $this->getSchema());
$platform = $this->driverConnection->getDatabasePlatform();

if (!method_exists(SchemaDiff::class, 'getCreatedSchemas')) {
foreach ($schemaDiff->toSaveSql($platform) as $sql) {
$this->driverConnection->executeStatement($sql);
}

return;
}

if ($platform->supportsSchemas()) {
foreach ($schemaDiff->getCreatedSchemas() as $schema) {
$this->driverConnection->executeStatement($platform->getCreateSchemaSQL($schema));
}
}

if ($platform->supportsSequences()) {
foreach ($schemaDiff->getAlteredSequences() as $sequence) {
$this->driverConnection->executeStatement($platform->getAlterSequenceSQL($sequence));
}

foreach ($schemaDiff->getCreatedSequences() as $sequence) {
$this->driverConnection->executeStatement($platform->getCreateSequenceSQL($sequence));
}
}

if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
return [];
foreach ($platform->getCreateTablesSQL($schemaDiff->getCreatedTables()) as $sql) {
$this->driverConnection->executeStatement($sql);
}

return $this->getAdditionalColumnSql();
foreach ($schemaDiff->getAlteredTables() as $tableDiff) {
foreach ($platform->getAlterTableSQL($tableDiff) as $sql) {
$this->driverConnection->executeStatement($sql);
}
}
}

private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
{
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);

return $doctrineEnvelope;
}

private function createComparator(AbstractSchemaManager $schemaManager): Comparator
{
return method_exists($schemaManager, 'createComparator')
? $schemaManager->createComparator()
: new Comparator();
}

private function getAdditionalColumnSql(): array
private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff
{
return [
sprintf('ALTER TABLE %s ADD available_at_micro BIGINT UNSIGNED;', $this->configuration['table_name']),
sprintf('CREATE INDEX IDX_available_at_micro ON %s (available_at_micro);', $this->configuration['table_name'])
];
return method_exists($comparator, 'compareSchemas') || method_exists($comparator, 'doCompareSchemas')
? $comparator->compareSchemas($from, $to)
: $comparator->compare($from, $to);
}

}

0 comments on commit 26ea90e

Please sign in to comment.