ConsumerFactory.php 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
  8. use Magento\Framework\Exception\LocalizedException;
  9. use Magento\Framework\ObjectManagerInterface;
  10. use Magento\Framework\Phrase;
  11. use Magento\Framework\MessageQueue\ConsumerInterface;
  12. use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
  13. use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
  14. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  15. /**
  16. * Class which creates Consumers
  17. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  18. */
  19. class ConsumerFactory
  20. {
  21. /**
  22. * Object Manager instance
  23. *
  24. * @var ObjectManagerInterface
  25. */
  26. private $objectManager = null;
  27. /**
  28. * @var ConsumerConfig
  29. */
  30. private $consumerConfig;
  31. /**
  32. * @var CommunicationConfig
  33. */
  34. private $communicationConfig;
  35. /**
  36. * Initialize dependencies.
  37. *
  38. * @param QueueConfig $queueConfig
  39. * @param ObjectManagerInterface $objectManager
  40. *
  41. * @SuppressWarnings(PHPMD.UnusedFormalParameter)
  42. */
  43. public function __construct(
  44. QueueConfig $queueConfig,
  45. ObjectManagerInterface $objectManager
  46. ) {
  47. $this->objectManager = $objectManager;
  48. }
  49. /**
  50. * Return the actual Consumer implementation for the given consumer name.
  51. *
  52. * @param string $consumerName
  53. * @param int $batchSize [optional]
  54. * @return ConsumerInterface
  55. * @throws LocalizedException
  56. */
  57. public function get($consumerName, $batchSize = 0)
  58. {
  59. $consumerConfig = $this->getConsumerConfig()->getConsumer($consumerName);
  60. if ($consumerConfig === null) {
  61. throw new LocalizedException(
  62. new Phrase('Specified consumer "%consumer" is not declared.', ['consumer' => $consumerName])
  63. );
  64. }
  65. return $this->objectManager->create(
  66. $consumerConfig->getConsumerInstance(),
  67. [
  68. 'configuration' => $this->createConsumerConfiguration($consumerConfig),
  69. 'batchSize' => $batchSize,
  70. ]
  71. );
  72. }
  73. /**
  74. * Creates the objects necessary for the ConsumerConfigurationInterface to configure a Consumer.
  75. *
  76. * @param ConsumerConfigItemInterface $consumerConfigItem
  77. * @return ConsumerConfigurationInterface
  78. */
  79. private function createConsumerConfiguration($consumerConfigItem)
  80. {
  81. $customConsumerHandlers = [];
  82. foreach ($consumerConfigItem->getHandlers() as $handlerConfig) {
  83. $customConsumerHandlers[] = [
  84. $this->objectManager->create($handlerConfig->getType()),
  85. $handlerConfig->getMethod()
  86. ];
  87. }
  88. $topics = [];
  89. foreach ($this->getCommunicationConfig()->getTopics() as $topicConfig) {
  90. $topicName = $topicConfig[CommunicationConfig::TOPIC_NAME];
  91. $topics[$topicName] = [
  92. ConsumerConfigurationInterface::TOPIC_HANDLERS => $customConsumerHandlers
  93. ?: $this->getHandlersFromCommunicationConfig($topicName),
  94. ConsumerConfigurationInterface::TOPIC_TYPE => $topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]
  95. ? ConsumerConfiguration::TYPE_SYNC
  96. : ConsumerConfiguration::TYPE_ASYNC
  97. ];
  98. }
  99. $configData = [
  100. ConsumerConfigurationInterface::CONSUMER_NAME => $consumerConfigItem->getName(),
  101. ConsumerConfigurationInterface::QUEUE_NAME => $consumerConfigItem->getQueue(),
  102. ConsumerConfigurationInterface::TOPICS => $topics,
  103. ConsumerConfigurationInterface::MAX_MESSAGES => $consumerConfigItem->getMaxMessages(),
  104. ];
  105. return $this->objectManager->create(
  106. \Magento\Framework\MessageQueue\ConsumerConfiguration::class,
  107. ['data' => $configData]
  108. );
  109. }
  110. /**
  111. * Get consumer config.
  112. *
  113. * @return ConsumerConfig
  114. *
  115. * @deprecated 102.0.1
  116. */
  117. private function getConsumerConfig()
  118. {
  119. if ($this->consumerConfig === null) {
  120. $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
  121. }
  122. return $this->consumerConfig;
  123. }
  124. /**
  125. * Get communication config.
  126. *
  127. * @return CommunicationConfig
  128. *
  129. * @deprecated 102.0.1
  130. */
  131. private function getCommunicationConfig()
  132. {
  133. if ($this->communicationConfig === null) {
  134. $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
  135. ->get(CommunicationConfig::class);
  136. }
  137. return $this->communicationConfig;
  138. }
  139. /**
  140. * Get handlers by topic based on communication config.
  141. *
  142. * @param string $topicName
  143. * @return array
  144. */
  145. private function getHandlersFromCommunicationConfig($topicName)
  146. {
  147. $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
  148. $handlers = [];
  149. foreach ($topicConfig[CommunicationConfig::TOPIC_HANDLERS] as $handlerConfig) {
  150. $handlers[] = [
  151. $this->objectManager->create($handlerConfig[CommunicationConfig::HANDLER_TYPE]),
  152. $handlerConfig[CommunicationConfig::HANDLER_METHOD]
  153. ];
  154. }
  155. return $handlers;
  156. }
  157. }