Skip to content

Commit

Permalink
PHPLIB-1237 Implement Parallel Benchmarks (#1166)
Browse files Browse the repository at this point in the history
Parallel Benchmarks specs: LDJSON multi-file import
https://github.com/mongodb/specifications/blob/e09b41df206f9efaa36ba4c332c47d04ddb7d6d1/source/benchmarking/benchmarking.rst#ldjson-multi-file-import

Implementations:
- Using Driver's BulkWrite in a single thread
- Using library's Collection::insertMany in a single thread
- Using multiple forked processes
- Using amphp/parallel-functions with worker pool

To get the fastest result:
- Reading files is done using `stream_get_line`
- Document insertion is done using Driver's BulkWrite
  • Loading branch information
GromNaN authored Sep 20, 2023
1 parent ec6c431 commit 82a6397
Show file tree
Hide file tree
Showing 23 changed files with 259 additions and 9 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ jobs:
- name: "Install dependencies with Composer"
uses: "ramsey/[email protected]"
with:
composer-options: "--no-suggest"
composer-options: "--no-suggest --working-dir=./benchmark"

- name: "Run phpbench"
working-directory: "./benchmark"
run: "vendor/bin/phpbench run --report=aggregate --report=bar_chart_time --report=env --output html"

- name: Upload HTML report
uses: actions/upload-artifact@v3
with:
name: phpbench-${{ github.sha }}.html
path: .phpbench/html/index.html
path: ./benchmark/.phpbench/html/index.html
retention-days: 3
26 changes: 26 additions & 0 deletions benchmark/composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "mongodb/mongodb-benchmark",
"type": "project",
"repositories": [
{
"type": "path",
"url": "../",
"symlink": true
}
],
"require": {
"php": ">=8.1",
"ext-pcntl": "*",
"amphp/parallel-functions": "^1.1",
"mongodb/mongodb": "@dev",
"phpbench/phpbench": "^1.2"
},
"autoload": {
"psr-4": {
"MongoDB\\Benchmark\\": "src/"
}
},
"scripts": {
"benchmark": "phpbench run --report=aggregate"
}
}
2 changes: 1 addition & 1 deletion phpbench.json.dist → benchmark/phpbench.json.dist
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"runner.env_enabled_providers": ["mongodb","sampler","git","opcache","php","uname","unix_sysload"],
"runner.bootstrap": "vendor/autoload.php",
"runner.file_pattern": "*Bench.php",
"runner.path": "benchmark",
"runner.path": "src",
"runner.php_config": { "memory_limit": "1G" },
"runner.iterations": 3,
"runner.revs": 10
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
216 changes: 216 additions & 0 deletions benchmark/src/DriverBench/ParallelMultiFileImportBench.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
<?php

namespace MongoDB\Benchmark\DriverBench;

use Amp\Parallel\Worker\DefaultPool;
use Generator;
use MongoDB\Benchmark\Fixtures\Data;
use MongoDB\Benchmark\Utils;
use MongoDB\BSON\Document;
use MongoDB\Driver\BulkWrite;
use PhpBench\Attributes\AfterClassMethods;
use PhpBench\Attributes\BeforeClassMethods;
use PhpBench\Attributes\BeforeMethods;
use PhpBench\Attributes\Iterations;
use PhpBench\Attributes\ParamProviders;
use PhpBench\Attributes\Revs;
use RuntimeException;
use Throwable;

use function Amp\ParallelFunctions\parallelMap;
use function Amp\Promise\wait;
use function array_map;
use function count;
use function fclose;
use function fgets;
use function file_get_contents;
use function file_put_contents;
use function fopen;
use function is_dir;
use function mkdir;
use function pcntl_fork;
use function pcntl_waitpid;
use function pcntl_wexitstatus;
use function pcntl_wifexited;
use function range;
use function rtrim;
use function sprintf;
use function str_repeat;
use function stream_get_line;
use function sys_get_temp_dir;
use function unlink;

