Queue.php 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Deploy\Process;
  7. use Magento\Deploy\Package\Package;
  8. use Magento\Deploy\Service\DeployPackage;
  9. use Magento\Framework\App\ResourceConnection;
  10. use Psr\Log\LoggerInterface;
  11. use Magento\Framework\App\State as AppState;
  12. use Magento\Framework\Locale\ResolverInterface as LocaleResolver;
  13. /**
  14. * Deployment Queue
  15. *
  16. * Deploy packages in parallel forks (if available)
  17. */
  18. class Queue
  19. {
  20. /**
  21. * Default max amount of processes
  22. */
  23. const DEFAULT_MAX_PROCESSES_AMOUNT = 4;
  24. /**
  25. * Default max execution time
  26. */
  27. const DEFAULT_MAX_EXEC_TIME = 400;
  28. /**
  29. * @var array
  30. */
  31. private $packages = [];
  32. /**
  33. * @var int[]
  34. */
  35. private $processIds = [];
  36. /**
  37. * @var Package[]
  38. */
  39. private $inProgress = [];
  40. /**
  41. * @var int
  42. */
  43. private $maxProcesses;
  44. /**
  45. * @var int
  46. */
  47. private $maxExecTime;
  48. /**
  49. * @var AppState
  50. */
  51. private $appState;
  52. /**
  53. * @var LocaleResolver
  54. */
  55. private $localeResolver;
  56. /**
  57. * @var ResourceConnection
  58. */
  59. private $resourceConnection;
  60. /**
  61. * @var LoggerInterface
  62. */
  63. private $logger;
  64. /**
  65. * @var DeployPackage
  66. */
  67. private $deployPackageService;
  68. /**
  69. * @var array
  70. */
  71. private $options = [];
  72. /**
  73. * @var int
  74. */
  75. private $start = 0;
  76. /**
  77. * @var int
  78. */
  79. private $lastJobStarted = 0;
  80. /**
  81. * @param AppState $appState
  82. * @param LocaleResolver $localeResolver
  83. * @param ResourceConnection $resourceConnection
  84. * @param LoggerInterface $logger
  85. * @param DeployPackage $deployPackageService
  86. * @param array $options
  87. * @param int $maxProcesses
  88. * @param int $maxExecTime
  89. */
  90. public function __construct(
  91. AppState $appState,
  92. LocaleResolver $localeResolver,
  93. ResourceConnection $resourceConnection,
  94. LoggerInterface $logger,
  95. DeployPackage $deployPackageService,
  96. array $options = [],
  97. $maxProcesses = self::DEFAULT_MAX_PROCESSES_AMOUNT,
  98. $maxExecTime = self::DEFAULT_MAX_EXEC_TIME
  99. ) {
  100. $this->appState = $appState;
  101. $this->localeResolver = $localeResolver;
  102. $this->resourceConnection = $resourceConnection;
  103. $this->logger = $logger;
  104. $this->deployPackageService = $deployPackageService;
  105. $this->options = $options;
  106. $this->maxProcesses = $maxProcesses;
  107. $this->maxExecTime = $maxExecTime;
  108. }
  109. /**
  110. * @param Package $package
  111. * @param Package[] $dependencies
  112. * @return bool true on success
  113. */
  114. public function add(Package $package, array $dependencies = [])
  115. {
  116. $this->packages[$package->getPath()] = [
  117. 'package' => $package,
  118. 'dependencies' => $dependencies
  119. ];
  120. return true;
  121. }
  122. /**
  123. * @return Package[]
  124. */
  125. public function getPackages()
  126. {
  127. return $this->packages;
  128. }
  129. /**
  130. * Process jobs
  131. *
  132. * @return int
  133. */
  134. public function process()
  135. {
  136. $returnStatus = 0;
  137. $this->start = $this->lastJobStarted = time();
  138. $packages = $this->packages;
  139. while (count($packages) && $this->checkTimeout()) {
  140. foreach ($packages as $name => $packageJob) {
  141. $this->assertAndExecute($name, $packages, $packageJob);
  142. }
  143. $this->logger->info('.');
  144. sleep(3);
  145. foreach ($this->inProgress as $name => $package) {
  146. if ($this->isDeployed($package)) {
  147. unset($this->inProgress[$name]);
  148. }
  149. }
  150. }
  151. $this->awaitForAllProcesses();
  152. return $returnStatus;
  153. }
  154. /**
  155. * Check that all depended packages deployed and execute
  156. *
  157. * @param string $name
  158. * @param array $packages
  159. * @param array $packageJob
  160. * @return void
  161. */
  162. private function assertAndExecute($name, array & $packages, array $packageJob)
  163. {
  164. /** @var Package $package */
  165. $package = $packageJob['package'];
  166. $dependenciesNotFinished = false;
  167. if ($package->getParent() && $package->getParent() !== $package) {
  168. foreach ($packageJob['dependencies'] as $dependencyName => $dependency) {
  169. if (!$this->isDeployed($dependency)) {
  170. //If it's not present in $packages then it's already
  171. //in progress so just waiting...
  172. if (!array_key_exists($dependencyName, $packages)) {
  173. $dependenciesNotFinished = true;
  174. } else {
  175. $this->assertAndExecute(
  176. $dependencyName,
  177. $packages,
  178. $packages[$dependencyName]
  179. );
  180. }
  181. }
  182. }
  183. }
  184. $this->executePackage($package, $name, $packages, $dependenciesNotFinished);
  185. }
  186. /**
  187. * @param Package $package
  188. * @param string $name
  189. * @param array $packages
  190. * @param bool $dependenciesNotFinished
  191. * @return void
  192. */
  193. private function executePackage(
  194. Package $package,
  195. string $name,
  196. array &$packages,
  197. bool $dependenciesNotFinished
  198. ) {
  199. if (!$dependenciesNotFinished
  200. && !$this->isDeployed($package)
  201. && ($this->maxProcesses < 2 || (count($this->inProgress) < $this->maxProcesses))
  202. ) {
  203. unset($packages[$name]);
  204. $this->execute($package);
  205. }
  206. }
  207. /**
  208. * Need to wait till all processes finished
  209. *
  210. * @return void
  211. */
  212. private function awaitForAllProcesses()
  213. {
  214. while ($this->inProgress && $this->checkTimeout()) {
  215. foreach ($this->inProgress as $name => $package) {
  216. if ($this->isDeployed($package)) {
  217. unset($this->inProgress[$name]);
  218. }
  219. }
  220. $this->logger->info('.');
  221. sleep(5);
  222. }
  223. if ($this->isCanBeParalleled()) {
  224. // close connections only if ran with forks
  225. $this->resourceConnection->closeConnection();
  226. }
  227. }
  228. /**
  229. * @return bool
  230. */
  231. private function isCanBeParalleled()
  232. {
  233. return function_exists('pcntl_fork') && $this->maxProcesses > 1;
  234. }
  235. /**
  236. * @param Package $package
  237. * @return bool true on success for main process and exit for child process
  238. * @SuppressWarnings(PHPMD.ExitExpression)
  239. */
  240. private function execute(Package $package)
  241. {
  242. $this->lastJobStarted = time();
  243. $this->logger->info(
  244. "Execute: " . $package->getPath(),
  245. [
  246. 'process' => $package->getPath(),
  247. 'count' => count($package->getFiles()),
  248. ]
  249. );
  250. $this->appState->emulateAreaCode(
  251. $package->getArea() == Package::BASE_AREA ? 'global' : $package->getArea(),
  252. function () use ($package) {
  253. // emulate application locale needed for correct file path resolving
  254. $this->localeResolver->setLocale($package->getLocale());
  255. // execute package pre-processors
  256. // (may add more files to deploy, so it needs to be executed in main thread)
  257. foreach ($package->getPreProcessors() as $processor) {
  258. $processor->process($package, $this->options);
  259. }
  260. }
  261. );
  262. if ($this->isCanBeParalleled()) {
  263. $pid = pcntl_fork();
  264. if ($pid === -1) {
  265. throw new \RuntimeException('Unable to fork a new process');
  266. }
  267. if ($pid) {
  268. $this->inProgress[$package->getPath()] = $package;
  269. $this->processIds[$package->getPath()] = $pid;
  270. return true;
  271. }
  272. // process child process
  273. $this->inProgress = [];
  274. $this->deployPackageService->deploy($package, $this->options, true);
  275. exit(0);
  276. } else {
  277. $this->deployPackageService->deploy($package, $this->options);
  278. return true;
  279. }
  280. }
  281. /**
  282. * @param Package $package
  283. * @return bool
  284. */
  285. private function isDeployed(Package $package)
  286. {
  287. if ($this->isCanBeParalleled()) {
  288. if ($package->getState() === null) {
  289. $pid = pcntl_waitpid($this->getPid($package), $status, WNOHANG);
  290. if ($pid === $this->getPid($package)) {
  291. $package->setState(Package::STATE_COMPLETED);
  292. unset($this->inProgress[$package->getPath()]);
  293. return pcntl_wexitstatus($status) === 0;
  294. }
  295. return false;
  296. }
  297. }
  298. return $package->getState();
  299. }
  300. /**
  301. * @param Package $package
  302. * @return int|null
  303. */
  304. private function getPid(Package $package)
  305. {
  306. return isset($this->processIds[$package->getPath()])
  307. ? $this->processIds[$package->getPath()]
  308. : null;
  309. }
  310. /**
  311. * @return bool
  312. */
  313. private function checkTimeout()
  314. {
  315. return time() - $this->lastJobStarted < $this->maxExecTime;
  316. }
  317. /**
  318. * Free resources
  319. *
  320. * Protect against zombie process
  321. *
  322. * @return void
  323. */
  324. public function __destruct()
  325. {
  326. foreach ($this->inProgress as $package) {
  327. if (pcntl_waitpid($this->getPid($package), $status) === -1) {
  328. throw new \RuntimeException(
  329. 'Error while waiting for package deployed: ' . $this->getPid($package) . '; Status: ' . $status
  330. );
  331. }
  332. }
  333. }
  334. }