Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHPLIB-476: Consider transaction readPreference in select_server #1178

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public function dropDatabase(string $databaseName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
40 changes: 20 additions & 20 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public function bulkWrite(array $operations, array $options = [])

$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -362,7 +362,7 @@ public function createIndexes(array $indexes, array $options = [])

$operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -418,7 +418,7 @@ public function createSearchIndex($definition, array $options = []): string
public function createSearchIndexes(array $indexes, array $options = []): array
{
$operation = new CreateSearchIndexes($this->databaseName, $this->collectionName, $indexes, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);
Copy link
Member

@GromNaN GromNaN Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if that's relevant for search indexes.

These commands internally proxy the search index management commands to a separate process that runs alongside an Atlas cluster. As such, read concern and write concern are not relevant for the search index management commands.
https://github.com/mongodb/specifications/blob/db3114e957f7c0976a1af09882dbb46cb4a70049/source/index-management/index-management.rst#where-are-read-concern-and-write-concern

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm familiar with mongot, but does that mean mongod has absolutely not state when it comes to search indexes? Perhaps that's the case, as listSearchIndexes() is distinct from listIndexes().

This also raises the question of what happens if the search index operations are executed within a transaction. Presumably they don't maintain state on mongod at all (and thus operate outside of a transaction), but it's not clear to me if mongod would actually restrict their usage during an active transaction.

That said, I don't see the harm in requiring a primary in this case since it is a write operation from the user's perspective.


return $operation->execute($server);
}
Expand All @@ -441,7 +441,7 @@ public function deleteMany($filter, array $options = [])

$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -462,7 +462,7 @@ public function deleteOne($filter, array $options = [])

$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -503,7 +503,7 @@ public function drop(array $options = [])
$options = $this->inheritWriteOptions($options);
$options = $this->inheritTypeMap($options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['encryptedFields'])) {
$options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager)
Expand Down Expand Up @@ -541,7 +541,7 @@ public function dropIndex($indexName, array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -561,7 +561,7 @@ public function dropIndexes(array $options = [])

$operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -577,7 +577,7 @@ public function dropIndexes(array $options = [])
public function dropSearchIndex(string $name, array $options = []): void
{
$operation = new DropSearchIndex($this->databaseName, $this->collectionName, $name);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down Expand Up @@ -690,7 +690,7 @@ public function findOneAndDelete($filter, array $options = [])

$operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -720,7 +720,7 @@ public function findOneAndReplace($filter, $replacement, array $options = [])

$operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -750,7 +750,7 @@ public function findOneAndUpdate($filter, $update, array $options = [])

$operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -854,7 +854,7 @@ public function insertMany(array $documents, array $options = [])

$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -875,7 +875,7 @@ public function insertOne($document, array $options = [])

$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand Down Expand Up @@ -949,7 +949,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,

$operation = new MapReduce($this->databaseName, $this->collectionName, $map, $reduce, $out, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -975,7 +975,7 @@ public function rename(string $toCollectionName, ?string $toDatabaseName = null,

$operation = new RenameCollection($this->databaseName, $this->collectionName, $toDatabaseName, $toCollectionName, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -998,7 +998,7 @@ public function replaceOne($filter, $replacement, array $options = [])

$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1020,7 +1020,7 @@ public function updateMany($filter, $update, array $options = [])

$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1042,7 +1042,7 @@ public function updateOne($filter, $update, array $options = [])

$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);

return $operation->execute(select_server($this->manager, $options));
return $operation->execute(select_server_for_write($this->manager, $options));
}

/**
Expand All @@ -1059,7 +1059,7 @@ public function updateOne($filter, $update, array $options = [])
public function updateSearchIndex(string $name, $definition, array $options = []): void
{
$operation = new UpdateSearchIndex($this->databaseName, $this->collectionName, $name, $definition, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

$operation->execute($server);
}
Expand Down
12 changes: 6 additions & 6 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public function createCollection(string $collectionName, array $options = [])
? new CreateEncryptedCollection($this->databaseName, $collectionName, $options)
: new CreateCollection($this->databaseName, $collectionName, $options);

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

return $operation->execute($server);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public function createEncryptedCollection(string $collectionName, ClientEncrypti
}

$operation = new CreateEncryptedCollection($this->databaseName, $collectionName, $options);
$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

try {
$operation->createDataKeys($clientEncryption, $kmsProvider, $masterKey, $encryptedFields);
Expand Down Expand Up @@ -346,7 +346,7 @@ public function drop(array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -374,7 +374,7 @@ public function dropCollection(string $collectionName, array $options = [])
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -502,7 +502,7 @@ public function modifyCollection(string $collectionName, array $collectionOption
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down Expand Up @@ -536,7 +536,7 @@ public function renameCollection(string $fromCollectionName, string $toCollectio
$options['typeMap'] = $this->typeMap;
}

$server = select_server($this->manager, $options);
$server = select_server_for_write($this->manager, $options);

if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
Expand Down
53 changes: 41 additions & 12 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -544,11 +544,11 @@ function with_transaction(Session $session, callable $callback, array $transacti
*/
function extract_session_from_options(array $options): ?Session
{
if (! isset($options['session']) || ! $options['session'] instanceof Session) {
return null;
if (isset($options['session']) && $options['session'] instanceof Session) {
return $options['session'];
}

return $options['session'];
return null;
}

/**
Expand All @@ -558,33 +558,43 @@ function extract_session_from_options(array $options): ?Session
*/
function extract_read_preference_from_options(array $options): ?ReadPreference
{
if (! isset($options['readPreference']) || ! $options['readPreference'] instanceof ReadPreference) {
return null;
if (isset($options['readPreference']) && $options['readPreference'] instanceof ReadPreference) {
return $options['readPreference'];
}

return $options['readPreference'];
return null;
}

/**
* Performs server selection, respecting the readPreference and session options
* (if given)
* Performs server selection, respecting the readPreference and session options.
*
* The pinned server for an active transaction takes priority, followed by an
* operation-level read preference, followed by an active transaction's read
* preference, followed by a primary read preference.
*
* @internal
*/
function select_server(Manager $manager, array $options): Server
{
$session = extract_session_from_options($options);
$server = $session instanceof Session ? $session->getServer() : null;

// Pinned server for an active transaction takes priority
if ($server !== null) {
return $server;
}

// Operation read preference takes priority
$readPreference = extract_read_preference_from_options($options);
if (! $readPreference instanceof ReadPreference) {
// TODO: PHPLIB-476: Read transaction read preference once PHPC-1439 is implemented
$readPreference = new ReadPreference(ReadPreference::PRIMARY);

// Read preference for an active transaction takes priority
if ($readPreference === null && $session instanceof Session && $session->isInTransaction()) {
/* Session::getTransactionOptions() should always return an array if the
* session is in a transaction, but we can be defensive. */
$readPreference = extract_read_preference_from_options($session->getTransactionOptions() ?? []);
}

// Manager::selectServer() defaults to a primary read preference
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since 1.11 (PHPC-1937).

return $manager->selectServer($readPreference);
}

Expand All @@ -601,7 +611,11 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio
$readPreference = extract_read_preference_from_options($options);

/* If there is either no read preference or a primary read preference, there
* is no special server selection logic to apply. */
* is no special server selection logic to apply.
*
* Note: an alternative read preference could still be inherited from an
* active transaction's options, but we can rely on libmongoc to raise a
* "read preference in a transaction must be primary" error if necessary. */
if ($readPreference === null || $readPreference->getModeString() === ReadPreference::PRIMARY) {
return select_server($manager, $options);
}
Expand Down Expand Up @@ -635,3 +649,18 @@ function select_server_for_aggregate_write_stage(Manager $manager, array &$optio

return $server;
}

/**
* Performs server selection for a write operation.
*
* The pinned server for an active transaction takes priority, followed by an
* operation-level read preference, followed by a primary read preference. This
* is similar to select_server() except that it ignores a read preference from
* an active transaction's options.
*
* @internal
*/
function select_server_for_write(Manager $manager, array $options): Server
Copy link
Member Author

@jmikola jmikola Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quoting Transactions: readPreference:

The read preference to use for all read operations in this transaction.

The select_server_for_write() method here allows us to avoid considering a transaction's readPreference option for a known write command. I intentionally chose not to use this method for read methods that are typically run on primaries (e.g. collection and database enumeration). And it's not used for Database::command() sinec that's considered a read operation (see: runCommand spec).

It's clear we don't have comprehensive test coverage for this across all operations (for example, there's no test that tries to list databases within a transaction), but I'm glad that I was alerted to this oversight in some failing legacy transaction spec tests.

{
return select_server($manager, $options + ['readPreference' => new ReadPreference(ReadPreference::PRIMARY)]);
}
51 changes: 51 additions & 0 deletions tests/Functions/SelectServerFunctionalTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace MongoDB\Tests\Functions;

use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Tests\FunctionalTestCase;

use function MongoDB\select_server;

class SelectServerFunctionalTest extends FunctionalTestCase
{
/** @dataProvider providePinnedOptions */
public function testSelectServerPrefersPinnedServer(array $options): void
{
$this->skipIfTransactionsAreNotSupported();

if (! $this->isShardedCluster()) {
$this->markTestSkipped('Pinning requires a sharded cluster');
}

if ($this->isLoadBalanced()) {
$this->markTestSkipped('libmongoc does not pin for load-balanced topology');
}

/* By default, the Manager under test is created with a single-mongos
* URI. Explicitly create a Client with multiple mongoses. */
$client = static::createTestClient(static::getUri(true));

// Collection must be created before the transaction starts
$this->createCollection($this->getDatabaseName(), $this->getCollectionName());

$session = $client->startSession();
$session->startTransaction();

$collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName());
$collection->find([], ['session' => $session]);

$this->assertTrue($session->isInTransaction());
$this->assertInstanceOf(Server::class, $session->getServer(), 'Session is pinned');
$this->assertEquals($session->getServer(), select_server($client->getManager(), ['session' => $session]));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this test is still a bit dubious, as we don't have any insight into what Manager::selectServer() is actually doing. That may change with server selection logging (PHPLIB-1000).

Currently, there's a chance that server selection just happens to select the same server that was pinned (see: Topology type: Sharded in the Server Selection spec). Supplying an alternative read preference in the operation-level options doesn't really affect this, since it wouldn't influence mongos selection. Again, logging would help verify that.

Having said all that, I think adding a functional test is a good first step since we previously had no direct test coverage for this function. This file at least provides a starting point to add more coverage down the line. I've left a comment in PHPLIB-1000 to that effect.

}

public static function providePinnedOptions(): array
{
return [
[['readPreference' => new ReadPreference(ReadPreference::PRIMARY_PREFERRED)]],
[[]],
];
}
}