/**
* For accurate results, run benchmarks on a standalone server.
*
* @see https://github.com/mongodb/specifications/blob/ddfc8b583d49aaf8c4c19fa01255afb66b36b92e/source/benchmarking/benchmarking.rst#ldjson-multi-file-import
*/
#[BeforeClassMethods('beforeClass')]
#[AfterClassMethods('afterClass')]
#[BeforeMethods('beforeIteration')]
#[Iterations(1)]
#[Revs(1)]
final class ParallelMultiFileImportBench
{
/**
 * Generates the files to import. Registered on the class through the BeforeClassMethods attribute.
 *
 * Every file returned by getFileNames() receives the same content: the Data::LDJSON_FILE_PATH fixture repeated
 * 5,000 times.
 *
 * @throws RuntimeException if the fixture cannot be read or if a file cannot be written
 */
public static function beforeClass(): void
{
    $template = file_get_contents(Data::LDJSON_FILE_PATH);
    if ($template === false) {
        // Without this check the benchmark would silently run against empty files
        throw new RuntimeException(sprintf('Failed to read fixture "%s"', Data::LDJSON_FILE_PATH));
    }

    // Generate files: the content is built once because all files are identical
    $fileContents = str_repeat($template, 5_000);
    foreach (self::getFileNames() as $file) {
        if (file_put_contents($file, $fileContents) === false) {
            throw new RuntimeException(sprintf('Failed to write file "%s"', $file));
        }
    }
}

/**
 * Deletes every file listed by getFileNames(). Registered on the class through the AfterClassMethods attribute.
 */
public static function afterClass(): void
{
    $generatedFiles = self::getFileNames();

    foreach ($generatedFiles as $path) {
        unlink($path);
    }
}

/**
 * Drops the benchmark database, then creates the benchmark collection again so that each run starts from an
 * empty collection. Registered on the class through the BeforeMethods attribute.
 */
public function beforeIteration(): void
{
    $db = Utils::getDatabase();
    $db->drop();

    $collectionName = Utils::getCollectionName();
    $db->createCollection($collectionName);
}

/**
 * Using Driver's BulkWrite in a single thread
 *
 * The files are imported one after the other, in the current process, through importFile().
 */
public function benchMultiFileImportBulkWrite(): void
{
    $files = self::getFileNames();

    for ($i = 0, $total = count($files); $i < $total; $i++) {
        self::importFile($files[$i]);
    }
}

/**
 * Using library's Collection::insertMany in a single thread
 *
 * Each file is read line by line with fgets(), every non-empty line is decoded into a BSON document, and the
 * documents of one file are inserted with a single insertMany() call.
 *
 * @throws RuntimeException if a file cannot be opened
 */
public function benchMultiFileImportInsertMany(): void
{
    $collection = Utils::getCollection();
    foreach (self::getFileNames() as $file) {
        $fh = fopen($file, 'r');
        if ($fh === false) {
            throw new RuntimeException(sprintf('Failed to open file "%s"', $file));
        }

        $docs = [];
        try {
            // Read file contents into BSON documents
            while (($line = fgets($fh)) !== false) {
                // fgets() keeps the line terminator: strip it, otherwise a blank line is never seen as empty
                $line = rtrim($line, "\r\n");
                if ($line !== '') {
                    $docs[] = Document::fromJSON($line);
                }
            }
        } finally {
            // Release the handle even when a line cannot be decoded
            fclose($fh);
        }

        // Insert documents in bulk
        $collection->insertMany($docs);
    }
}

/**
 * Using multiple forked processes
 *
 * One child process is forked per file, with at most $params['processes'] children alive at the same time. The
 * parent waits for every child and fails when one of them did not exit with status 0, so that an incomplete import
 * is not reported as a valid measurement.
 *
 * @param array{processes:int} $params
 *
 * @throws RuntimeException if forking or waiting fails, or if a child process failed to import its file
 */
#[ParamProviders(['provideProcessesParameter'])]
public function benchMultiFileImportFork(array $params): void
{
    $pids = [];
    $failures = 0;
    foreach (self::getFileNames() as $file) {
        // Wait for a child process to finish if we have reached the maximum number of processes
        if (count($pids) >= $params['processes']) {
            [$pid, $succeeded] = self::waitForChild();
            unset($pids[$pid]);
            $failures += $succeeded ? 0 : 1;
        }

        $pid = pcntl_fork();
        if ($pid === 0) {
            try {
                // Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the
                // child process. When the child process constructs a new Manager, the differing PID will result
                // in creation of a new libmongoc client.
                Utils::reset();
                self::importFile($file);
            } catch (Throwable) {
                // The child must never unwind into the code of the parent (it would keep forking and resume the
                // benchmark run on its own): report the failure through the exit status instead.
                exit(1);
            }

            // Exit the child process
            exit(0);
        }

        if ($pid === -1) {
            throw new RuntimeException('Failed to fork');
        }

        // Keep the forked process id to wait for it later
        $pids[$pid] = true;
    }

    // Wait for all child processes to finish
    while ($pids !== []) {
        [$pid, $succeeded] = self::waitForChild();
        unset($pids[$pid]);
        $failures += $succeeded ? 0 : 1;
    }

    if ($failures > 0) {
        throw new RuntimeException(sprintf('%d child process(es) failed to import their file', $failures));
    }
}

/**
 * Blocks until any child process terminates.
 *
 * @return array{0:int, 1:bool} The process id of the child, and whether it exited normally with status 0
 *
 * @throws RuntimeException if waiting fails; without this, the waiting loops of the caller would never end
 */
private static function waitForChild(): array
{
    $pid = pcntl_waitpid(-1, $status);
    if ($pid <= 0) {
        throw new RuntimeException('Failed to wait for a child process');
    }

    return [$pid, pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0];
}

/**
 * Using amphp/parallel-functions with worker pool
 *
 * The file names are mapped to importFile() by the workers of the pool, and the call blocks until the returned
 * promise is resolved.
 *
 * @param array{processes:int} $params
 */
#[ParamProviders(['provideProcessesParameter'])]
public function benchMultiFileImportAmp(array $params): void
{
    $files = self::getFileNames();

    // Uses array callable instead of closure to skip complex serialization
    $task = [self::class, 'importFile'];

    // The pool size is the number of processes
    $pool = new DefaultPool($params['processes']);

    $promise = parallelMap($files, $task, $pool);

    wait($promise);
}

/**
 * Provides the number of processes used by the parallel benchmarks.
 *
 * With 100 files to import, N processes work through ceil(100 / N) sequences of files:
 * 1 => 100 (to compare to the single thread baseline), 2 => 50, 4 => 25, 8 => 13, 13 => 8, 20 => 5, 34 => 3.
 *
 * @return Generator<string, array{processes:int}>
 */
public static function provideProcessesParameter(): Generator
{
    foreach ([1, 2, 4, 8, 13, 20, 34] as $processes) {
        yield sprintf('%d proc', $processes) => ['processes' => $processes];
    }
}

/**
 * Imports the documents of one file, one JSON document per line, with a single Driver BulkWrite.
 *
 * We benchmarked the following solutions to read a file line by line:
 * - file
 * - SplFileObject
 * - fgets
 * - stream_get_line 🏆
 *
 * @throws RuntimeException if the file cannot be opened
 */
public static function importFile(string $file): void
{
    $fh = fopen($file, 'r');
    if ($fh === false) {
        throw new RuntimeException(sprintf('Failed to open file "%s"', $file));
    }

    $bulkWrite = new BulkWrite();
    try {
        // Lines are expected to be shorter than 10,000 bytes: a longer line would be returned in several chunks
        while (($line = stream_get_line($fh, 10_000, "\n")) !== false) {
            // Ignore blank lines: they do not hold a document
            if ($line === '') {
                continue;
            }

            $bulkWrite->insert(Document::fromJSON($line));
        }
    } finally {
        // Release the handle even when a line cannot be decoded
        fclose($fh);
    }

    $namespace = sprintf('%s.%s', Utils::getDatabaseName(), Utils::getCollectionName());
    Utils::getClient()->getManager()->executeBulkWrite($namespace, $bulkWrite);
}

/**
 * Returns the paths of the 100 files to import (000.txt to 099.txt), located in the "mongodb-php-benchmark"
 * directory of the system temporary directory. The directory is created when it does not exist yet.
 *
 * The list is rebuilt on every call because the result cannot be cached in a static property: the benchmark
 * runner will call the method in a different process, so the static property will not be populated.
 *
 * @return string[]
 */
private static function getFileNames(): array
{
    $directory = sprintf('%s/mongodb-php-benchmark', sys_get_temp_dir());
    if (is_dir($directory) === false) {
        mkdir($directory);
    }

    $paths = [];
    foreach (range(0, 99) as $index) {
        $paths[] = sprintf('%s/%03d.txt', $directory, $index);
    }

    return $paths;
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ final class Data
public const LARGE_FILE_PATH = __DIR__ . '/data/large_doc.json';
public const SMALL_FILE_PATH = __DIR__ . '/data/small_doc.json';
public const TWEET_FILE_PATH = __DIR__ . '/data/tweet.json';
public const LDJSON_FILE_PATH = __DIR__ . '/data/ldjson.json';

public static function readJsonFile(string $path): array
{
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions benchmark/src/Fixtures/data/ldjson.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"text":"@wildfits you're not getting one.....","in_reply_to_status_id":22773233453,"retweet_count":null,"contributors":null,"created_at":"Thu Sep 02 19:38:18 +0000 2010","geo":null,"source":"web","coordinates":null,"in_reply_to_screen_name":"wildfits","truncated":false,"entities":{"user_mentions":[{"indices":[0,9],"screen_name":"wildfits","name":"Mairin Goetzinger","id":41832464}],"urls":[],"hashtags":[]},"retweeted":false,"place":null,"user":{"friends_count":179,"profile_sidebar_fill_color":"7a7a7a","location":"Minneapols, MN/Brookings SD","verified":false,"follow_request_sent":null,"favourites_count":0,"profile_sidebar_border_color":"a3a3a3","profile_image_url":"http://a1.twimg.com/profile_images/1110614677/Screen_shot_2010-08-25_at_10.12.40_AM_normal.png","geo_enabled":false,"created_at":"Sun Aug 17 00:23:13 +0000 2008","description":"graphic designer + foodie, with a love of music, movies, running, design, + the outdoors!","time_zone":"Mountain Time (US & Canada)","url":"http://jessiefarris.com/","screen_name":"jessiekf","notifications":null,"profile_background_color":"303030","listed_count":1,"lang":"en"}}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
9 changes: 8 additions & 1 deletion benchmark/Utils.php → benchmark/src/Utils.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class Utils

public static function getClient(): Client
{
return self::$client ??= new Client(self::getUri());
return self::$client ??= new Client(self::getUri(), [], ['disableClientPersistence' => true]);
}

public static function getDatabase(): Database
Expand All @@ -43,4 +43,11 @@ public static function getCollectionName(): string
{
return 'perftest';
}

/**
 * Clears the static client, database and collection properties, so that the next call to getClient() constructs a
 * new Client.
 */
public static function reset(): void
{
    self::$client = self::$database = self::$collection = null;
}
}
6 changes: 2 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
},
"require-dev": {
"doctrine/coding-standard": "^11.1",
"phpbench/phpbench": "^1.2",
"rector/rector": "^0.16.0",
"squizlabs/php_codesniffer": "^3.7",
"symfony/phpunit-bridge": "^5.2",
Expand All @@ -33,13 +32,12 @@
},
"autoload-dev": {
"psr-4": {
"MongoDB\\Tests\\": "tests/",
"MongoDB\\Benchmark\\": "benchmark/"
"MongoDB\\Tests\\": "tests/"
},
"files": [ "tests/PHPUnit/Functions.php" ]
},
"scripts": {
"benchmark": "phpbench run --report=aggregate",
"bench": "cd benchmark && composer update && vendor/bin/phpbench run --report=aggregate",
"checks": [
"@check:cs",
"@check:psalm",
Expand Down
2 changes: 1 addition & 1 deletion phpcs.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<!-- Ignore warnings (n), show progress of the run (p), and show sniff names (s) -->
<arg value="nps"/>

<file>benchmark</file>
<file>benchmark/src</file>
<file>src</file>
<file>docs/examples</file>
<file>examples</file>
Expand Down

0 comments on commit 82a6397

Please sign in to comment.