ProcessManager.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. declare(strict_types=1);
  7. namespace Magento\Indexer\Model;
  8. /**
  9. * Provide functionality for executing user functions in multi-thread mode.
  10. */
  11. class ProcessManager
  12. {
  13. /**
  14. * Threads count environment variable name
  15. */
  16. const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
  17. /** @var bool */
  18. private $failInChildProcess = false;
  19. /** @var \Magento\Framework\App\ResourceConnection */
  20. private $resource;
  21. /** @var \Magento\Framework\Registry */
  22. private $registry;
  23. /** @var int|null */
  24. private $threadsCount;
  25. /**
  26. * @param \Magento\Framework\App\ResourceConnection $resource
  27. * @param \Magento\Framework\Registry $registry
  28. * @param int|null $threadsCount
  29. */
  30. public function __construct(
  31. \Magento\Framework\App\ResourceConnection $resource,
  32. \Magento\Framework\Registry $registry = null,
  33. int $threadsCount = null
  34. ) {
  35. $this->resource = $resource;
  36. if (null === $registry) {
  37. $registry = \Magento\Framework\App\ObjectManager::getInstance()->get(
  38. \Magento\Framework\Registry::class
  39. );
  40. }
  41. $this->registry = $registry;
  42. $this->threadsCount = (int)$threadsCount;
  43. }
  44. /**
  45. * Execute user functions
  46. *
  47. * @param \Traversable $userFunctions
  48. */
  49. public function execute($userFunctions)
  50. {
  51. if ($this->threadsCount > 1 && $this->isCanBeParalleled() && !$this->isSetupMode() && PHP_SAPI == 'cli') {
  52. $this->multiThreadsExecute($userFunctions);
  53. } else {
  54. $this->simpleThreadExecute($userFunctions);
  55. }
  56. }
  57. /**
  58. * Execute user functions in singleThreads mode
  59. *
  60. * @param \Traversable $userFunctions
  61. */
  62. private function simpleThreadExecute($userFunctions)
  63. {
  64. foreach ($userFunctions as $userFunction) {
  65. call_user_func($userFunction);
  66. }
  67. }
  68. /**
  69. * Execute user functions in multiThreads mode
  70. *
  71. * @param \Traversable $userFunctions
  72. * @SuppressWarnings(PHPMD.UnusedLocalVariable)
  73. */
  74. private function multiThreadsExecute($userFunctions)
  75. {
  76. $this->resource->closeConnection(null);
  77. $threadNumber = 0;
  78. foreach ($userFunctions as $userFunction) {
  79. $pid = pcntl_fork();
  80. if ($pid == -1) {
  81. throw new \RuntimeException('Unable to fork a new process');
  82. } elseif ($pid) {
  83. $this->executeParentProcess($threadNumber);
  84. } else {
  85. $this->startChildProcess($userFunction);
  86. }
  87. }
  88. while (pcntl_waitpid(0, $status) != -1) {
  89. //Waiting for the completion of child processes
  90. }
  91. if ($this->failInChildProcess) {
  92. throw new \RuntimeException('Fail in child process');
  93. }
  94. }
  95. /**
  96. * Is process can be paralleled
  97. *
  98. * @return bool
  99. */
  100. private function isCanBeParalleled(): bool
  101. {
  102. return function_exists('pcntl_fork');
  103. }
  104. /**
  105. * Is setup mode
  106. *
  107. * @return bool
  108. */
  109. private function isSetupMode(): bool
  110. {
  111. return $this->registry->registry('setup-mode-enabled') ?: false;
  112. }
  113. /**
  114. * Start child process
  115. *
  116. * @param callable $userFunction
  117. * @SuppressWarnings(PHPMD.ExitExpression)
  118. */
  119. private function startChildProcess(callable $userFunction)
  120. {
  121. $status = call_user_func($userFunction);
  122. $status = is_integer($status) ? $status : 0;
  123. exit($status);
  124. }
  125. /**
  126. * Execute parent process
  127. *
  128. * @param int $threadNumber
  129. */
  130. private function executeParentProcess(int &$threadNumber)
  131. {
  132. $threadNumber++;
  133. if ($threadNumber >= $this->threadsCount) {
  134. pcntl_wait($status);
  135. if (pcntl_wexitstatus($status) !== 0) {
  136. $this->failInChildProcess = true;
  137. }
  138. $threadNumber--;
  139. }
  140. }
  141. }