Skip to content

Commit

Permalink
Merge pull request #2 from nutgram/support_new_version
Browse files Browse the repository at this point in the history
Support new version
  • Loading branch information
sergix44 authored Jun 28, 2023
2 parents ca57d2b + 856d56f commit 0274539
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 24 deletions.
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
"description": "Async running mode for Nutgram",
"type": "library",
"require": {
"nutgram/nutgram": ">=3.1.4",
"spatie/fork": "^1.1"
"nutgram/nutgram": "^4.0",
"ext-pcntl": "*"
},
"license": "MIT",
"autoload": {
Expand All @@ -19,7 +19,7 @@
}
],
"require-dev": {
"orchestra/testbench": "^7.5"
"orchestra/testbench": "^7.0|^8.0"
},
"extra": {
"laravel": {
Expand Down
4 changes: 2 additions & 2 deletions src/ExtendsNutgramProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public function register()
{
$this->app->extend(Nutgram::class, function (Nutgram $bot, Application $app) {
if (!$app->runningUnitTests() && $app->runningInConsole()) {
$concurrency = $bot->getConfig()['concurrency'] ?? 2;
$bot->setRunningMode(new ParallelPolling($concurrency));
$mode = new ParallelPolling($bot->getConfig()?->concurrency ?? 2);
$bot->setRunningMode($mode);
}

return $bot;
Expand Down
23 changes: 4 additions & 19 deletions src/ParallelPolling.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,19 @@

use SergiX44\Nutgram\Nutgram;
use SergiX44\Nutgram\RunningMode\Polling;
use Spatie\Async\Pool;
use Spatie\Fork\Fork;
use Throwable;

class ParallelPolling extends Polling
{
private Fork $pool;
private ProcessManager $manager;

public function __construct(int $concurrency = 2)
{
$this->pool = Fork::new()->concurrent($concurrency);
$this->manager = new ProcessManager($concurrency);
}

protected function fire(Nutgram $bot, array|null $updates): void
protected function fire(Nutgram $bot, array $updates = []): void
{
$tasks = [];
foreach ($updates as $update) {
$tasks[] = static function () use ($bot, $update) {
try {
$bot->processUpdate($update);
} catch (Throwable $e) {
echo "$e\n";
} finally {
$bot->clearData();
}
};
}

$this->pool->run(...$tasks);
$this->manager->pushUpdates($bot, self::$STDERR, $updates);
}
}
73 changes: 73 additions & 0 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace SergiX44\Async;

use RuntimeException;
use SergiX44\Nutgram\Nutgram;
use SergiX44\Nutgram\Telegram\Types\Common\Update;
use Throwable;

class ProcessManager
{

private array $runners = [];

public function __construct(private int $maxWorkers)
{
}

public function pushUpdates(Nutgram $bot, mixed $stderr, array $updates)
{
$pid = pcntl_fork();

if ($pid == -1) {
throw new RuntimeException('Cannot fork!');
} elseif ($pid) {
$this->runners[$pid] = $pid;

// remove the stopped runners
foreach ($this->runners as $pid) {
if (pcntl_waitpid($pid, $status, WNOHANG | WUNTRACED)) {
if (pcntl_wifexited($status)) {
unset($this->runners[$pid]);
}
}
}
} else {
$this->forkRunners($bot, $stderr, $updates);
}
}

public function forkRunners(Nutgram $bot, mixed $stderr, array $updates)
{
$updateWorkers = [];
foreach ($updates as $update) {
if (count($updateWorkers) >= $this->maxWorkers) {
$pid = pcntl_wait($status);
unset($updateWorkers[$pid]);
}

$pid = pcntl_fork();

if ($pid == -1) {
throw new RuntimeException('Cannot fork!');
} elseif ($pid) {
$updateWorkers[$pid] = $pid;
} else {
try {
$bot->processUpdate($update);
} catch (Throwable $e) {
fwrite($stderr, "$e\n");
} finally {
exit(0);
}
}
}

foreach ($updateWorkers as $pid) {
pcntl_waitpid($pid, $status);
}
exit(0);
}

}

0 comments on commit 0274539

Please sign in to comment.