123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- declare(strict_types=1);
- namespace Magento\Indexer\Model;
- /**
- * Provide functionality for executing user functions in multi-thread mode.
- */
- class ProcessManager
- {
- /**
- * Threads count environment variable name
- */
- const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
- /** @var bool */
- private $failInChildProcess = false;
- /** @var \Magento\Framework\App\ResourceConnection */
- private $resource;
- /** @var \Magento\Framework\Registry */
- private $registry;
- /** @var int|null */
- private $threadsCount;
- /**
- * @param \Magento\Framework\App\ResourceConnection $resource
- * @param \Magento\Framework\Registry $registry
- * @param int|null $threadsCount
- */
- public function __construct(
- \Magento\Framework\App\ResourceConnection $resource,
- \Magento\Framework\Registry $registry = null,
- int $threadsCount = null
- ) {
- $this->resource = $resource;
- if (null === $registry) {
- $registry = \Magento\Framework\App\ObjectManager::getInstance()->get(
- \Magento\Framework\Registry::class
- );
- }
- $this->registry = $registry;
- $this->threadsCount = (int)$threadsCount;
- }
- /**
- * Execute user functions
- *
- * @param \Traversable $userFunctions
- */
- public function execute($userFunctions)
- {
- if ($this->threadsCount > 1 && $this->isCanBeParalleled() && !$this->isSetupMode() && PHP_SAPI == 'cli') {
- $this->multiThreadsExecute($userFunctions);
- } else {
- $this->simpleThreadExecute($userFunctions);
- }
- }
- /**
- * Execute user functions in singleThreads mode
- *
- * @param \Traversable $userFunctions
- */
- private function simpleThreadExecute($userFunctions)
- {
- foreach ($userFunctions as $userFunction) {
- call_user_func($userFunction);
- }
- }
- /**
- * Execute user functions in multiThreads mode
- *
- * @param \Traversable $userFunctions
- * @SuppressWarnings(PHPMD.UnusedLocalVariable)
- */
- private function multiThreadsExecute($userFunctions)
- {
- $this->resource->closeConnection(null);
- $threadNumber = 0;
- foreach ($userFunctions as $userFunction) {
- $pid = pcntl_fork();
- if ($pid == -1) {
- throw new \RuntimeException('Unable to fork a new process');
- } elseif ($pid) {
- $this->executeParentProcess($threadNumber);
- } else {
- $this->startChildProcess($userFunction);
- }
- }
- while (pcntl_waitpid(0, $status) != -1) {
- //Waiting for the completion of child processes
- }
- if ($this->failInChildProcess) {
- throw new \RuntimeException('Fail in child process');
- }
- }
- /**
- * Is process can be paralleled
- *
- * @return bool
- */
- private function isCanBeParalleled(): bool
- {
- return function_exists('pcntl_fork');
- }
- /**
- * Is setup mode
- *
- * @return bool
- */
- private function isSetupMode(): bool
- {
- return $this->registry->registry('setup-mode-enabled') ?: false;
- }
- /**
- * Start child process
- *
- * @param callable $userFunction
- * @SuppressWarnings(PHPMD.ExitExpression)
- */
- private function startChildProcess(callable $userFunction)
- {
- $status = call_user_func($userFunction);
- $status = is_integer($status) ? $status : 0;
- exit($status);
- }
- /**
- * Execute parent process
- *
- * @param int $threadNumber
- */
- private function executeParentProcess(int &$threadNumber)
- {
- $threadNumber++;
- if ($threadNumber >= $this->threadsCount) {
- pcntl_wait($status);
- if (pcntl_wexitstatus($status) !== 0) {
- $this->failInChildProcess = true;
- }
- $threadNumber--;
- }
- }
- }
|