ConsumerConfiguration.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
  8. use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
  9. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  /**
   * Value object holding the configuration of a single message queue consumer.
   *
   * The raw configuration is supplied as an array through the constructor and is
   * read via the accessors below; the consumer and communication configs are
   * resolved lazily through the object manager the first time they are needed.
   */
  class ConsumerConfiguration implements ConsumerConfigurationInterface
  {
      /**
       * @deprecated
       * @see ConsumerConfigurationInterface::TOPIC_TYPE
       */
      const CONSUMER_TYPE = "consumer_type";

      /**
       * @deprecated
       * @see ConsumerConfigurationInterface::TOPIC_HANDLERS
       */
      const HANDLERS = 'handlers';

      /**
       * Raw consumer configuration, indexed by the interface's configuration keys.
       *
       * @var array
       */
      private $data;

      /**
       * @var QueueRepository
       */
      private $queueRepository;

      /**
       * Lazily resolved, see getConsumerConfig().
       *
       * @var ConsumerConfig
       */
      private $consumerConfig;

      /**
       * Lazily resolved, see getCommunicationConfig().
       *
       * @var CommunicationConfig
       */
      private $communicationConfig;

      /**
       * Initialize dependencies.
       *
       * @param QueueRepository $queueRepository
       * @param MessageQueueConfig $messageQueueConfig Accepted but not used by this class;
       *        the parameter stays in the signature so existing callers keep working.
       * @param array $data configuration data
       *
       * @SuppressWarnings(PHPMD.UnusedFormalParameter)
       */
      public function __construct(QueueRepository $queueRepository, MessageQueueConfig $messageQueueConfig, $data = [])
      {
          $this->data = $data;
          $this->queueRepository = $queueRepository;
      }

      /**
       * {@inheritdoc}
       *
       * Returns the CONSUMER_NAME entry of the configuration data, or null when it is not set.
       */
      public function getConsumerName()
      {
          return $this->getData(self::CONSUMER_NAME);
      }

      /**
       * {@inheritdoc}
       *
       * Returns the MAX_MESSAGES entry of the configuration data, or null when it is not set.
       */
      public function getMaxMessages()
      {
          return $this->getData(self::MAX_MESSAGES);
      }

      /**
       * {@inheritdoc}
       *
       * Returns the QUEUE_NAME entry of the configuration data, or null when it is not set.
       */
      public function getQueueName()
      {
          return $this->getData(self::QUEUE_NAME);
      }

      /**
       * {@inheritdoc}
       *
       * Only works for consumers with exactly one declared topic; the type is read
       * from that topic's declaration.
       *
       * @throws \LogicException When the consumer declares no topic or more than one topic.
       */
      public function getType()
      {
          $topics = $this->getData(self::TOPICS);
          // getData() yields null when no topics are configured. Calling count() on a
          // non-array raises a warning (PHP 7.2+) or a TypeError (PHP 8), so treat
          // anything that is not an array as "no topics" and let the LogicException
          // below report the misconfiguration.
          $topicCount = is_array($topics) ? count($topics) : 0;

          if ($topicCount > 1) {
              throw new \LogicException(
                  'Current method is deprecated and does not support more than 1 topic declarations for consumer. '
                  . 'Use \Magento\Framework\MessageQueue\ConsumerConfiguration::getConsumerType instead. '
                  . "Multiple topics declared for consumer '{$this->getConsumerName()}'"
              );
          }

          if ($topicCount < 1) {
              throw new \LogicException(
                  "There must be at least one topic declared for consumer '{$this->getConsumerName()}'."
              );
          }

          // Get the only topic and read consumer type from its declaration. Necessary for backward compatibility
          $topicConfig = reset($topics);

          return $topicConfig[self::TOPIC_TYPE];
      }

      /**
       * {@inheritdoc}
       *
       * Returns the TOPIC_HANDLERS entry of the given topic's configuration.
       *
       * @throws \LogicException When the topic is not configured for this consumer.
       */
      public function getHandlers($topicName)
      {
          return $this->getTopicConfig($topicName)[self::TOPIC_HANDLERS];
      }

      /**
       * {@inheritdoc}
       *
       * Returns the keys of the configured topics map, or an empty array when no
       * topics array is configured.
       */
      public function getTopicNames()
      {
          $topics = $this->getData(self::TOPICS);

          // array_keys() of an empty array is already [], so no separate emptiness check is needed.
          return is_array($topics) ? array_keys($topics) : [];
      }

      /**
       * {@inheritdoc}
       *
       * Looks up the connection name of this consumer in the consumer config and
       * asks the queue repository for the queue with this consumer's queue name
       * on that connection.
       */
      public function getQueue()
      {
          $connectionName = $this->getConsumerConfig()->getConsumer($this->getConsumerName())->getConnection();

          return $this->queueRepository->get($connectionName, $this->getQueueName());
      }

      /**
       * {@inheritdoc}
       *
       * Reads the TOPIC_REQUEST_TYPE entry of the topic from the communication config.
       */
      public function getMessageSchemaType($topicName)
      {
          return $this->getCommunicationConfig()->getTopic($topicName)[CommunicationConfig::TOPIC_REQUEST_TYPE];
      }

      /**
       * Get topic configuration for current consumer.
       *
       * @param string $topicName
       * @return array
       * @throws \LogicException When no configuration exists for the given topic.
       */
      private function getTopicConfig($topicName)
      {
          // Read the topics map once instead of once for the check and once for the return.
          $topics = $this->getData(self::TOPICS);

          if (!isset($topics[$topicName])) {
              throw new \LogicException("Consumer configuration for {$topicName} topic not found.");
          }

          return $topics[$topicName];
      }

      /**
       * Get specified data item.
       *
       * The stored value is returned as is, so the result may be a scalar or an
       * array (for example the topics map).
       *
       * @param string $key
       * @return mixed|null Null when the key is absent or its value is null.
       */
      private function getData($key)
      {
          return $this->data[$key] ?? null;
      }

      /**
       * Get consumer config.
       *
       * The instance is fetched from the object manager on first use and cached
       * on this object afterwards.
       *
       * @return ConsumerConfig
       *
       * @deprecated 102.0.1
       */
      private function getConsumerConfig()
      {
          if ($this->consumerConfig === null) {
              $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
          }

          return $this->consumerConfig;
      }

      /**
       * Get communication config.
       *
       * The instance is fetched from the object manager on first use and cached
       * on this object afterwards.
       *
       * @return CommunicationConfig
       *
       * @deprecated 102.0.1
       */
      private function getCommunicationConfig()
      {
          if ($this->communicationConfig === null) {
              $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
                  ->get(CommunicationConfig::class);
          }

          return $this->communicationConfig;
      }
  }