diff --git a/src/Client.php b/src/Client.php index 06ac91846..acdeef003 100644 --- a/src/Client.php +++ b/src/Client.php @@ -200,7 +200,7 @@ public function dropDatabase(string $databaseName, array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; diff --git a/src/Collection.php b/src/Collection.php index 0420be097..55b5c8a85 100644 --- a/src/Collection.php +++ b/src/Collection.php @@ -260,7 +260,7 @@ public function bulkWrite(array $operations, array $options = []) $operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -362,7 +362,7 @@ public function createIndexes(array $indexes, array $options = []) $operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -418,7 +418,7 @@ public function createSearchIndex($definition, array $options = []): string public function createSearchIndexes(array $indexes, array $options = []): array { $operation = new CreateSearchIndexes($this->databaseName, $this->collectionName, $indexes, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); return $operation->execute($server); } @@ -441,7 +441,7 @@ public function deleteMany($filter, array $options = []) $operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -462,7 +462,7 @@ public function deleteOne($filter, array $options = []) $operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -503,7 +503,7 @@ public function drop(array $options = []) $options = $this->inheritWriteOptions($options); $options = $this->inheritTypeMap($options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['encryptedFields'])) { $options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager) @@ -541,7 +541,7 @@ public function dropIndex($indexName, array $options = []) $operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -561,7 +561,7 @@ public function dropIndexes(array $options = []) $operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -577,7 +577,7 @@ public function dropIndexes(array $options = []) public function dropSearchIndex(string $name, array $options = []): void { $operation = new DropSearchIndex($this->databaseName, $this->collectionName, $name); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); $operation->execute($server); } @@ -690,7 +690,7 @@ public function findOneAndDelete($filter, array $options = []) $operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -720,7 +720,7 @@ public function findOneAndReplace($filter, $replacement, array $options = []) $operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -750,7 +750,7 @@ public function findOneAndUpdate($filter, $update, array $options = []) $operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -854,7 +854,7 @@ public function insertMany(array $documents, array $options = []) $operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -875,7 +875,7 @@ public function insertOne($document, array $options = []) $operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -949,7 +949,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce, $operation = new MapReduce($this->databaseName, $this->collectionName, $map, $reduce, $out, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -975,7 +975,7 @@ public function rename(string $toCollectionName, ?string $toDatabaseName = null, $operation = new RenameCollection($this->databaseName, $this->collectionName, $toDatabaseName, $toCollectionName, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -998,7 +998,7 @@ public function replaceOne($filter, $replacement, array $options = []) $operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1020,7 +1020,7 @@ public function updateMany($filter, $update, array $options = []) $operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1042,7 +1042,7 @@ public function updateOne($filter, $update, array $options = []) $operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1059,7 +1059,7 @@ public function updateOne($filter, $update, array $options = []) public function updateSearchIndex(string $name, $definition, array $options = []): void { $operation = new UpdateSearchIndex($this->databaseName, $this->collectionName, $name, $definition, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); $operation->execute($server); } diff --git a/src/Database.php b/src/Database.php index c4057356f..95933df8a 100644 --- a/src/Database.php +++ b/src/Database.php @@ -282,7 +282,7 @@ public function createCollection(string $collectionName, array $options = []) ? new CreateEncryptedCollection($this->databaseName, $collectionName, $options) : new CreateCollection($this->databaseName, $collectionName, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); return $operation->execute($server); } @@ -318,7 +318,7 @@ public function createEncryptedCollection(string $collectionName, ClientEncrypti } $operation = new CreateEncryptedCollection($this->databaseName, $collectionName, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); try { $operation->createDataKeys($clientEncryption, $kmsProvider, $masterKey, $encryptedFields); @@ -346,7 +346,7 @@ public function drop(array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -374,7 +374,7 @@ public function dropCollection(string $collectionName, array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -502,7 +502,7 @@ public function modifyCollection(string $collectionName, array $collectionOption $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -536,7 +536,7 @@ public function renameCollection(string $fromCollectionName, string $toCollectio $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; diff --git a/src/functions.php b/src/functions.php index 95c85380b..1f1d5f722 100644 --- a/src/functions.php +++ b/src/functions.php @@ -544,11 +544,11 @@ function with_transaction(Session $session, callable $callback, array $transacti */ function extract_session_from_options(array $options): ?Session { - if (! isset($options['session']) || ! $options['session'] instanceof Session) { - return null; + if (isset($options['session']) && $options['session'] instanceof Session) { + return $options['session']; } - return $options['session']; + return null; } /** @@ -558,16 +558,19 @@ function extract_session_from_options(array $options): ?Session */ function extract_read_preference_from_options(array $options): ?ReadPreference { - if (! isset($options['readPreference']) || ! $options['readPreference'] instanceof ReadPreference) { - return null; + if (isset($options['readPreference']) && $options['readPreference'] instanceof ReadPreference) { + return $options['readPreference']; } - return $options['readPreference']; + return null; } /** - * Performs server selection, respecting the readPreference and session options - * (if given) + * Performs server selection, respecting the readPreference and session options. + * + * The pinned server for an active transaction takes priority, followed by an + * operation-level read preference, followed by an active transaction's read + * preference, followed by a primary read preference. * * @internal */ @@ -575,16 +578,23 @@ function select_server(Manager $manager, array $options): Server { $session = extract_session_from_options($options); $server = $session instanceof Session ? $session->getServer() : null; + + // Pinned server for an active transaction takes priority if ($server !== null) { return $server; } + // Operation read preference takes priority $readPreference = extract_read_preference_from_options($options); - if (! $readPreference instanceof ReadPreference) { - // TODO: PHPLIB-476: Read transaction read preference once PHPC-1439 is implemented - $readPreference = new ReadPreference(ReadPreference::PRIMARY); + + // Read preference for an active transaction takes priority + if ($readPreference === null && $session instanceof Session && $session->isInTransaction()) { + /* Session::getTransactionOptions() should always return an array if the + * session is in a transaction, but we can be defensive. */ + $readPreference = extract_read_preference_from_options($session->getTransactionOptions() ?? []); } + // Manager::selectServer() defaults to a primary read preference return $manager->selectServer($readPreference); } @@ -601,7 +611,11 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio $readPreference = extract_read_preference_from_options($options); /* If there is either no read preference or a primary read preference, there - * is no special server selection logic to apply. */ + * is no special server selection logic to apply. + * + * Note: an alternative read preference could still be inherited from an + * active transaction's options, but we can rely on libmongoc to raise a + * "read preference in a transaction must be primary" error if necessary. */ if ($readPreference === null || $readPreference->getModeString() === ReadPreference::PRIMARY) { return select_server($manager, $options); } @@ -635,3 +649,18 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio return $server; } + +/** + * Performs server selection for a write operation. + * + * The pinned server for an active transaction takes priority, followed by an + * operation-level read preference, followed by a primary read preference. This + * is similar to select_server() except that it ignores a read preference from + * an active transaction's options. + * + * @internal + */ +function select_server_for_write(Manager $manager, array $options): Server +{ + return select_server($manager, $options + ['readPreference' => new ReadPreference(ReadPreference::PRIMARY)]); +} diff --git a/tests/Functions/SelectServerFunctionalTest.php b/tests/Functions/SelectServerFunctionalTest.php new file mode 100644 index 000000000..0a3b82a6d --- /dev/null +++ b/tests/Functions/SelectServerFunctionalTest.php @@ -0,0 +1,51 @@ +skipIfTransactionsAreNotSupported(); + + if (! $this->isShardedCluster()) { + $this->markTestSkipped('Pinning requires a sharded cluster'); + } + + if ($this->isLoadBalanced()) { + $this->markTestSkipped('libmongoc does not pin for load-balanced topology'); + } + + /* By default, the Manager under test is created with a single-mongos + * URI. Explicitly create a Client with multiple mongoses. */ + $client = static::createTestClient(static::getUri(true)); + + // Collection must be created before the transaction starts + $this->createCollection($this->getDatabaseName(), $this->getCollectionName()); + + $session = $client->startSession(); + $session->startTransaction(); + + $collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName()); + $collection->find([], ['session' => $session]); + + $this->assertTrue($session->isInTransaction()); + $this->assertInstanceOf(Server::class, $session->getServer(), 'Session is pinned'); + $this->assertEquals($session->getServer(), select_server($client->getManager(), ['session' => $session])); + } + + public static function providePinnedOptions(): array + { + return [ + [['readPreference' => new ReadPreference(ReadPreference::PRIMARY_PREFERRED)]], + [[]], + ]; + } +}