DataMapper.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue\Topology\Config\QueueConfigItem;
  7. use Magento\Framework\MessageQueue\Topology\Config\Data;
  8. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  9. use Magento\Framework\Exception\LocalizedException;
  10. use Magento\Framework\MessageQueue\Rpc\ResponseQueueNameBuilder;
  11. use Magento\Framework\Phrase;
  12. /**
  13. * Topology queue config data mapper.
  14. */
  15. class DataMapper
  16. {
  17. /**
  18. * Config data.
  19. *
  20. * @var array
  21. */
  22. private $mappedData;
  23. /**
  24. * @var Data
  25. */
  26. private $configData;
  27. /**
  28. * @var CommunicationConfig
  29. */
  30. private $communicationConfig;
  31. /**
  32. * @var ResponseQueueNameBuilder
  33. */
  34. private $queueNameBuilder;
  35. /**
  36. * Initialize dependencies.
  37. *
  38. * @param Data $configData
  39. * @param CommunicationConfig $communicationConfig
  40. * @param ResponseQueueNameBuilder $queueNameBuilder
  41. */
  42. public function __construct(
  43. Data $configData,
  44. CommunicationConfig $communicationConfig,
  45. ResponseQueueNameBuilder $queueNameBuilder
  46. ) {
  47. $this->configData = $configData;
  48. $this->communicationConfig = $communicationConfig;
  49. $this->queueNameBuilder = $queueNameBuilder;
  50. }
  51. /**
  52. * Get mapped config data.
  53. *
  54. * @return array
  55. */
  56. public function getMappedData()
  57. {
  58. if (null === $this->mappedData) {
  59. $this->mappedData = [];
  60. foreach ($this->configData->get() as $exchange) {
  61. $connection = $exchange['connection'];
  62. foreach ($exchange['bindings'] as $binding) {
  63. if ($binding['destinationType'] === 'queue') {
  64. $queueItems = $this->createQueueItems($binding['destination'], $binding['topic'], $connection);
  65. $this->mappedData = array_merge($this->mappedData, $queueItems);
  66. }
  67. }
  68. }
  69. }
  70. return $this->mappedData;
  71. }
  72. /**
  73. * Create queue config item.
  74. *
  75. * @param string $name
  76. * @param string $topic
  77. * @param string $connection
  78. * @return array
  79. */
  80. private function createQueueItems($name, $topic, $connection)
  81. {
  82. $output = [];
  83. $synchronousTopics = [];
  84. if (strpos($topic, '*') !== false || strpos($topic, '#') !== false) {
  85. $synchronousTopics = $this->matchSynchronousTopics($topic);
  86. } elseif ($this->isSynchronousTopic($topic)) {
  87. $synchronousTopics[$topic] = $topic;
  88. }
  89. foreach ($synchronousTopics as $topicName) {
  90. $callbackQueueName = $this->queueNameBuilder->getQueueName($topicName);
  91. $output[$callbackQueueName . '--' . $connection] = [
  92. 'name' => $callbackQueueName,
  93. 'connection' => $connection,
  94. 'durable' => true,
  95. 'autoDelete' => false,
  96. 'arguments' => [],
  97. ];
  98. }
  99. $output[$name . '--' . $connection] = [
  100. 'name' => $name,
  101. 'connection' => $connection,
  102. 'durable' => true,
  103. 'autoDelete' => false,
  104. 'arguments' => [],
  105. ];
  106. return $output;
  107. }
  108. /**
  109. * Check whether the topic is in synchronous mode
  110. *
  111. * @param string $topicName
  112. * @return bool
  113. * @throws LocalizedException
  114. */
  115. private function isSynchronousTopic($topicName)
  116. {
  117. try {
  118. $topic = $this->communicationConfig->getTopic($topicName);
  119. $isSync = (bool)$topic[CommunicationConfig::TOPIC_IS_SYNCHRONOUS];
  120. } catch (LocalizedException $e) {
  121. throw new LocalizedException(new Phrase('Error while checking if topic is synchronous'));
  122. }
  123. return $isSync;
  124. }
  125. /**
  126. * Generate topics list based on wildcards.
  127. *
  128. * @param string $wildcard
  129. * @return array
  130. */
  131. private function matchSynchronousTopics($wildcard)
  132. {
  133. $topicDefinitions = array_filter(
  134. $this->communicationConfig->getTopics(),
  135. function ($item) {
  136. return (bool)$item[CommunicationConfig::TOPIC_IS_SYNCHRONOUS];
  137. }
  138. );
  139. $topics = [];
  140. $pattern = $this->buildWildcardPattern($wildcard);
  141. foreach (array_keys($topicDefinitions) as $topicName) {
  142. if (preg_match($pattern, $topicName)) {
  143. $topics[$topicName] = $topicName;
  144. }
  145. }
  146. return $topics;
  147. }
  148. /**
  149. * Construct perl regexp pattern for matching topic names from wildcard key.
  150. *
  151. * @param string $wildcardKey
  152. * @return string
  153. */
  154. private function buildWildcardPattern($wildcardKey)
  155. {
  156. $pattern = '/^' . str_replace('.', '\.', $wildcardKey);
  157. $pattern = str_replace('#', '.+', $pattern);
  158. $pattern = str_replace('*', '[^\.]+', $pattern);
  159. $pattern .= strpos($wildcardKey, '#') === strlen($wildcardKey) ? '/' : '$/';
  160. return $pattern;
  161. }
  162. }