Skip to content

Commit

Permalink
Add bulkwrite single process benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
GromNaN committed Sep 20, 2023
1 parent 02d633f commit 158cf1d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 35 deletions.
79 changes: 45 additions & 34 deletions benchmark/src/DriverBench/ParallelBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
use MongoDB\Benchmark\Fixtures\Data;
use MongoDB\Benchmark\Utils;
use MongoDB\BSON\Document;
use MongoDB\Collection;
use MongoDB\Driver\BulkWrite;
use PhpBench\Attributes\AfterClassMethods;
use PhpBench\Attributes\BeforeClassMethods;
use PhpBench\Attributes\BeforeMethods;
use PhpBench\Attributes\Iterations;
use PhpBench\Attributes\ParamProviders;
use PhpBench\Attributes\Revs;
use RuntimeException;

use function Amp\ParallelFunctions\parallelMap;
use function Amp\Promise\wait;
use function array_map;
use function ceil;
use function count;
use function file;
use function file_get_contents;
Expand Down Expand Up @@ -46,9 +46,6 @@
#[AfterClassMethods('afterClass')]
final class ParallelBench
{
/** @var string[] */
private static array $files = [];

public static function beforeClass(): void
{
// Generate files
Expand All @@ -63,21 +60,40 @@ public static function afterClass(): void
foreach (self::getFileNames() as $file) {
unlink($file);
}
}

self::$files = [];
/**
* Parallel: LDJSON multi-file import
* Using Driver's BulkWrite in a single thread
*/
#[BeforeMethods('beforeMultiFileImport')]
#[Revs(1)]
#[Iterations(1)]
public function benchMultiFileImportBulkWrite(): void
{
foreach (self::getFileNames() as $file) {
self::importFile($file);
}
}

/**
* Parallel: LDJSON multi-file import
* Using single thread
* Using library's Collection::insertMany in a single thread
*/
#[BeforeMethods('beforeMultiFileImport')]
#[Revs(1)]
public function benchMultiFileImport(): void
#[Iterations(1)]
public function benchMultiFileImportInsertMany(): void
{
$collection = Utils::getCollection();
foreach (self::getFileNames() as $file) {
self::importFile($file, $collection);
// Read file contents into BSON documents
$docs = array_map(
static fn (string $line) => Document::fromJSON($line),
file($file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES | FILE_NO_DEFAULT_CONTEXT),
);
// Insert documents in bulk
$collection->insertMany($docs);
}
}

Expand All @@ -90,6 +106,7 @@ public function benchMultiFileImport(): void
#[BeforeMethods('beforeMultiFileImport')]
#[ParamProviders(['provideProcessesParameter'])]
#[Revs(1)]
#[Iterations(1)]
public function benchMultiFileImportFork(array $params): void
{
$pids = [];
Expand All @@ -102,11 +119,11 @@ public function benchMultiFileImportFork(array $params): void

$pid = pcntl_fork();
if ($pid === 0) {
// If we reset, we can garantee that we get a new manager in the child process
// If we don't reset, we will get the same manager client_zval in the child process
// and share the libmongoc client.
// Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child
// process. When the child process constructs a new Manager, the differing PID will result in creation
// of a new libmongoc client.
Utils::reset();
self::importFile($file, Utils::getCollection());
self::importFile($file);

// Exit the child process
exit(0);
Expand Down Expand Up @@ -136,6 +153,7 @@ public function benchMultiFileImportFork(array $params): void
#[BeforeMethods('beforeMultiFileImport')]
#[ParamProviders(['provideProcessesParameter'])]
#[Revs(1)]
#[Iterations(1)]
public function benchMultiFileImportAmp(array $params): void
{
wait(parallelMap(
Expand All @@ -149,10 +167,13 @@ public function benchMultiFileImportAmp(array $params): void

public static function provideProcessesParameter(): Generator
{
// Max number of forked processes
for ($i = 1; $i <= 30; $i = (int) ceil($i * 1.25)) {
yield $i . ' proc' => ['processes' => $i];
}
yield '1 proc' => ['processes' => 1]; // 100 sequences, to compare to the single thread baseline
yield '2 proc' => ['processes' => 2]; // 50 sequences
yield '4 proc' => ['processes' => 4]; // 25 sequences
yield '8 proc' => ['processes' => 8]; // 13 sequences
yield '13 proc' => ['processes' => 13]; // 8 sequences
yield '20 proc' => ['processes' => 20]; // 5 sequences
yield '34 proc' => ['processes' => 34]; // 3 sequences
}

public function beforeMultiFileImport(): void
Expand All @@ -162,26 +183,16 @@ public function beforeMultiFileImport(): void
$database->createCollection(Utils::getCollectionName());
}

public function afterMultiFileImport(): void
public static function importFile(string $file): void
{
foreach (self::$files as $file) {
unlink($file);
}
$namespace = sprintf('%s.%s', Utils::getDatabaseName(), Utils::getCollectionName());

unset($this->files);
}

public static function importFile(string $file, ?Collection $collection = null): void
{
$collection ??= Utils::getCollection();
$bulkWrite = new BulkWrite();
foreach (file($file, FILE_IGNORE_NEW_LINES | FILE_NO_DEFAULT_CONTEXT) as $line) {
$bulkWrite->insert(Document::fromJSON($line));
}

// Read file contents into BSON documents
$docs = array_map(
static fn (string $line) => Document::fromJSON($line),
file($file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES | FILE_NO_DEFAULT_CONTEXT),
);
// Insert documents in bulk
$collection->insertMany($docs);
Utils::getClient()->getManager()->executeBulkWrite($namespace, $bulkWrite);
}

private static function getFileNames(): array
Expand Down
2 changes: 1 addition & 1 deletion benchmark/src/Utils.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class Utils

public static function getClient(): Client
{
return self::$client ??= new Client(self::getUri());
return self::$client ??= new Client(self::getUri(), ['disableClientPersistence' => true]);
}

public static function getDatabase(): Database
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"files": [ "tests/PHPUnit/Functions.php" ]
},
"scripts": {
"bench": "cd benchmark && composer update && vendor/bin/phpbench run --report=aggregate",
"checks": [
"@check:cs",
"@check:psalm",
Expand Down

0 comments on commit 158cf1d

Please sign in to comment.