123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Deploy\Process;
- use Magento\Deploy\Package\Package;
- use Magento\Deploy\Service\DeployPackage;
- use Magento\Framework\App\ResourceConnection;
- use Psr\Log\LoggerInterface;
- use Magento\Framework\App\State as AppState;
- use Magento\Framework\Locale\ResolverInterface as LocaleResolver;
- /**
- * Deployment Queue
- *
- * Deploy packages in parallel forks (if available)
- */
- class Queue
- {
- /**
- * Default max amount of processes
- */
- const DEFAULT_MAX_PROCESSES_AMOUNT = 4;
- /**
- * Default max execution time
- */
- const DEFAULT_MAX_EXEC_TIME = 400;
- /**
- * @var array
- */
- private $packages = [];
- /**
- * @var int[]
- */
- private $processIds = [];
- /**
- * @var Package[]
- */
- private $inProgress = [];
- /**
- * @var int
- */
- private $maxProcesses;
- /**
- * @var int
- */
- private $maxExecTime;
- /**
- * @var AppState
- */
- private $appState;
- /**
- * @var LocaleResolver
- */
- private $localeResolver;
- /**
- * @var ResourceConnection
- */
- private $resourceConnection;
- /**
- * @var LoggerInterface
- */
- private $logger;
- /**
- * @var DeployPackage
- */
- private $deployPackageService;
- /**
- * @var array
- */
- private $options = [];
- /**
- * @var int
- */
- private $start = 0;
- /**
- * @var int
- */
- private $lastJobStarted = 0;
- /**
- * @param AppState $appState
- * @param LocaleResolver $localeResolver
- * @param ResourceConnection $resourceConnection
- * @param LoggerInterface $logger
- * @param DeployPackage $deployPackageService
- * @param array $options
- * @param int $maxProcesses
- * @param int $maxExecTime
- */
- public function __construct(
- AppState $appState,
- LocaleResolver $localeResolver,
- ResourceConnection $resourceConnection,
- LoggerInterface $logger,
- DeployPackage $deployPackageService,
- array $options = [],
- $maxProcesses = self::DEFAULT_MAX_PROCESSES_AMOUNT,
- $maxExecTime = self::DEFAULT_MAX_EXEC_TIME
- ) {
- $this->appState = $appState;
- $this->localeResolver = $localeResolver;
- $this->resourceConnection = $resourceConnection;
- $this->logger = $logger;
- $this->deployPackageService = $deployPackageService;
- $this->options = $options;
- $this->maxProcesses = $maxProcesses;
- $this->maxExecTime = $maxExecTime;
- }
- /**
- * @param Package $package
- * @param Package[] $dependencies
- * @return bool true on success
- */
- public function add(Package $package, array $dependencies = [])
- {
- $this->packages[$package->getPath()] = [
- 'package' => $package,
- 'dependencies' => $dependencies
- ];
- return true;
- }
- /**
- * @return Package[]
- */
- public function getPackages()
- {
- return $this->packages;
- }
- /**
- * Process jobs
- *
- * @return int
- */
- public function process()
- {
- $returnStatus = 0;
- $this->start = $this->lastJobStarted = time();
- $packages = $this->packages;
- while (count($packages) && $this->checkTimeout()) {
- foreach ($packages as $name => $packageJob) {
- $this->assertAndExecute($name, $packages, $packageJob);
- }
- $this->logger->info('.');
- sleep(3);
- foreach ($this->inProgress as $name => $package) {
- if ($this->isDeployed($package)) {
- unset($this->inProgress[$name]);
- }
- }
- }
- $this->awaitForAllProcesses();
- return $returnStatus;
- }
- /**
- * Check that all depended packages deployed and execute
- *
- * @param string $name
- * @param array $packages
- * @param array $packageJob
- * @return void
- */
- private function assertAndExecute($name, array & $packages, array $packageJob)
- {
- /** @var Package $package */
- $package = $packageJob['package'];
- $dependenciesNotFinished = false;
- if ($package->getParent() && $package->getParent() !== $package) {
- foreach ($packageJob['dependencies'] as $dependencyName => $dependency) {
- if (!$this->isDeployed($dependency)) {
- //If it's not present in $packages then it's already
- //in progress so just waiting...
- if (!array_key_exists($dependencyName, $packages)) {
- $dependenciesNotFinished = true;
- } else {
- $this->assertAndExecute(
- $dependencyName,
- $packages,
- $packages[$dependencyName]
- );
- }
- }
- }
- }
- $this->executePackage($package, $name, $packages, $dependenciesNotFinished);
- }
- /**
- * @param Package $package
- * @param string $name
- * @param array $packages
- * @param bool $dependenciesNotFinished
- * @return void
- */
- private function executePackage(
- Package $package,
- string $name,
- array &$packages,
- bool $dependenciesNotFinished
- ) {
- if (!$dependenciesNotFinished
- && !$this->isDeployed($package)
- && ($this->maxProcesses < 2 || (count($this->inProgress) < $this->maxProcesses))
- ) {
- unset($packages[$name]);
- $this->execute($package);
- }
- }
- /**
- * Need to wait till all processes finished
- *
- * @return void
- */
- private function awaitForAllProcesses()
- {
- while ($this->inProgress && $this->checkTimeout()) {
- foreach ($this->inProgress as $name => $package) {
- if ($this->isDeployed($package)) {
- unset($this->inProgress[$name]);
- }
- }
- $this->logger->info('.');
- sleep(5);
- }
- if ($this->isCanBeParalleled()) {
- // close connections only if ran with forks
- $this->resourceConnection->closeConnection();
- }
- }
- /**
- * @return bool
- */
- private function isCanBeParalleled()
- {
- return function_exists('pcntl_fork') && $this->maxProcesses > 1;
- }
- /**
- * @param Package $package
- * @return bool true on success for main process and exit for child process
- * @SuppressWarnings(PHPMD.ExitExpression)
- */
- private function execute(Package $package)
- {
- $this->lastJobStarted = time();
- $this->logger->info(
- "Execute: " . $package->getPath(),
- [
- 'process' => $package->getPath(),
- 'count' => count($package->getFiles()),
- ]
- );
- $this->appState->emulateAreaCode(
- $package->getArea() == Package::BASE_AREA ? 'global' : $package->getArea(),
- function () use ($package) {
- // emulate application locale needed for correct file path resolving
- $this->localeResolver->setLocale($package->getLocale());
- // execute package pre-processors
- // (may add more files to deploy, so it needs to be executed in main thread)
- foreach ($package->getPreProcessors() as $processor) {
- $processor->process($package, $this->options);
- }
- }
- );
- if ($this->isCanBeParalleled()) {
- $pid = pcntl_fork();
- if ($pid === -1) {
- throw new \RuntimeException('Unable to fork a new process');
- }
- if ($pid) {
- $this->inProgress[$package->getPath()] = $package;
- $this->processIds[$package->getPath()] = $pid;
- return true;
- }
- // process child process
- $this->inProgress = [];
- $this->deployPackageService->deploy($package, $this->options, true);
- exit(0);
- } else {
- $this->deployPackageService->deploy($package, $this->options);
- return true;
- }
- }
- /**
- * @param Package $package
- * @return bool
- */
- private function isDeployed(Package $package)
- {
- if ($this->isCanBeParalleled()) {
- if ($package->getState() === null) {
- $pid = pcntl_waitpid($this->getPid($package), $status, WNOHANG);
- if ($pid === $this->getPid($package)) {
- $package->setState(Package::STATE_COMPLETED);
- unset($this->inProgress[$package->getPath()]);
- return pcntl_wexitstatus($status) === 0;
- }
- return false;
- }
- }
- return $package->getState();
- }
- /**
- * @param Package $package
- * @return int|null
- */
- private function getPid(Package $package)
- {
- return isset($this->processIds[$package->getPath()])
- ? $this->processIds[$package->getPath()]
- : null;
- }
- /**
- * @return bool
- */
- private function checkTimeout()
- {
- return time() - $this->lastJobStarted < $this->maxExecTime;
- }
- /**
- * Free resources
- *
- * Protect against zombie process
- *
- * @return void
- */
- public function __destruct()
- {
- foreach ($this->inProgress as $package) {
- if (pcntl_waitpid($this->getPid($package), $status) === -1) {
- throw new \RuntimeException(
- 'Error while waiting for package deployed: ' . $this->getPid($package) . '; Status: ' . $status
- );
- }
- }
- }
- }
|