diff --git a/benchmark/composer.json b/benchmark/composer.json index bb8e1718c..17fda3ef0 100644 --- a/benchmark/composer.json +++ b/benchmark/composer.json @@ -15,7 +15,7 @@ "require": { "php": ">=8.1", "ext-pcntl": "*", - "amphp/parallel-functions": "^1.1", + "amphp/parallel": "^2.2", "mongodb/mongodb": "@dev", "phpbench/phpbench": "^1.2" }, @@ -26,5 +26,8 @@ }, "scripts": { "benchmark": "phpbench run --report=aggregate" + }, + "config": { + "sort-packages": true } } diff --git a/benchmark/src/DriverBench/Amp/ExportFileTask.php b/benchmark/src/DriverBench/Amp/ExportFileTask.php new file mode 100644 index 000000000..8600cd671 --- /dev/null +++ b/benchmark/src/DriverBench/Amp/ExportFileTask.php @@ -0,0 +1,25 @@ +files, $this->filter, $this->options); + + return $this->files; + } +} diff --git a/benchmark/src/DriverBench/Amp/ImportFileTask.php b/benchmark/src/DriverBench/Amp/ImportFileTask.php new file mode 100644 index 000000000..8db174409 --- /dev/null +++ b/benchmark/src/DriverBench/Amp/ImportFileTask.php @@ -0,0 +1,23 @@ +files); + + return $this->files; + } +} diff --git a/benchmark/src/DriverBench/BSONMicroBench.php b/benchmark/src/DriverBench/BSONMicroBench.php index 33eb6fbcb..a1840c925 100644 --- a/benchmark/src/DriverBench/BSONMicroBench.php +++ b/benchmark/src/DriverBench/BSONMicroBench.php @@ -16,6 +16,7 @@ */ final class BSONMicroBench { + /** @param array{document:Document} $params */ #[ParamProviders('provideParams')] public function benchEncoding(array $params): void { @@ -25,12 +26,13 @@ public function benchEncoding(array $params): void } } + /** @param array{bson:string} $params */ #[ParamProviders('provideParams')] public function benchDecoding(array $params): void { - $document = $params['document']; + $bson = $params['bson']; for ($i = 0; $i < 10_000; $i++) { - Document::fromBSON($document); + Document::fromBSON($bson); } } diff --git a/benchmark/src/DriverBench/ParallelMultiFileExportBench.php b/benchmark/src/DriverBench/ParallelMultiFileExportBench.php new file mode 100644 index 000000000..e6aaabba4 --- /dev/null +++ b/benchmark/src/DriverBench/ParallelMultiFileExportBench.php @@ -0,0 +1,225 @@ +drop(); + + $doc = Document::fromJSON(file_get_contents(Data::LDJSON_FILE_PATH)); + Utils::getCollection()->insertMany(array_fill(0, 500_000, $doc)); + } + + public static function afterClass(): void + { + Utils::getDatabase()->drop(); + } + + public function afterIteration(): void + { + foreach (self::getFileNames() as $file) { + if (file_exists($file)) { + unlink($file); + } + } + } + + /** + * Using a single thread to export multiple files. + * By executing a single Find command for multiple files, we can reduce the number of roundtrips to the server. + * + * @param array{chunk:int} $params + */ + #[ParamProviders(['provideChunkParams'])] + public function benchSequential(array $params): void + { + foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) { + self::exportFile($files, [], [ + 'limit' => 5_000 * $params['chunk'], + 'skip' => 5_000 * $params['chunk'] * $i, + ]); + } + } + + /** + * Using multiple forked threads + * + * @param array{chunk:int} $params + */ + #[ParamProviders(['provideChunkParams'])] + public function benchFork(array $params): void + { + $pids = []; + + // Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child + // process. When the child process constructs a new Manager, the differing PID will result in creation + // of a new libmongoc client. + Utils::reset(); + + // Create a child process for each chunk of files + foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) { + $pid = pcntl_fork(); + if ($pid === 0) { + self::exportFile($files, [], [ + 'limit' => 5_000 * $params['chunk'], + 'skip' => 5_000 * $params['chunk'] * $i, + ]); + + // Exit the child process + exit(0); + } + + if ($pid === -1) { + throw new RuntimeException('Failed to fork'); + } + + // Keep the forked process id to wait for it later + $pids[$pid] = true; + } + + // Wait for all child processes to finish + while ($pids !== []) { + $pid = pcntl_waitpid(-1, $status); + unset($pids[$pid]); + } + } + + /** + * Using amphp/parallel with worker pool + * + * @param array{chunk:int} $params + */ + #[ParamProviders(['provideChunkParams'])] + public function benchAmpWorkers(array $params): void + { + $workerPool = new ContextWorkerPool(ceil(100 / $params['chunk']), new ContextWorkerFactory()); + + $futures = []; + foreach (array_chunk(self::getFileNames(), $params['chunk']) as $i => $files) { + $futures[] = $workerPool->submit( + new ExportFileTask( + files: $files, + options: [ + 'limit' => 5_000 * $params['chunk'], + 'skip' => 5_000 * $params['chunk'] * $i, + ], + ), + )->getFuture(); + } + + foreach (Future::iterate($futures) as $future) { + $future->await(); + } + } + + public static function provideChunkParams(): Generator + { + yield 'by 1' => ['chunk' => 1]; + yield 'by 2' => ['chunk' => 2]; + yield 'by 4' => ['chunk' => 4]; + yield 'by 8' => ['chunk' => 8]; + yield 'by 13' => ['chunk' => 13]; + yield 'by 20' => ['chunk' => 20]; + yield 'by 100' => ['chunk' => 100]; + } + + /** + * Export a query to a file + */ + public static function exportFile(array|string $files, array $filter = [], array $options = []): void + { + $options += [ + // bson typemap is faster on query result, but slower to JSON encode + 'typeMap' => ['root' => 'array'], + // Excludes _id field to be identical to fixtures data + 'projection' => ['_id' => 0], + 'sort' => ['_id' => 1], + ]; + $cursor = Utils::getCollection()->find($filter, $options); + $cursor->rewind(); + + foreach ((array) $files as $file) { + // Aggregate file in memory to reduce filesystem operations + $data = ''; + for ($i = 0; $i < 5_000; $i++) { + $document = $cursor->current(); + // Cursor exhausted + if (! $document) { + break; + } + + // We don't use MongoDB\BSON\Document::toCanonicalExtendedJSON() because + // it is slower than json_encode() on an array. + $data .= json_encode($document) . "\n"; + $cursor->next(); + } + + // Write file in a single operation + file_put_contents($file, $data); + } + } + + /** + * Using a method to regenerate the file names because we cannot cache the result of the method in a static + * property. The benchmark runner will call the method in a different process, so the static property will not be + * populated. + */ + private static function getFileNames(): array + { + $tempDir = sys_get_temp_dir() . '/mongodb-php-benchmark'; + if (! is_dir($tempDir)) { + mkdir($tempDir); + } + + return array_map( + static fn (int $i) => sprintf('%s/%03d.txt', $tempDir, $i), + range(0, 99), + ); + } +} diff --git a/benchmark/src/DriverBench/ParallelMultiFileImportBench.php b/benchmark/src/DriverBench/ParallelMultiFileImportBench.php index 159462c8e..cba5fcbd5 100644 --- a/benchmark/src/DriverBench/ParallelMultiFileImportBench.php +++ b/benchmark/src/DriverBench/ParallelMultiFileImportBench.php @@ -2,8 +2,11 @@ namespace MongoDB\Benchmark\DriverBench; -use Amp\Parallel\Worker\DefaultPool; +use Amp\Future; +use Amp\Parallel\Worker\ContextWorkerFactory; +use Amp\Parallel\Worker\ContextWorkerPool; use Generator; +use MongoDB\Benchmark\DriverBench\Amp\ImportFileTask; use MongoDB\Benchmark\Fixtures\Data; use MongoDB\Benchmark\Utils; use MongoDB\BSON\Document; @@ -16,10 +19,9 @@ use PhpBench\Attributes\Revs; use RuntimeException; -use function Amp\ParallelFunctions\parallelMap; -use function Amp\Promise\wait; +use function array_chunk; use function array_map; -use function count; +use function ceil; use function fclose; use function fgets; use function file_get_contents; @@ -72,61 +74,64 @@ public function beforeIteration(): void } /** - * Using Driver's BulkWrite in a single thread + * Using Driver's BulkWrite in a single thread. + * The number of files to import in each iteration is controlled by the "chunk" parameter. + * + * @param array{chunk:int} $params */ - public function benchMultiFileImportBulkWrite(): void + #[ParamProviders(['provideChunkParams'])] + public function benchBulkWrite(array $params): void { - foreach (self::getFileNames() as $file) { - self::importFile($file); + foreach (array_chunk(self::getFileNames(), $params['chunk']) as $files) { + self::importFile($files); } } /** * Using library's Collection::insertMany in a single thread */ - public function benchMultiFileImportInsertMany(): void + public function benchInsertMany(): void { $collection = Utils::getCollection(); foreach (self::getFileNames() as $file) { $docs = []; // Read file contents into BSON documents $fh = fopen($file, 'r'); - while (($line = fgets($fh)) !== false) { - if ($line !== '') { - $docs[] = Document::fromJSON($line); + try { + while (($line = fgets($fh)) !== false) { + if ($line !== '') { + $docs[] = Document::fromJSON($line); + } } + } finally { + fclose($fh); } - fclose($fh); - // Insert documents in bulk $collection->insertMany($docs); } } /** - * Using multiple forked threads + * Using multiple forked threads. The number of threads is controlled by the "chunk" parameter, + * which is the number of files to import in each thread. * - * @param array{processes:int, files:string[], batchSize:int} $params + * @param array{chunk:int} $params */ - #[ParamProviders(['provideProcessesParameter'])] - public function benchMultiFileImportFork(array $params): void + #[ParamProviders(['provideChunkParams'])] + public function benchFork(array $params): void { $pids = []; - foreach (self::getFileNames() as $file) { - // Wait for a child process to finish if we have reached the maximum number of processes - if (count($pids) >= $params['processes']) { - $pid = pcntl_waitpid(-1, $status); - unset($pids[$pid]); - } + // Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child + // process. When the child process constructs a new Manager, the differing PID will result in creation + // of a new libmongoc client. + Utils::reset(); + + foreach (array_chunk(self::getFileNames(), $params['chunk']) as $files) { $pid = pcntl_fork(); if ($pid === 0) { - // Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child - // process. When the child process constructs a new Manager, the differing PID will result in creation - // of a new libmongoc client. - Utils::reset(); - self::importFile($file); + self::importFile($files); // Exit the child process exit(0); @@ -148,31 +153,36 @@ public function benchMultiFileImportFork(array $params): void } /** - * Using amphp/parallel-functions with worker pool + * Using amphp/parallel with worker pool * - * @param array{processes:int, files:string[], batchSize:int} $params + * @param array{processes:int} $params */ - #[ParamProviders(['provideProcessesParameter'])] - public function benchMultiFileImportAmp(array $params): void + #[ParamProviders(['provideChunkParams'])] + public function benchAmpWorkers(array $params): void { - wait(parallelMap( - self::getFileNames(), - // Uses array callable instead of closure to skip complex serialization - [self::class, 'importFile'], - // The pool size is the number of processes - new DefaultPool($params['processes']), - )); + $workerPool = new ContextWorkerPool(ceil(100 / $params['chunk']), new ContextWorkerFactory()); + + $futures = array_map( + fn ($files) => $workerPool->submit(new ImportFileTask($files))->getFuture(), + array_chunk(self::getFileNames(), $params['chunk']), + ); + + foreach (Future::iterate($futures) as $future) { + $future->await(); + } + + $workerPool->shutdown(); } - public static function provideProcessesParameter(): Generator + public function provideChunkParams(): Generator { - yield '1 proc' => ['processes' => 1]; // 100 sequences, to compare to the single thread baseline - yield '2 proc' => ['processes' => 2]; // 50 sequences - yield '4 proc' => ['processes' => 4]; // 25 sequences - yield '8 proc' => ['processes' => 8]; // 13 sequences - yield '13 proc' => ['processes' => 13]; // 8 sequences - yield '20 proc' => ['processes' => 20]; // 5 sequences - yield '34 proc' => ['processes' => 34]; // 3 sequences + yield 'by 1' => ['chunk' => 1]; + yield 'by 2' => ['chunk' => 2]; + yield 'by 4' => ['chunk' => 4]; + yield 'by 8' => ['chunk' => 8]; + yield 'by 13' => ['chunk' => 13]; + yield 'by 20' => ['chunk' => 20]; + yield 'by 100' => ['chunk' => 100]; } /** @@ -182,17 +192,22 @@ public static function provideProcessesParameter(): Generator * - fgets * - stream_get_line 🏆 */ - public static function importFile(string $file): void + public static function importFile(string|array $files): void { $namespace = sprintf('%s.%s', Utils::getDatabaseName(), Utils::getCollectionName()); $bulkWrite = new BulkWrite(); - $fh = fopen($file, 'r'); - while (($line = stream_get_line($fh, 10_000, "\n")) !== false) { - $bulkWrite->insert(Document::fromJSON($line)); + foreach ((array) $files as $file) { + $fh = fopen($file, 'r'); + try { + while (($line = stream_get_line($fh, 10_000, "\n")) !== false) { + $bulkWrite->insert(Document::fromJSON($line)); + } + } finally { + fclose($fh); + } } - fclose($fh); Utils::getClient()->getManager()->executeBulkWrite($namespace, $bulkWrite); }