Skip to content

Commit

Permalink
select_server_for_write() to avoid inheriting a transaction's RP option
Browse files Browse the repository at this point in the history
  • Loading branch information
jmikola committed Oct 2, 2023
1 parent 837847c commit c58bc0b
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public function dropDatabase(string $databaseName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
40 changes: 20 additions & 20 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public function bulkWrite(array $operations, array $options = [])

$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -362,7 +362,7 @@ public function createIndexes(array $indexes, array $options = [])

$operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -418,7 +418,7 @@ public function createSearchIndex($definition, array $options = []): string
public function createSearchIndexes(array $indexes, array $options = []): array
{
$operation = new CreateSearchIndexes($this->databaseName, $this->collectionName, $indexes, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

return $operation->execute($server);
}
Expand All @@ -441,7 +441,7 @@ public function deleteMany($filter, array $options = [])

$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -462,7 +462,7 @@ public function deleteOne($filter, array $options = [])

$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -503,7 +503,7 @@ public function drop(array $options = [])
$options = $this->inheritWriteOptions($options);
$options = $this->inheritTypeMap($options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['encryptedFields'])) {
$options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager)
Expand Down Expand Up @@ -541,7 +541,7 @@ public function dropIndex($indexName, array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -561,7 +561,7 @@ public function dropIndexes(array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -577,7 +577,7 @@ public function dropIndexes(array $options = [])
public function dropSearchIndex(string $name, array $options = []): void
{
$operation = new DropSearchIndex($this->databaseName, $this->collectionName, $name);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down Expand Up @@ -690,7 +690,7 @@ public function findOneAndDelete($filter, array $options = [])

$operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -720,7 +720,7 @@ public function findOneAndReplace($filter, $replacement, array $options = [])

$operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -750,7 +750,7 @@ public function findOneAndUpdate($filter, $update, array $options = [])

$operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -854,7 +854,7 @@ public function insertMany(array $documents, array $options = [])

$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -875,7 +875,7 @@ public function insertOne($document, array $options = [])

$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -949,7 +949,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,

$operation = new MapReduce($this->databaseName, $this->collectionName, $map, $reduce, $out, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -975,7 +975,7 @@ public function rename(string $toCollectionName, ?string $toDatabaseName = null,

$operation = new RenameCollection($this->databaseName, $this->collectionName, $toDatabaseName, $toCollectionName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -998,7 +998,7 @@ public function replaceOne($filter, $replacement, array $options = [])

$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1020,7 +1020,7 @@ public function updateMany($filter, $update, array $options = [])

$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1042,7 +1042,7 @@ public function updateOne($filter, $update, array $options = [])

$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1059,7 +1059,7 @@ public function updateOne($filter, $update, array $options = [])
public function updateSearchIndex(string $name, $definition, array $options = []): void
{
$operation = new UpdateSearchIndex($this->databaseName, $this->collectionName, $name, $definition, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down
12 changes: 6 additions & 6 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public function createCollection(string $collectionName, array $options = [])
? new CreateEncryptedCollection($this->databaseName, $collectionName, $options)
: new CreateCollection($this->databaseName, $collectionName, $options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

return $operation->execute($server);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public function createEncryptedCollection(string $collectionName, ClientEncrypti
}

$operation = new CreateEncryptedCollection($this->databaseName, $collectionName, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

try {
$operation->createDataKeys($clientEncryption, $kmsProvider, $masterKey, $encryptedFields);
Expand Down Expand Up @@ -346,7 +346,7 @@ public function drop(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -374,7 +374,7 @@ public function dropCollection(string $collectionName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -502,7 +502,7 @@ public function modifyCollection(string $collectionName, array $collectionOption
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -536,7 +536,7 @@ public function renameCollection(string $fromCollectionName, string $toCollectio
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
21 changes: 20 additions & 1 deletion src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,11 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio
$readPreference = extract_read_preference_from_options($options);

/* If there is either no read preference or a primary read preference, there
* is no special server selection logic to apply. */
* is no special server selection logic to apply.
*
* Note: an alternative read preference could still be inherited from an
* active transaction's options, but we can rely on libmongoc to raise a
* "read preference in a transaction must be primary" error if necessary. */
if ($readPreference === null || $readPreference->getModeString() === ReadPreference::PRIMARY) {
return select_server($manager, $options);
}
Expand Down Expand Up @@ -645,3 +649,18 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio

return $server;
}

/**
* Performs server selection for a write operation.
*
* The pinned server for an active transaction takes priority, followed by an
* operation-level read preference, followed by a primary read preference. This
* is similar to select_server() except that it ignores a read preference from
* an active transaction's options.
*
* @internal
*/
function select_server_for_write(Manager $manager, array $options): Server
{
return select_server($manager, $options + ['readPreference' => new ReadPreference(ReadPreference::PRIMARY)]);
}

0 comments on commit c58bc0b

Please sign in to comment.