From c58bc0b68ec964ec2ff671caf211b20a852180b2 Mon Sep 17 00:00:00 2001 From: Jeremy Mikola Date: Mon, 2 Oct 2023 10:09:44 -0400 Subject: [PATCH] select_server_for_write() to avoid inheriting a transaction's RP option --- src/Client.php | 2 +- src/Collection.php | 40 ++++++++++++++++++++-------------------- src/Database.php | 12 ++++++------ src/functions.php | 21 ++++++++++++++++++++- 4 files changed, 47 insertions(+), 28 deletions(-) diff --git a/src/Client.php b/src/Client.php index 06ac91846..acdeef003 100644 --- a/src/Client.php +++ b/src/Client.php @@ -200,7 +200,7 @@ public function dropDatabase(string $databaseName, array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; diff --git a/src/Collection.php b/src/Collection.php index 0420be097..55b5c8a85 100644 --- a/src/Collection.php +++ b/src/Collection.php @@ -260,7 +260,7 @@ public function bulkWrite(array $operations, array $options = []) $operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -362,7 +362,7 @@ public function createIndexes(array $indexes, array $options = []) $operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -418,7 +418,7 @@ public function createSearchIndex($definition, array $options = []): string public function createSearchIndexes(array $indexes, array $options = []): array { $operation = new CreateSearchIndexes($this->databaseName, $this->collectionName, $indexes, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); return $operation->execute($server); } @@ -441,7 +441,7 @@ public function deleteMany($filter, array $options = []) $operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -462,7 +462,7 @@ public function deleteOne($filter, array $options = []) $operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -503,7 +503,7 @@ public function drop(array $options = []) $options = $this->inheritWriteOptions($options); $options = $this->inheritTypeMap($options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['encryptedFields'])) { $options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager) @@ -541,7 +541,7 @@ public function dropIndex($indexName, array $options = []) $operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -561,7 +561,7 @@ public function dropIndexes(array $options = []) $operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -577,7 +577,7 @@ public function dropIndexes(array $options = []) public function dropSearchIndex(string $name, array $options = []): void { $operation = new DropSearchIndex($this->databaseName, $this->collectionName, $name); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); $operation->execute($server); } @@ -690,7 +690,7 @@ public function findOneAndDelete($filter, array $options = []) $operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -720,7 +720,7 @@ public function findOneAndReplace($filter, $replacement, array $options = []) $operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -750,7 +750,7 @@ public function findOneAndUpdate($filter, $update, array $options = []) $operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -854,7 +854,7 @@ public function insertMany(array $documents, array $options = []) $operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -875,7 +875,7 @@ public function insertOne($document, array $options = []) $operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -949,7 +949,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce, $operation = new MapReduce($this->databaseName, $this->collectionName, $map, $reduce, $out, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -975,7 +975,7 @@ public function rename(string $toCollectionName, ?string $toDatabaseName = null, $operation = new RenameCollection($this->databaseName, $this->collectionName, $toDatabaseName, $toCollectionName, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -998,7 +998,7 @@ public function replaceOne($filter, $replacement, array $options = []) $operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1020,7 +1020,7 @@ public function updateMany($filter, $update, array $options = []) $operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1042,7 +1042,7 @@ public function updateOne($filter, $update, array $options = []) $operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options); - return $operation->execute(select_server($this->manager, $options)); + return $operation->execute(select_server_for_write($this->manager, $options)); } /** @@ -1059,7 +1059,7 @@ public function updateOne($filter, $update, array $options = []) public function updateSearchIndex(string $name, $definition, array $options = []): void { $operation = new UpdateSearchIndex($this->databaseName, $this->collectionName, $name, $definition, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); $operation->execute($server); } diff --git a/src/Database.php b/src/Database.php index c4057356f..95933df8a 100644 --- a/src/Database.php +++ b/src/Database.php @@ -282,7 +282,7 @@ public function createCollection(string $collectionName, array $options = []) ? new CreateEncryptedCollection($this->databaseName, $collectionName, $options) : new CreateCollection($this->databaseName, $collectionName, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); return $operation->execute($server); } @@ -318,7 +318,7 @@ public function createEncryptedCollection(string $collectionName, ClientEncrypti } $operation = new CreateEncryptedCollection($this->databaseName, $collectionName, $options); - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); try { $operation->createDataKeys($clientEncryption, $kmsProvider, $masterKey, $encryptedFields); @@ -346,7 +346,7 @@ public function drop(array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -374,7 +374,7 @@ public function dropCollection(string $collectionName, array $options = []) $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -502,7 +502,7 @@ public function modifyCollection(string $collectionName, array $collectionOption $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; @@ -536,7 +536,7 @@ public function renameCollection(string $fromCollectionName, string $toCollectio $options['typeMap'] = $this->typeMap; } - $server = select_server($this->manager, $options); + $server = select_server_for_write($this->manager, $options); if (! isset($options['writeConcern']) && ! is_in_transaction($options)) { $options['writeConcern'] = $this->writeConcern; diff --git a/src/functions.php b/src/functions.php index 02a9a5d9c..1f1d5f722 100644 --- a/src/functions.php +++ b/src/functions.php @@ -611,7 +611,11 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio $readPreference = extract_read_preference_from_options($options); /* If there is either no read preference or a primary read preference, there - * is no special server selection logic to apply. */ + * is no special server selection logic to apply. + * + * Note: an alternative read preference could still be inherited from an + * active transaction's options, but we can rely on libmongoc to raise a + * "read preference in a transaction must be primary" error if necessary. */ if ($readPreference === null || $readPreference->getModeString() === ReadPreference::PRIMARY) { return select_server($manager, $options); } @@ -645,3 +649,18 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio return $server; } + +/** + * Performs server selection for a write operation. + * + * The pinned server for an active transaction takes priority, followed by an + * operation-level read preference, followed by a primary read preference. This + * is similar to select_server() except that it ignores a read preference from + * an active transaction's options. + * + * @internal + */ +function select_server_for_write(Manager $manager, array $options): Server +{ + return select_server($manager, $options + ['readPreference' => new ReadPreference(ReadPreference::PRIMARY)]); +}