StartConsumerCommand.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MessageQueue\Console;
  7. use Symfony\Component\Console\Command\Command;
  8. use Symfony\Component\Console\Input\InputArgument;
  9. use Symfony\Component\Console\Input\InputInterface;
  10. use Symfony\Component\Console\Input\InputOption;
  11. use Symfony\Component\Console\Output\OutputInterface;
  12. use Magento\Framework\MessageQueue\ConsumerFactory;
  13. use Magento\MessageQueue\Model\Cron\ConsumersRunner\PidConsumerManager;
  14. /**
  15. * Command for starting MessageQueue consumers.
  16. */
  17. class StartConsumerCommand extends Command
  18. {
  19. const ARGUMENT_CONSUMER = 'consumer';
  20. const OPTION_NUMBER_OF_MESSAGES = 'max-messages';
  21. const OPTION_BATCH_SIZE = 'batch-size';
  22. const OPTION_AREACODE = 'area-code';
  23. const PID_FILE_PATH = 'pid-file-path';
  24. const COMMAND_QUEUE_CONSUMERS_START = 'queue:consumers:start';
  25. /**
  26. * @var ConsumerFactory
  27. */
  28. private $consumerFactory;
  29. /**
  30. * @var \Magento\Framework\App\State
  31. */
  32. private $appState;
  33. /**
  34. * @var PidConsumerManager
  35. */
  36. private $pidConsumerManager;
  37. /**
  38. * StartConsumerCommand constructor.
  39. * {@inheritdoc}
  40. *
  41. * @param \Magento\Framework\App\State $appState
  42. * @param ConsumerFactory $consumerFactory
  43. * @param string $name
  44. * @param PidConsumerManager $pidConsumerManager
  45. */
  46. public function __construct(
  47. \Magento\Framework\App\State $appState,
  48. ConsumerFactory $consumerFactory,
  49. $name = null,
  50. PidConsumerManager $pidConsumerManager = null
  51. ) {
  52. $this->appState = $appState;
  53. $this->consumerFactory = $consumerFactory;
  54. $this->pidConsumerManager = $pidConsumerManager ?: \Magento\Framework\App\ObjectManager::getInstance()
  55. ->get(PidConsumerManager::class);
  56. parent::__construct($name);
  57. }
  58. /**
  59. * {@inheritdoc}
  60. */
  61. protected function execute(InputInterface $input, OutputInterface $output)
  62. {
  63. $consumerName = $input->getArgument(self::ARGUMENT_CONSUMER);
  64. $numberOfMessages = $input->getOption(self::OPTION_NUMBER_OF_MESSAGES);
  65. $batchSize = (int)$input->getOption(self::OPTION_BATCH_SIZE);
  66. $areaCode = $input->getOption(self::OPTION_AREACODE);
  67. $pidFilePath = $input->getOption(self::PID_FILE_PATH);
  68. if ($pidFilePath && $this->pidConsumerManager->isRun($pidFilePath)) {
  69. $output->writeln('<error>Consumer with the same PID is running</error>');
  70. return \Magento\Framework\Console\Cli::RETURN_FAILURE;
  71. }
  72. if ($pidFilePath) {
  73. $this->pidConsumerManager->savePid($pidFilePath);
  74. }
  75. if ($areaCode !== null) {
  76. $this->appState->setAreaCode($areaCode);
  77. } else {
  78. $this->appState->setAreaCode('global');
  79. }
  80. $consumer = $this->consumerFactory->get($consumerName, $batchSize);
  81. $consumer->process($numberOfMessages);
  82. return \Magento\Framework\Console\Cli::RETURN_SUCCESS;
  83. }
  84. /**
  85. * {@inheritdoc}
  86. */
  87. protected function configure()
  88. {
  89. $this->setName(self::COMMAND_QUEUE_CONSUMERS_START);
  90. $this->setDescription('Start MessageQueue consumer');
  91. $this->addArgument(
  92. self::ARGUMENT_CONSUMER,
  93. InputArgument::REQUIRED,
  94. 'The name of the consumer to be started.'
  95. );
  96. $this->addOption(
  97. self::OPTION_NUMBER_OF_MESSAGES,
  98. null,
  99. InputOption::VALUE_REQUIRED,
  100. 'The number of messages to be processed by the consumer before process termination. '
  101. . 'If not specified - terminate after processing all queued messages.'
  102. );
  103. $this->addOption(
  104. self::OPTION_BATCH_SIZE,
  105. null,
  106. InputOption::VALUE_REQUIRED,
  107. 'The number of messages per batch. Applicable for the batch consumer only.'
  108. );
  109. $this->addOption(
  110. self::OPTION_AREACODE,
  111. null,
  112. InputOption::VALUE_REQUIRED,
  113. 'The preferred area (global, adminhtml, etc...) '
  114. . 'default is global.'
  115. );
  116. $this->addOption(
  117. self::PID_FILE_PATH,
  118. null,
  119. InputOption::VALUE_REQUIRED,
  120. 'The file path for saving PID'
  121. );
  122. $this->setHelp(
  123. <<<HELP
  124. This command starts MessageQueue consumer by its name.
  125. To start consumer which will process all queued messages and terminate execution:
  126. <comment>%command.full_name% someConsumer</comment>
  127. To specify the number of messages which should be processed by consumer before its termination:
  128. <comment>%command.full_name% someConsumer --max-messages=50</comment>
  129. To specify the number of messages per batch for the batch consumer:
  130. <comment>%command.full_name% someConsumer --batch-size=500</comment>
  131. To specify the preferred area:
  132. <comment>%command.full_name% someConsumer --area-code='adminhtml'</comment>
  133. To save PID enter path:
  134. <comment>%command.full_name% someConsumer --pid-file-path='/var/someConsumer.pid'</comment>
  135. HELP
  136. );
  137. parent::configure();
  138. }
  139. }