TopicConfig.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue\Config\Reader\Xml\Converter;
  7. use Magento\Framework\MessageQueue\ConfigInterface;
  8. use Magento\Framework\MessageQueue\Config\Validator;
  9. use Magento\Framework\Reflection\MethodsMap;
  10. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  11. use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
  12. use Magento\Framework\MessageQueue\ConsumerInterface;
  /**
   * Converts MessageQueue config from \DOMDocument to array.
   *
   * Every <broker> element of the document is read as one topic declaration; the
   * declarations are then expanded (wildcards) and reshaped into the topics,
   * publishers, binds, consumers and exchange/topic-to-queues sections.
   *
   * @deprecated 102.0.1
   */
  class TopicConfig implements \Magento\Framework\Config\ConverterInterface
  {
      const DEFAULT_TYPE = 'amqp';
      const DEFAULT_EXCHANGE = 'magento';
      const DEFAULT_INSTANCE = ConsumerInterface::class;

      /**
       * Builds the regular expression used to match wildcard topic names.
       *
       * @var Validator
       */
      private $xmlValidator;

      /**
       * Injected by the constructor; not read by any method of this class.
       *
       * @var MethodsMap
       */
      private $methodsMap;

      /**
       * Source of the topic definitions (request/response schema, handlers, sync flag).
       *
       * @var CommunicationConfig
       */
      private $communicationConfig;

      /**
       * Initialize dependencies.
       *
       * @param MethodsMap $methodsMap
       * @param Validator $xmlValidator
       * @param CommunicationConfig $communicationConfig
       */
      public function __construct(
          MethodsMap $methodsMap,
          Validator $xmlValidator,
          CommunicationConfig $communicationConfig
      ) {
          $this->methodsMap = $methodsMap;
          $this->xmlValidator = $xmlValidator;
          $this->communicationConfig = $communicationConfig;
      }

      /**
       * Convert the queue configuration document into its array representation.
       *
       * @param \DOMDocument $source
       * @return array Sections keyed by ConfigInterface::TOPICS, ::PUBLISHERS, ::BINDS,
       *               ::CONSUMERS and ::EXCHANGE_TOPIC_TO_QUEUES_MAP.
       */
      public function convert($source)
      {
          // Wildcard declarations must be expanded before any section is built,
          // because every builder below iterates over concrete topic names.
          $topics = $this->processWildcard($this->extractTopics($source));

          $publishers = $this->buildPublishers($topics);
          $binds = $this->buildBinds($topics);
          $map = $this->buildExchangeTopicToQueue($topics);
          $consumers = $this->buildConsumers($topics);

          return [
              ConfigInterface::TOPICS => $this->buildTopicsConfiguration($topics),
              ConfigInterface::PUBLISHERS => $publishers,
              ConfigInterface::BINDS => $binds,
              ConfigInterface::CONSUMERS => $consumers,
              ConfigInterface::EXCHANGE_TOPIC_TO_QUEUES_MAP => $map,
          ];
      }

      /**
       * Generate list of topics.
       *
       * Each entry combines the communication definition of the topic (schemas,
       * synchronous flag) with the publisher name derived from the broker declaration.
       *
       * @param array $topics
       * @return array
       */
      private function buildTopicsConfiguration($topics)
      {
          $output = [];
          foreach ($topics as $topicName => $topicConfig) {
              $topicDefinition = $this->communicationConfig->getTopic($topicName);

              $isClassRequest =
                  $topicDefinition['request_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS;
              $schemaType = $isClassRequest
                  ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT
                  : QueueConfig::TOPIC_SCHEMA_TYPE_METHOD;

              // Read the optional response once, so a definition without the key
              // yields null instead of an undefined-index warning.
              $response = isset($topicDefinition['response']) ? $topicDefinition['response'] : null;

              $output[$topicName] = [
                  'name' => $topicName,
                  'schema' => [
                      'schema_type' => $schemaType,
                      'schema_value' => $topicDefinition[CommunicationConfig::TOPIC_REQUEST]
                  ],
                  'response_schema' => [
                      'schema_type' => $response !== null ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT : null,
                      'schema_value' => $response
                  ],
                  'is_synchronous' => $topicDefinition[CommunicationConfig::TOPIC_IS_SYNCHRONOUS],
                  'publisher' => $topicConfig['type'] . '-' . $topicConfig['exchange']
              ];
          }
          return $output;
      }

      /**
       * Generate consumers.
       *
       * The result is keyed by consumer name. NOTE(review): when the same consumer
       * name appears under several topics, each assignment replaces the previous
       * one, so only the last topic's entry survives - confirm this is intended.
       *
       * @param array $topics
       * @return array
       */
      private function buildConsumers($topics)
      {
          $output = [];
          foreach ($topics as $topicName => $topicConfig) {
              $topic = $this->communicationConfig->getTopic($topicName);
              foreach ($topicConfig['queues'] as $queueName => $queueConfig) {
                  $handlers = [];
                  foreach ($queueConfig['handlers'] as $handler) {
                      $handlers[] = $this->normalizeHandler($handler);
                  }

                  $consumerName = $queueConfig['consumer'];
                  $output[$consumerName] = [
                      'name' => $consumerName,
                      'queue' => $queueName,
                      'handlers' => [$topicName => $handlers],
                      'instance_type' => $queueConfig['consumerInstance'] != null
                          ? $queueConfig['consumerInstance'] : self::DEFAULT_INSTANCE,
                      'consumer_type' => $topic[CommunicationConfig::TOPIC_IS_SYNCHRONOUS] ? 'sync' : 'async',
                      'max_messages' => $queueConfig['maxMessages'],
                      'connection' => $topicConfig['type']
                  ];
              }
          }
          return $output;
      }

      /**
       * Bring a handler declaration to the class/method array form.
       *
       * Handlers that already carry the QueueConfig::CONSUMER_CLASS key are returned
       * untouched; anything else is treated as a "Class::method" string.
       *
       * @param array|string $handler
       * @return array
       */
      private function normalizeHandler($handler)
      {
          if (isset($handler[QueueConfig::CONSUMER_CLASS])) {
              return $handler;
          }

          $parts = explode('::', $handler);
          return [
              QueueConfig::CONSUMER_CLASS => $parts[0],
              // A string without "::" has no method part; keep it null without
              // triggering an undefined-offset warning.
              QueueConfig::CONSUMER_METHOD => isset($parts[1]) ? $parts[1] : null,
          ];
      }

      /**
       * Generate topics list based on wildcards.
       *
       * Every declaration whose name contains "*" or "#" is copied onto each topic
       * known to the communication config that matches the wildcard pattern, and the
       * wildcard declaration itself is then removed.
       *
       * @param array $topics
       * @return array
       */
      private function processWildcard($topics)
      {
          $definedTopicNames = array_keys($this->communicationConfig->getTopics());

          // Array keys are unique already, so no de-duplication is needed here.
          $wildcardKeys = array_filter(
              array_keys($topics),
              function ($topicName) {
                  return strpos($topicName, '*') !== false || strpos($topicName, '#') !== false;
              }
          );

          foreach ($wildcardKeys as $wildcardKey) {
              $pattern = $this->xmlValidator->buildWildcardPattern($wildcardKey);
              foreach ($definedTopicNames as $topicName) {
                  if (!preg_match($pattern, $topicName)) {
                      continue;
                  }
                  // array_merge() gives precedence to the wildcard declaration when
                  // the topic also has an explicit one.
                  $topics[$topicName] = isset($topics[$topicName])
                      ? array_merge($topics[$topicName], $topics[$wildcardKey])
                      : $topics[$wildcardKey];
              }
              unset($topics[$wildcardKey]);
          }
          return $topics;
      }

      /**
       * Generate publishers.
       *
       * One publisher is produced per distinct "<type>-<exchange>" combination.
       *
       * @param array $topics
       * @return array
       */
      private function buildPublishers($topics)
      {
          $output = [];
          foreach ($topics as $topicConfig) {
              $connection = $topicConfig['type'];
              $exchange = $topicConfig['exchange'];
              $publisherName = $connection . '-' . $exchange;
              $output[$publisherName] = [
                  'name' => $publisherName,
                  'connection' => $connection,
                  'exchange' => $exchange
              ];
          }
          return $output;
      }

      /**
       * Generate binds.
       *
       * Each topic/queue pair yields one bind keyed "<topic>--<exchange>--<queue>".
       *
       * @param array $topics
       * @return array
       */
      private function buildBinds($topics)
      {
          $output = [];
          foreach ($topics as $topicName => $topicConfig) {
              $exchange = $topicConfig['exchange'];
              foreach (array_keys($topicConfig['queues']) as $queueName) {
                  $output[$topicName . '--' . $exchange . '--' . $queueName] = [
                      'queue' => $queueName,
                      'exchange' => $exchange,
                      'topic' => $topicName,
                  ];
              }
          }
          return $output;
      }

      /**
       * Generate topic-to-queues map.
       *
       * Keys have the form "<type>-<exchange>--<topic>"; values are lists of unique
       * queue names. Topics without queues do not produce an entry.
       *
       * @param array $topics
       * @return array
       */
      private function buildExchangeTopicToQueue($topics)
      {
          $output = [];
          foreach ($topics as $topicName => $topicConfig) {
              $queueNames = array_keys($topicConfig['queues']);
              if (!$queueNames) {
                  continue;
              }

              $key = $topicConfig['type'] . '-' . $topicConfig['exchange'] . '--' . $topicName;
              $known = isset($output[$key]) ? $output[$key] : [];

              // De-duplicate once per topic instead of after every appended queue.
              $output[$key] = array_values(array_unique(array_merge($known, $queueNames)));
          }
          return $output;
      }

      /**
       * Extract topics configuration.
       *
       * Reads every <broker> element; a later element with the same topic attribute
       * replaces the earlier one because the result is keyed by topic name.
       *
       * @param \DOMDocument $config
       * @return array
       */
      private function extractTopics($config)
      {
          $output = [];
          /** @var $brokerNode \DOMElement */
          foreach ($config->getElementsByTagName('broker') as $brokerNode) {
              $topicName = $this->getAttributeValue($brokerNode, 'topic');
              $output[$topicName] = [
                  ConfigInterface::TOPIC_NAME => $topicName,
                  'type' => $this->getAttributeValue($brokerNode, 'type', self::DEFAULT_TYPE),
                  'exchange' => $this->getAttributeValue($brokerNode, 'exchange', self::DEFAULT_EXCHANGE),
                  'consumerInstance' => $this->getAttributeValue($brokerNode, 'consumerInstance'),
                  'maxMessages' => $this->getAttributeValue($brokerNode, 'maxMessages'),
                  'queues' => $this->extractQueuesFromBroker($brokerNode, $topicName)
              ];
          }
          return $output;
      }

      /**
       * Extract queues configuration from the topic node.
       *
       * A queue without its own "handler" attribute falls back to the handlers of
       * the topic's communication definition.
       *
       * @param \DOMElement $brokerNode
       * @param string $topicName
       * @return array Queue configurations keyed by queue name.
       */
      protected function extractQueuesFromBroker(\DOMElement $brokerNode, $topicName)
      {
          $queues = [];
          $topicConfig = $this->communicationConfig->getTopic($topicName);
          /** @var $queueNode \DOMElement */
          foreach ($brokerNode->getElementsByTagName('queue') as $queueNode) {
              $handler = $this->getAttributeValue($queueNode, 'handler');
              $queueName = $this->getAttributeValue($queueNode, 'name');
              $queues[$queueName] = [
                  'name'=> $queueName,
                  'handlerName' => $this->getAttributeValue($queueNode, 'handlerName'),
                  'handlers' => $handler ? ['default' => $handler] : $topicConfig['handlers'],
                  'exchange' => $this->getAttributeValue($queueNode, 'exchange'),
                  'consumer' => $this->getAttributeValue($queueNode, 'consumer'),
                  'consumerInstance' => $this->getAttributeValue($queueNode, 'consumerInstance'),
                  'maxMessages' => $this->getAttributeValue($queueNode, 'maxMessages', null),
                  'type' => $this->getAttributeValue($queueNode, 'type')
              ];
          }
          return $queues;
      }

      /**
       * Get attribute value of the given node.
       *
       * @param \DOMNode $node
       * @param string $attributeName
       * @param mixed $default Returned when the node has no such attribute.
       * @return string|null
       */
      protected function getAttributeValue(\DOMNode $node, $attributeName, $default = null)
      {
          // DOMNode::$attributes is null for nodes that are not elements; treat
          // that like a missing attribute instead of failing on a null call.
          if ($node->attributes === null) {
              return $default;
          }

          $item = $node->attributes->getNamedItem($attributeName);
          return $item ? $item->nodeValue : $default;
      }
  }