ConsumerRunner.php 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MessageQueue\Model;
  7. use Magento\Framework\MessageQueue\ConsumerFactory;
  8. use Magento\Framework\Exception\LocalizedException;
  9. use Magento\Framework\App\MaintenanceMode;
  10. use Magento\Framework\App\ObjectManager;
  11. /**
  12. * Consumer runner class is used to run consumer, which name matches the magic method invoked on this class.
  13. *
  14. * Is used to schedule consumers execution in crontab.xml as follows:
  15. * <code>
  16. * <job name="consumerConsumerName" instance="Magento\MessageQueue\Model\ConsumerRunner" method="consumerName">
  17. * </code>
  18. * Where <i>consumerName</i> should be a valid name of consumer registered in some queue.xml
  19. *
  20. * @api
  21. * @since 100.0.2
  22. */
  23. class ConsumerRunner
  24. {
  25. /**
  26. * @var ConsumerFactory
  27. */
  28. private $consumerFactory;
  29. /**
  30. * @var MaintenanceMode
  31. */
  32. private $maintenanceMode;
  33. /**
  34. * @var integer
  35. */
  36. private $maintenanceSleepInterval;
  37. /**
  38. * Initialize dependencies.
  39. *
  40. * @param ConsumerFactory $consumerFactory
  41. * @param MaintenanceMode $maintenanceMode
  42. * @param integer $maintenanceSleepInterval
  43. */
  44. public function __construct(
  45. ConsumerFactory $consumerFactory,
  46. MaintenanceMode $maintenanceMode = null,
  47. $maintenanceSleepInterval = 30
  48. ) {
  49. $this->consumerFactory = $consumerFactory;
  50. $this->maintenanceMode = $maintenanceMode ?: ObjectManager::getInstance()->get(MaintenanceMode::class);
  51. $this->maintenanceSleepInterval = $maintenanceSleepInterval;
  52. }
  53. /**
  54. * Process messages in queue using consumer, which name is equal to the current magic method name.
  55. *
  56. * @param string $name
  57. * @param array $arguments
  58. * @throws LocalizedException
  59. * @return void
  60. */
  61. public function __call($name, $arguments)
  62. {
  63. try {
  64. $consumer = $this->consumerFactory->get($name);
  65. } catch (\Exception $e) {
  66. $errorMsg = '"%callbackMethod" callback method specified in crontab.xml '
  67. . 'must have corresponding consumer declared in some queue.xml.';
  68. throw new LocalizedException(__($errorMsg, ['callbackMethod' => $name]));
  69. }
  70. if (!$this->maintenanceMode->isOn()) {
  71. $consumer->process();
  72. } else {
  73. sleep($this->maintenanceSleepInterval);
  74. }
  75. }
  76. }