Config.php 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\Exception\LocalizedException;
  8. use Magento\Framework\Phrase;
  9. /**
  10. * Queue configuration.
  11. *
  12. * @deprecated 102.0.1
  13. */
  14. class Config implements ConfigInterface
  15. {
  16. /**
  17. * @var \Magento\Framework\MessageQueue\Config\Data
  18. */
  19. protected $queueConfigData;
  20. /**
  21. * @param Config\Data $queueConfigData
  22. */
  23. public function __construct(Config\Data $queueConfigData)
  24. {
  25. $this->queueConfigData = $queueConfigData;
  26. }
  27. /**
  28. * @inheritdoc
  29. */
  30. public function getExchangeByTopic($topicName)
  31. {
  32. $publisherConfig = $this->getPublisherConfigByTopic($topicName);
  33. return $publisherConfig[ConfigInterface::PUBLISHER_EXCHANGE] ?? null;
  34. }
  35. /**
  36. * @inheritdoc
  37. */
  38. public function getQueuesByTopic($topic)
  39. {
  40. $publisherConfig = $this->getPublisherConfigByTopic($topic);
  41. $exchange = isset($publisherConfig[ConfigInterface::PUBLISHER_NAME])
  42. ? $publisherConfig[ConfigInterface::PUBLISHER_NAME]
  43. : null;
  44. /**
  45. * Exchange should be taken into account here to avoid retrieving queues, related to another exchange,
  46. * which is not currently associated with topic, but is configured in binds
  47. */
  48. $bindKey = $exchange . '--' . $topic;
  49. $output = $this->queueConfigData->get(ConfigInterface::EXCHANGE_TOPIC_TO_QUEUES_MAP . '/' . $bindKey);
  50. if (!$output) {
  51. throw new LocalizedException(
  52. new Phrase(
  53. 'No bindings configured for the "%topic" topic at "%exchange" exchange.',
  54. ['topic' => $topic, 'exchange' => $exchange]
  55. )
  56. );
  57. }
  58. return $output;
  59. }
  60. /**
  61. * @inheritdoc
  62. */
  63. public function getConnectionByTopic($topic)
  64. {
  65. try {
  66. $publisherConfig = $this->getPublisherConfigByTopic($topic);
  67. } catch (\Magento\Framework\Exception\LocalizedException $e) {
  68. return null;
  69. }
  70. return $publisherConfig[ConfigInterface::PUBLISHER_CONNECTION] ?? null;
  71. }
  72. /**
  73. * @inheritdoc
  74. */
  75. public function getConnectionByConsumer($consumer)
  76. {
  77. $connection = $this->queueConfigData->get(
  78. ConfigInterface::CONSUMERS . '/'. $consumer . '/'. ConfigInterface::CONSUMER_CONNECTION
  79. );
  80. if (!$connection) {
  81. throw new LocalizedException(
  82. new Phrase('Consumer "%consumer" has not connection.', ['consumer' => $consumer])
  83. );
  84. }
  85. return $connection;
  86. }
  87. /**
  88. * @inheritdoc
  89. */
  90. public function getMessageSchemaType($topic)
  91. {
  92. return $this->queueConfigData->get(
  93. ConfigInterface::TOPICS . '/' .
  94. $topic . '/' . ConfigInterface::TOPIC_SCHEMA . '/' . ConfigInterface::TOPIC_SCHEMA_TYPE
  95. );
  96. }
  97. /**
  98. * @inheritdoc
  99. */
  100. public function getConsumerNames()
  101. {
  102. $queueConfig = $this->queueConfigData->get(ConfigInterface::CONSUMERS, []);
  103. return array_keys($queueConfig);
  104. }
  105. /**
  106. * @inheritdoc
  107. */
  108. public function getConsumer($name)
  109. {
  110. return $this->queueConfigData->get(ConfigInterface::CONSUMERS . '/' . $name);
  111. }
  112. /**
  113. * @inheritdoc
  114. */
  115. public function getBinds()
  116. {
  117. return $this->queueConfigData->get(ConfigInterface::BINDS, []);
  118. }
  119. /**
  120. * @inheritdoc
  121. */
  122. public function getPublishers()
  123. {
  124. return $this->queueConfigData->get(ConfigInterface::PUBLISHERS, []);
  125. }
  126. /**
  127. * @inheritdoc
  128. */
  129. public function getConsumers()
  130. {
  131. return $this->queueConfigData->get(ConfigInterface::CONSUMERS, []);
  132. }
  133. /**
  134. * @inheritdoc
  135. */
  136. public function getTopic($name)
  137. {
  138. return $this->queueConfigData->get(ConfigInterface::TOPICS . '/' . $name);
  139. }
  140. /**
  141. * @inheritdoc
  142. */
  143. public function getPublisher($name)
  144. {
  145. return $this->queueConfigData->get(ConfigInterface::PUBLISHERS . '/' . $name);
  146. }
  147. /**
  148. * @inheritdoc
  149. */
  150. public function getResponseQueueName($topicName)
  151. {
  152. return ConfigInterface::RESPONSE_QUEUE_PREFIX . str_replace('-', '_', $topicName);
  153. }
  154. /**
  155. * Get publisher config by topic
  156. *
  157. * @param string $topicName
  158. * @return array|mixed|null
  159. * @throws LocalizedException
  160. */
  161. protected function getPublisherConfigByTopic($topicName)
  162. {
  163. $publisherName = $this->queueConfigData->get(
  164. ConfigInterface::TOPICS . '/' . $topicName . '/' . ConfigInterface::TOPIC_PUBLISHER
  165. );
  166. if (!$publisherName) {
  167. throw new LocalizedException(
  168. new Phrase('Message queue topic "%topic" is not configured.', ['topic' => $topicName])
  169. );
  170. }
  171. $publisherConfig = $this->queueConfigData->get(ConfigInterface::PUBLISHERS . '/' . $publisherName);
  172. if (!$publisherConfig) {
  173. throw new LocalizedException(
  174. new Phrase(
  175. 'Message queue publisher "%publisher" is not configured.',
  176. ['publisher' => $publisherName]
  177. )
  178. );
  179. }
  180. return $publisherConfig;
  181. }
  182. }