ConsumersRunner.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MessageQueue\Model\Cron;
  7. use Magento\Framework\App\ObjectManager;
  8. use Magento\Framework\MessageQueue\ConnectionTypeResolver;
  9. use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
  10. use Magento\Framework\ShellInterface;
  11. use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
  12. use Magento\Framework\App\DeploymentConfig;
  13. use Psr\Log\LoggerInterface;
  14. use Symfony\Component\Process\PhpExecutableFinder;
  15. use Magento\MessageQueue\Model\Cron\ConsumersRunner\PidConsumerManager;
  16. /**
  17. * Class for running consumers processes by cron
  18. */
  19. class ConsumersRunner
  20. {
  21. /**
  22. * Extension of PID file
  23. */
  24. const PID_FILE_EXT = '.pid';
  25. /**
  26. * Shell command line wrapper for executing command in background
  27. *
  28. * @var ShellInterface
  29. */
  30. private $shellBackground;
  31. /**
  32. * Consumer config provider
  33. *
  34. * @var ConsumerConfigInterface
  35. */
  36. private $consumerConfig;
  37. /**
  38. * Application deployment configuration
  39. *
  40. * @var DeploymentConfig
  41. */
  42. private $deploymentConfig;
  43. /**
  44. * The executable finder specifically designed for the PHP executable
  45. *
  46. * @var PhpExecutableFinder
  47. */
  48. private $phpExecutableFinder;
  49. /**
  50. * The class for checking status of process by PID
  51. *
  52. * @var PidConsumerManager
  53. */
  54. private $pidConsumerManager;
  55. /**
  56. * @var ConnectionTypeResolver
  57. */
  58. private $mqConnectionTypeResolver;
  59. /**
  60. * @var LoggerInterface
  61. */
  62. private $logger;
  63. /**
  64. * @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
  65. * for the PHP executable
  66. * @param ConsumerConfigInterface $consumerConfig The consumer config provider
  67. * @param DeploymentConfig $deploymentConfig The application deployment configuration
  68. * @param ShellInterface $shellBackground The shell command line wrapper for executing command in background
  69. * @param PidConsumerManager $pidConsumerManager The class for checking status of process by PID
  70. * @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
  71. * @param LoggerInterface $logger Logger
  72. */
  73. public function __construct(
  74. PhpExecutableFinder $phpExecutableFinder,
  75. ConsumerConfigInterface $consumerConfig,
  76. DeploymentConfig $deploymentConfig,
  77. ShellInterface $shellBackground,
  78. PidConsumerManager $pidConsumerManager,
  79. ConnectionTypeResolver $mqConnectionTypeResolver = null,
  80. LoggerInterface $logger = null
  81. ) {
  82. $this->phpExecutableFinder = $phpExecutableFinder;
  83. $this->consumerConfig = $consumerConfig;
  84. $this->deploymentConfig = $deploymentConfig;
  85. $this->shellBackground = $shellBackground;
  86. $this->pidConsumerManager = $pidConsumerManager;
  87. $this->mqConnectionTypeResolver = $mqConnectionTypeResolver
  88. ?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
  89. $this->logger = $logger
  90. ?: ObjectManager::getInstance()->get(LoggerInterface::class);
  91. }
  92. /**
  93. * Runs consumers processes
  94. */
  95. public function run()
  96. {
  97. $runByCron = $this->deploymentConfig->get('cron_consumers_runner/cron_run', true);
  98. if (!$runByCron) {
  99. return;
  100. }
  101. $maxMessages = (int) $this->deploymentConfig->get('cron_consumers_runner/max_messages', 10000);
  102. $allowedConsumers = $this->deploymentConfig->get('cron_consumers_runner/consumers', []);
  103. $php = $this->phpExecutableFinder->find() ?: 'php';
  104. foreach ($this->consumerConfig->getConsumers() as $consumer) {
  105. if (!$this->canBeRun($consumer, $allowedConsumers)) {
  106. continue;
  107. }
  108. $consumerName = $consumer->getName();
  109. $arguments = [
  110. $consumerName,
  111. '--pid-file-path=' . $this->getPidFilePath($consumerName),
  112. ];
  113. if ($maxMessages) {
  114. $arguments[] = '--max-messages=' . $maxMessages;
  115. }
  116. $command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
  117. . ($maxMessages ? ' %s' : '');
  118. $this->shellBackground->execute($command, $arguments);
  119. }
  120. }
  121. /**
  122. * Checks that the consumer can be run
  123. *
  124. * @param ConsumerConfigItemInterface $consumerConfig The consumer config
  125. * @param array $allowedConsumers The list of allowed consumers
  126. * If $allowedConsumers is empty it means that all consumers are allowed
  127. * @return bool Returns true if the consumer can be run
  128. * @throws \Magento\Framework\Exception\FileSystemException
  129. */
  130. private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $allowedConsumers = []): bool
  131. {
  132. $consumerName = $consumerConfig->getName();
  133. if (!empty($allowedConsumers) && !in_array($consumerName, $allowedConsumers)) {
  134. return false;
  135. }
  136. if ($this->pidConsumerManager->isRun($this->getPidFilePath($consumerName))) {
  137. return false;
  138. }
  139. $connectionName = $consumerConfig->getConnection();
  140. try {
  141. $this->mqConnectionTypeResolver->getConnectionType($connectionName);
  142. } catch (\LogicException $e) {
  143. $this->logger->info(sprintf(
  144. 'Consumer "%s" skipped as required connection "%s" is not configured. %s',
  145. $consumerName,
  146. $connectionName,
  147. $e->getMessage()
  148. ));
  149. return false;
  150. }
  151. return true;
  152. }
  153. /**
  154. * Returns default path to file with PID by consumers name
  155. *
  156. * @param string $consumerName The consumers name
  157. * @return string The path to file with PID
  158. */
  159. private function getPidFilePath($consumerName)
  160. {
  161. $sanitizedHostname = preg_replace('/[^a-z0-9]/i', '', gethostname());
  162. return $consumerName . '-' . $sanitizedHostname . static::PID_FILE_EXT;
  163. }
  164. }