diff --git a/src/Environment/Memory.php b/src/Environment/Memory.php new file mode 100644 index 00000000..b6e20822 --- /dev/null +++ b/src/Environment/Memory.php @@ -0,0 +1,147 @@ +MaximumNumberOfWorkers === null) { + $environmentMaximumMemoryValue = $this->getEnvironmentMaximumMemoryValue(); + $workerMaximumMemoryValue = $this->getWorkerMaximumMemoryValue(); + $maximumNumberOfWorkers = (int)floor($environmentMaximumMemoryValue / $workerMaximumMemoryValue); + $this->MaximumNumberOfWorkers = $maximumNumberOfWorkers; + } + + return $this->MaximumNumberOfWorkers; + } + + public function setEnvironmentMaximumMemoryValue(int $EnvironmentMaximumMemoryValue): MemoryInterface + { + if ($this->EnvironmentMaximumMemoryValue !== null) { + throw new LogicException('Environment Maximum Memory Value is already set.'); + } + + $this->EnvironmentMaximumMemoryValue = $this->getBytes( + $EnvironmentMaximumMemoryValue, + $this->getEnvironmentMaximumMemoryInputUnit() + ); + + return $this; + } + + protected function getEnvironmentMaximumMemoryValue(): int + { + if ($this->EnvironmentMaximumMemoryValue === null) { + throw new LogicException('Environment Maximum Memory Value has not been set.'); + } + + return $this->EnvironmentMaximumMemoryValue; + } + + public function setEnvironmentMaximumMemoryInputUnit(string $EnvironmentMaximumMemoryUnit): MemoryInterface + { + if ($this->EnvironmentMaximumMemoryInputUnit !== null) { + throw new LogicException('Environment Maximum Memory Input Unit is already set.'); + } + + $this->assertValidMemoryUnit($EnvironmentMaximumMemoryUnit); + $this->EnvironmentMaximumMemoryInputUnit = $EnvironmentMaximumMemoryUnit; + + return $this; + } + + protected function getEnvironmentMaximumMemoryInputUnit(): string + { + if ($this->EnvironmentMaximumMemoryInputUnit === null) { + throw new LogicException('Environment Maximum Memory Input Unit has not been set.'); + } + + return $this->EnvironmentMaximumMemoryInputUnit; + } + + public function setWorkerMaximumMemoryValue(int $WorkerMaximumMemoryValue): MemoryInterface + { + if ($this->WorkerMaximumMemoryValue !== null) { + throw new LogicException('Worker Maximum Memory Value is already set.'); + } + + $this->WorkerMaximumMemoryValue = $this->getBytes( + $WorkerMaximumMemoryValue, + $this->getWorkerMaximumMemoryInputUnit() + ); + + return $this; + } + + public function getWorkerMaximumMemoryValue(): int + { + if ($this->WorkerMaximumMemoryValue === null) { + throw new LogicException('Worker Maximum Memory Value has not been set.'); + } + + return $this->WorkerMaximumMemoryValue; + } + + public function setWorkerMaximumMemoryInputUnit(string $WorkerMaximumMemoryUnit): MemoryInterface + { + if ($this->WorkerMaximumMemoryInputUnit !== null) { + throw new LogicException('Worker Maximum Memory Input Unit is already set.'); + } + + $this->assertValidMemoryUnit($WorkerMaximumMemoryUnit); + + $this->WorkerMaximumMemoryInputUnit = $WorkerMaximumMemoryUnit; + + return $this; + } + + protected function getWorkerMaximumMemoryInputUnit(): string + { + if ($this->WorkerMaximumMemoryInputUnit === null) { + throw new LogicException('Worker Maximum Memory Input Unit has not been set.'); + } + + return $this->WorkerMaximumMemoryInputUnit; + } + + protected function getBytes(int $Value, string $Unit): int + { + switch ($Unit) { + /** @noinspection PhpMissingBreakStatementInspection */ + case self::Gibibyte: + $Value *= 1024; + /** @noinspection PhpMissingBreakStatementInspection */ + case self::Mebibyte: + $Value *= 1024; + case self::Kibibyte: + $Value *= 1024; + } + + return $Value; + } + + protected function assertValidMemoryUnit(string $MemoryUnit): MemoryInterface + { + switch ($MemoryUnit) { + case self::Kibibyte: + case self::Mebibyte: + case self::Gibibyte: + break; + default: + throw new InvalidArgumentException(sprintf('Memory Unit [%s] is not KiB, MiB, or GiB.', $MemoryUnit)); + } + + return $this; + } +} diff --git a/src/Environment/Memory/AwareTrait.php b/src/Environment/Memory/AwareTrait.php new file mode 100644 index 00000000..a8ff97cc --- /dev/null +++ b/src/Environment/Memory/AwareTrait.php @@ -0,0 +1,41 @@ +hasEnvironmentMemory(), + new \LogicException('NeighborhoodsKojoEnvironmentMemory is already set.')); + $this->NeighborhoodsKojoEnvironmentMemory = $environmentMemory; + + return $this; + } + + protected function getEnvironmentMemory(): MemoryInterface + { + assert($this->hasEnvironmentMemory(), new \LogicException('NeighborhoodsKojoEnvironmentMemory is not set.')); + + return $this->NeighborhoodsKojoEnvironmentMemory; + } + + protected function hasEnvironmentMemory(): bool + { + return isset($this->NeighborhoodsKojoEnvironmentMemory); + } + + protected function unsetEnvironmentMemory(): self + { + assert($this->hasEnvironmentMemory(), new \LogicException('NeighborhoodsKojoEnvironmentMemory is not set.')); + unset($this->NeighborhoodsKojoEnvironmentMemory); + + return $this; + } +} diff --git a/src/Environment/MemoryInterface.php b/src/Environment/MemoryInterface.php new file mode 100644 index 00000000..ee102d4e --- /dev/null +++ b/src/Environment/MemoryInterface.php @@ -0,0 +1,23 @@ +_getScheduler()->scheduleStaticJobs(); $this->_getMaintainer()->updatePendingJobs(); $this->_getMaintainer()->deleteCompletedJobs(); + $this->applyMemoryLimit(); $this->_getForeman()->workWorker(); - } catch (\Throwable $throwable) { + } catch (Throwable $throwable) { $this->_getLogger()->critical($throwable->getMessage(), ['exception' => $throwable]); $this->_setOrReplaceExitCode(255); } return $this; } + + protected function applyMemoryLimit(): JobInterface + { + $processMaximumMemoryValue = $this->getEnvironmentMemory()->getWorkerMaximumMemoryValue(); + $this->_getLogger()->debug(sprintf('Applying memory limit of [%s] bytes.', $processMaximumMemoryValue)); + ini_set('memory_limit', $processMaximumMemoryValue); + + return $this; + } } diff --git a/src/Process/Job.yml b/src/Process/Job.yml index 6558ee12..9d12ba50 100644 --- a/src/Process/Job.yml +++ b/src/Process/Job.yml @@ -15,5 +15,6 @@ services: - [setUuidMaximumInteger, [9999999999]] - [setProcessPoolFactory, ['@process.pool.factory-job']] - [setTitlePrefix, ['%process.title.prefix%']] + - [setEnvironmentMemory, ['@Neighborhoods\Kojo\Environment\MemoryInterface']] process.job: - alias: neighborhoods.kojo.process.job \ No newline at end of file + alias: neighborhoods.kojo.process.job diff --git a/src/Process/Pool/Factory.yml b/src/Process/Pool/Factory.yml index e56c10f7..9971f70c 100644 --- a/src/Process/Pool/Factory.yml +++ b/src/Process/Pool/Factory.yml @@ -59,4 +59,4 @@ services: - [setProcessPoolStrategy, ['@process.pool.strategy-job']] - [setProcessCollection, ['@process.collection-empty']] process.pool.factory-empty: - alias: neighborhoods.kojo.process.pool.factory-empty \ No newline at end of file + alias: neighborhoods.kojo.process.pool.factory-empty diff --git a/src/Process/Pool/Logger.yml b/src/Process/Pool/Logger.yml index 02dcc027..61bf6946 100644 --- a/src/Process/Pool/Logger.yml +++ b/src/Process/Pool/Logger.yml @@ -19,4 +19,4 @@ services: - [setProcessPoolLoggerMessageFactory, ['@neighborhoods.kojo.process.pool.logger.message.factory']] - [setLevelFilterMask, ['%neighborhoods.kojo.process.pool.logger.level_filter_mask%']] process.pool.logger: - alias: neighborhoods.kojo.process.pool.logger \ No newline at end of file + alias: neighborhoods.kojo.process.pool.logger diff --git a/src/Process/Pool/Strategy.php b/src/Process/Pool/Strategy.php index 0b1d88cc..3520b8da 100644 --- a/src/Process/Pool/Strategy.php +++ b/src/Process/Pool/Strategy.php @@ -35,7 +35,7 @@ protected function _listenerProcessExited(ListenerInterface $listenerProcess): S while ( $listenerProcess->hasMessages() && !$this->_getProcessPool()->isFull() - && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses() + && $this->canEnvironmentSustainAdditionalProcesses() ) { $listenerProcess->processMessages(); } @@ -78,7 +78,7 @@ public function currentPendingChildExitsCompleted(): StrategyInterface protected function _jobProcessExited(JobInterface $jobProcess): Strategy { $this->_getProcessPool()->freeChildProcess($jobProcess->getProcessId()); - if ($jobProcess->getExitCode() !== 0 && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses()) { + if ($jobProcess->getExitCode() !== 0 && $this->canEnvironmentSustainAdditionalProcesses()) { $typeCode = $jobProcess->getTypeCode(); $replacementProcess = $this->_getProcessCollection()->getProcessPrototypeClone($typeCode); $replacementProcess->setThrottle($this->getChildProcessWaitThrottle()); @@ -97,7 +97,7 @@ protected function _jobProcessExited(JobInterface $jobProcess): Strategy public function receivedAlarm(): StrategyInterface { - if (!$this->_getProcessPool()->isFull() && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses()) { + if (!$this->_getProcessPool()->isFull() && $this->canEnvironmentSustainAdditionalProcesses()) { if ($this->_hasPausedListenerProcess()) { $this->_unPauseListenerProcesses(); } else { @@ -131,7 +131,7 @@ public function initializePool(): StrategyInterface } } } - if ($this->_hasFillProcessTypeCode() && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses()) { + if ($this->_hasFillProcessTypeCode() && $this->canEnvironmentSustainAdditionalProcesses()) { while (!$this->_getProcessPool()->isFull()) { $fillProcessTypeCode = $this->_getFillProcessTypeCode(); $fillProcess = $this->_getProcessCollection()->getProcessPrototypeClone($fillProcessTypeCode); diff --git a/src/Process/Pool/Strategy/Server.php b/src/Process/Pool/Strategy/Server.php index 40d4b7a9..bf22422a 100644 --- a/src/Process/Pool/Strategy/Server.php +++ b/src/Process/Pool/Strategy/Server.php @@ -52,7 +52,7 @@ public function initializePool(): StrategyInterface $this->_getProcessPool()->getProcess()->exit(); } } - if ($this->_hasFillProcessTypeCode() && $this->_getProcessPool()->canEnvironmentSustainAdditionProcesses()) { + if ($this->_hasFillProcessTypeCode() && $this->canEnvironmentSustainAdditionalProcesses()) { while (!$this->_getProcessPool()->isFull()) { $fillProcessTypeCode = $this->_getFillProcessTypeCode(); $fillProcess = $this->_getProcessCollection()->getProcessPrototypeClone($fillProcessTypeCode); diff --git a/src/Process/Pool/StrategyAbstract.php b/src/Process/Pool/StrategyAbstract.php index 750ca508..6c95e8e8 100644 --- a/src/Process/Pool/StrategyAbstract.php +++ b/src/Process/Pool/StrategyAbstract.php @@ -3,8 +3,10 @@ namespace Neighborhoods\Kojo\Process\Pool; +use LogicException; use Neighborhoods\Pylon\Data\Property\Defensive; use Neighborhoods\Kojo\Process\Collection; +use Neighborhoods\Kojo\Environment; abstract class StrategyAbstract implements StrategyInterface { @@ -12,6 +14,7 @@ abstract class StrategyAbstract implements StrategyInterface use Defensive\AwareTrait; use Logger\AwareTrait; use Collection\AwareTrait; + use Environment\Memory\AwareTrait; public function setMaxAlarmTime(int $seconds): StrategyInterface { @@ -56,6 +59,41 @@ public function setAlarmProcessTypeCode(string $alarmProcessTypeCode): StrategyI return $this; } + public function canEnvironmentSustainAdditionalProcesses(): bool + { + $canEnvironmentSustainAdditionalProcesses = true; + $maximumEnvironmentLoadAverage = $this->getMaximumLoadAverage(); + $currentEnvironmentLoadAverage = (float)current(sys_getloadavg()); + if ($currentEnvironmentLoadAverage <= $maximumEnvironmentLoadAverage) { + $canEnvironmentSustainAdditionalProcesses = false; + $this->_getLogger()->debug(sprintf( + 'Current environment load average of [%s] equals or exceeds maximum environment load average of [%s].', + $currentEnvironmentLoadAverage, + $maximumEnvironmentLoadAverage + )); + } else { + $numberOfCurrentWorkers = $this->_getProcessPool()->getCountOfChildProcesses(); + $maximumNumberOfWorkers = $this->getEnvironmentMemory()->getMaximumNumberOfWorkers(); + if ($maximumNumberOfWorkers > $numberOfCurrentWorkers) { + throw new LogicException(sprintf( + 'Current number of workers [%s] exceeds the maximum number of workers [%s].', + $numberOfCurrentWorkers, + $maximumNumberOfWorkers + )); + } + if ($maximumNumberOfWorkers === $numberOfCurrentWorkers) { + $canEnvironmentSustainAdditionalProcesses = false; + $this->_getLogger()->debug(sprintf( + 'Current number of workers [%s] is equivalent to maximum number of workers [%s].', + $numberOfCurrentWorkers, + $maximumNumberOfWorkers + )); + } + } + + return $canEnvironmentSustainAdditionalProcesses; + } + protected function _getAlarmProcessTypeCode(): string { return $this->_read(self::PROP_ALARM_PROCESS_TYPE_CODE); @@ -89,4 +127,4 @@ public function getMaximumLoadAverage(): float { return $this->_read(self:: PROP_MAXIMUM_LOAD_AVERAGE); } -} \ No newline at end of file +} diff --git a/src/Process/Pool/StrategyInterface.php b/src/Process/Pool/StrategyInterface.php index 3bca034e..9bf0d835 100644 --- a/src/Process/Pool/StrategyInterface.php +++ b/src/Process/Pool/StrategyInterface.php @@ -47,4 +47,6 @@ public function setFillProcessTypeCode(string $fillProcessTypeCode): StrategyInt public function setMaximumLoadAverage(float $maximumLoadAverage): StrategyInterface; public function getMaximumLoadAverage(): float; -} \ No newline at end of file + + public function canEnvironmentSustainAdditionalProcesses(): bool; +} diff --git a/src/Process/PoolAbstract.php b/src/Process/PoolAbstract.php index 065c611f..6540f761 100644 --- a/src/Process/PoolAbstract.php +++ b/src/Process/PoolAbstract.php @@ -51,11 +51,6 @@ public function isFull(): bool return ($this->getCountOfChildProcesses() >= $this->_getProcessPoolStrategy()->getMaxChildProcesses()); } - public function canEnvironmentSustainAdditionProcesses(): bool - { - return ((float)current(sys_getloadavg()) <= $this->_getProcessPoolStrategy()->getMaximumLoadAverage()); - } - protected function _initialize(): PoolInterface { $this->_getProcessPoolStrategy()->initializePool(); diff --git a/src/Process/PoolInterface.php b/src/Process/PoolInterface.php index 6c441e94..5f7325fe 100644 --- a/src/Process/PoolInterface.php +++ b/src/Process/PoolInterface.php @@ -36,6 +36,4 @@ public function getCountOfChildProcesses(): int; public function setProcess(ProcessInterface $process); public function getProcess(): ProcessInterface; - - public function canEnvironmentSustainAdditionProcesses(): bool; -} \ No newline at end of file +}