Skip to content

Commit

Permalink
PHPLIB-476: Consider transaction readPreference in select_server (#1178)
Browse files Browse the repository at this point in the history
This also refactors the conditionals in extract_session_from_options and extract_read_preference_from_options to improve readability.

select_server() previously did not consider the read preference of an active transaction. This isn't very significant, as transactions require a primary read preference, but it is correct to do so.

This also introduces select_server_for_write() to avoid inheriting an active transaction's readPreference option.
  • Loading branch information
jmikola authored Oct 4, 2023
1 parent 74d19f8 commit f32542d
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public function dropDatabase(string $databaseName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
40 changes: 20 additions & 20 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public function bulkWrite(array $operations, array $options = [])

$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -362,7 +362,7 @@ public function createIndexes(array $indexes, array $options = [])

$operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -418,7 +418,7 @@ public function createSearchIndex($definition, array $options = []): string
public function createSearchIndexes(array $indexes, array $options = []): array
{
$operation = new CreateSearchIndexes($this->databaseName, $this->collectionName, $indexes, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

return $operation->execute($server);
}
Expand All @@ -441,7 +441,7 @@ public function deleteMany($filter, array $options = [])

$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -462,7 +462,7 @@ public function deleteOne($filter, array $options = [])

$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -503,7 +503,7 @@ public function drop(array $options = [])
$options = $this->inheritWriteOptions($options);
$options = $this->inheritTypeMap($options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['encryptedFields'])) {
$options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager)
Expand Down Expand Up @@ -541,7 +541,7 @@ public function dropIndex($indexName, array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -561,7 +561,7 @@ public function dropIndexes(array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -577,7 +577,7 @@ public function dropIndexes(array $options = [])
public function dropSearchIndex(string $name, array $options = []): void
{
$operation = new DropSearchIndex($this->databaseName, $this->collectionName, $name);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down Expand Up @@ -690,7 +690,7 @@ public function findOneAndDelete($filter, array $options = [])

$operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -720,7 +720,7 @@ public function findOneAndReplace($filter, $replacement, array $options = [])

$operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -750,7 +750,7 @@ public function findOneAndUpdate($filter, $update, array $options = [])

$operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -854,7 +854,7 @@ public function insertMany(array $documents, array $options = [])

$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -875,7 +875,7 @@ public function insertOne($document, array $options = [])

$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -949,7 +949,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,

$operation = new MapReduce($this->databaseName, $this->collectionName, $map, $reduce, $out, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -975,7 +975,7 @@ public function rename(string $toCollectionName, ?string $toDatabaseName = null,

$operation = new RenameCollection($this->databaseName, $this->collectionName, $toDatabaseName, $toCollectionName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -998,7 +998,7 @@ public function replaceOne($filter, $replacement, array $options = [])

$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1020,7 +1020,7 @@ public function updateMany($filter, $update, array $options = [])

$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1042,7 +1042,7 @@ public function updateOne($filter, $update, array $options = [])

$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1059,7 +1059,7 @@ public function updateOne($filter, $update, array $options = [])
public function updateSearchIndex(string $name, $definition, array $options = []): void
{
$operation = new UpdateSearchIndex($this->databaseName, $this->collectionName, $name, $definition, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down
12 changes: 6 additions & 6 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public function createCollection(string $collectionName, array $options = [])
? new CreateEncryptedCollection($this->databaseName, $collectionName, $options)
: new CreateCollection($this->databaseName, $collectionName, $options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

return $operation->execute($server);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public function createEncryptedCollection(string $collectionName, ClientEncrypti
}

$operation = new CreateEncryptedCollection($this->databaseName, $collectionName, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

try {
$operation->createDataKeys($clientEncryption, $kmsProvider, $masterKey, $encryptedFields);
Expand Down Expand Up @@ -346,7 +346,7 @@ public function drop(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -374,7 +374,7 @@ public function dropCollection(string $collectionName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -502,7 +502,7 @@ public function modifyCollection(string $collectionName, array $collectionOption
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -536,7 +536,7 @@ public function renameCollection(string $fromCollectionName, string $toCollectio
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
53 changes: 41 additions & 12 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -544,11 +544,11 @@ function with_transaction(Session $session, callable $callback, array $transacti
*/
function extract_session_from_options(array $options): ?Session
{
if (! isset($options['session']) || ! $options['session'] instanceof Session) {
return null;
if (isset($options['session']) && $options['session'] instanceof Session) {
return $options['session'];
}

return $options['session'];
return null;
}

/**
Expand All @@ -558,33 +558,43 @@ function extract_session_from_options(array $options): ?Session
*/
function extract_read_preference_from_options(array $options): ?ReadPreference
{
if (! isset($options['readPreference']) || ! $options['readPreference'] instanceof ReadPreference) {
return null;
if (isset($options['readPreference']) && $options['readPreference'] instanceof ReadPreference) {
return $options['readPreference'];
}

return $options['readPreference'];
return null;
}

/**
* Performs server selection, respecting the readPreference and session options
* (if given)
* Performs server selection, respecting the readPreference and session options.
*
* The pinned server for an active transaction takes priority, followed by an
* operation-level read preference, followed by an active transaction's read
* preference, followed by a primary read preference.
*
* @internal
*/
function select_server(Manager $manager, array $options): Server
{
$session = extract_session_from_options($options);
$server = $session instanceof Session ? $session->getServer() : null;

// Pinned server for an active transaction takes priority
if ($server !== null) {
return $server;
}

// Operation read preference takes priority
$readPreference = extract_read_preference_from_options($options);
if (! $readPreference instanceof ReadPreference) {
// TODO: PHPLIB-476: Read transaction read preference once PHPC-1439 is implemented
$readPreference = new ReadPreference(ReadPreference::PRIMARY);

// Read preference for an active transaction takes priority
if ($readPreference === null && $session instanceof Session && $session->isInTransaction()) {
/* Session::getTransactionOptions() should always return an array if the
* session is in a transaction, but we can be defensive. */
$readPreference = extract_read_preference_from_options($session->getTransactionOptions() ?? []);
}

// Manager::selectServer() defaults to a primary read preference
return $manager->selectServer($readPreference);
}

Expand All @@ -601,7 +611,11 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio
$readPreference = extract_read_preference_from_options($options);

/* If there is either no read preference or a primary read preference, there
* is no special server selection logic to apply. */
* is no special server selection logic to apply.
*
* Note: an alternative read preference could still be inherited from an
* active transaction's options, but we can rely on libmongoc to raise a
* "read preference in a transaction must be primary" error if necessary. */
if ($readPreference === null || $readPreference->getModeString() === ReadPreference::PRIMARY) {
return select_server($manager, $options);
}
Expand Down Expand Up @@ -635,3 +649,18 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio

return $server;
}

/**
* Performs server selection for a write operation.
*
* The pinned server for an active transaction takes priority, followed by an
* operation-level read preference, followed by a primary read preference. This
* is similar to select_server() except that it ignores a read preference from
* an active transaction's options.
*
* @internal
*/
function select_server_for_write(Manager $manager, array $options): Server
{
return select_server($manager, $options + ['readPreference' => new ReadPreference(ReadPreference::PRIMARY)]);
}
51 changes: 51 additions & 0 deletions tests/Functions/SelectServerFunctionalTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace MongoDB\Tests\Functions;

use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Tests\FunctionalTestCase;

use function MongoDB\select_server;

class SelectServerFunctionalTest extends FunctionalTestCase
{
/** @dataProvider providePinnedOptions */
public function testSelectServerPrefersPinnedServer(array $options): void
{
$this->skipIfTransactionsAreNotSupported();

if (! $this->isShardedCluster()) {
$this->markTestSkipped('Pinning requires a sharded cluster');
}

if ($this->isLoadBalanced()) {
$this->markTestSkipped('libmongoc does not pin for load-balanced topology');
}

/* By default, the Manager under test is created with a single-mongos
* URI. Explicitly create a Client with multiple mongoses. */
$client = static::createTestClient(static::getUri(true));

// Collection must be created before the transaction starts
$this->createCollection($this->getDatabaseName(), $this->getCollectionName());

$session = $client->startSession();
$session->startTransaction();

$collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName());
$collection->find([], ['session' => $session]);

$this->assertTrue($session->isInTransaction());
$this->assertInstanceOf(Server::class, $session->getServer(), 'Session is pinned');
$this->assertEquals($session->getServer(), select_server($client->getManager(), ['session' => $session]));
}

public static function providePinnedOptions(): array
{
return [
[['readPreference' => new ReadPreference(ReadPreference::PRIMARY_PREFERRED)]],
[[]],
];
}
}

0 comments on commit f32542d

Please sign in to comment.