123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\MessageQueue\Config\Reader\Xml\Converter;
- use Magento\Framework\MessageQueue\ConfigInterface;
- use Magento\Framework\MessageQueue\Config\Validator;
- use Magento\Framework\Reflection\MethodsMap;
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
- use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
- use Magento\Framework\MessageQueue\ConsumerInterface;
- /**
- * Converts MessageQueue config from \DOMDocument to array
- *
- * @deprecated 102.0.1
- */
- class TopicConfig implements \Magento\Framework\Config\ConverterInterface
- {
- const DEFAULT_TYPE = 'amqp';
- const DEFAULT_EXCHANGE = 'magento';
- const DEFAULT_INSTANCE = ConsumerInterface::class;
- /**
- * @var Validator
- */
- private $xmlValidator;
- /**
- * @var MethodsMap
- */
- private $methodsMap;
- /**
- * @var CommunicationConfig
- */
- private $communicationConfig;
- /**
- * Initialize dependencies.
- *
- * @param MethodsMap $methodsMap
- * @param Validator $xmlValidator
- * @param CommunicationConfig $communicationConfig
- */
- public function __construct(
- MethodsMap $methodsMap,
- Validator $xmlValidator,
- CommunicationConfig $communicationConfig
- ) {
- $this->methodsMap = $methodsMap;
- $this->xmlValidator = $xmlValidator;
- $this->communicationConfig = $communicationConfig;
- }
- /**
- * {@inheritDoc}
- */
- public function convert($source)
- {
- $topics = $this->extractTopics($source);
- $topics = $this->processWildcard($topics);
- $publishers = $this->buildPublishers($topics);
- $binds = $this->buildBinds($topics);
- $map = $this->buildExchangeTopicToQueue($topics);
- $consumers = $this->buildConsumers($topics);
- return [
- ConfigInterface::TOPICS => $this->buildTopicsConfiguration($topics),
- ConfigInterface::PUBLISHERS => $publishers,
- ConfigInterface::BINDS => $binds,
- ConfigInterface::CONSUMERS => $consumers,
- ConfigInterface::EXCHANGE_TOPIC_TO_QUEUES_MAP => $map,
- ];
- }
- /**
- * Generate list of topics.
- *
- * @param array $topics
- * @return array
- */
- private function buildTopicsConfiguration($topics)
- {
- $output = [];
- foreach ($topics as $topicName => $topicConfig) {
- $topicDefinition = $this->communicationConfig->getTopic($topicName);
- $schemaType =
- $topicDefinition['request_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
- ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT
- : QueueConfig::TOPIC_SCHEMA_TYPE_METHOD;
- $schemaValue = $topicDefinition[CommunicationConfig::TOPIC_REQUEST];
- $output[$topicName] = [
- 'name' => $topicName,
- 'schema' => [
- 'schema_type' => $schemaType,
- 'schema_value' => $schemaValue
- ],
- 'response_schema' => [
- 'schema_type' => isset($topicDefinition['response']) ? QueueConfig::TOPIC_SCHEMA_TYPE_OBJECT : null,
- 'schema_value' => $topicDefinition['response']
- ],
- 'is_synchronous' => $topicDefinition[CommunicationConfig::TOPIC_IS_SYNCHRONOUS],
- 'publisher' => $topicConfig['type'] . '-' . $topicConfig['exchange']
- ];
- }
- return $output;
- }
- /**
- * Generate consumers.
- *
- * @param array $topics
- * @return array
- */
- private function buildConsumers($topics)
- {
- $output = [];
- foreach ($topics as $topicName => $topicConfig) {
- $topic = $this->communicationConfig->getTopic($topicName);
- foreach ($topicConfig['queues'] as $queueName => $queueConfig) {
- $handlers = [];
- foreach ($queueConfig['handlers'] as $handler) {
- if (!isset($handler[QueueConfig::CONSUMER_CLASS])) {
- $handlerExploded = explode('::', $handler);
- unset($handler);
- $handler[QueueConfig::CONSUMER_CLASS] = $handlerExploded[0];
- $handler[QueueConfig::CONSUMER_METHOD] = $handlerExploded[1];
- }
- $handlers[] = $handler;
- }
- $queueConfig['handlers'] = $handlers;
- $output[$queueConfig['consumer']] = [
- 'name' => $queueConfig['consumer'],
- 'queue' => $queueName,
- 'handlers' => [$topicName => $queueConfig['handlers']],
- 'instance_type' => $queueConfig['consumerInstance'] != null
- ? $queueConfig['consumerInstance'] : self::DEFAULT_INSTANCE,
- 'consumer_type' => $topic[CommunicationConfig::TOPIC_IS_SYNCHRONOUS] ? 'sync' : 'async',
- 'max_messages' => $queueConfig['maxMessages'],
- 'connection' => $topicConfig['type']
- ];
- }
- }
- return $output;
- }
- /**
- * Generate topics list based on wildcards.
- *
- * @param array $topics
- * @return array
- */
- private function processWildcard($topics)
- {
- $topicDefinitions = $this->communicationConfig->getTopics();
- $wildcardKeys = [];
- $topicNames = array_keys($topics);
- foreach ($topicNames as $topicName) {
- if (strpos($topicName, '*') !== false || strpos($topicName, '#') !== false) {
- $wildcardKeys[] = $topicName;
- }
- }
- foreach (array_unique($wildcardKeys) as $wildcardKey) {
- $pattern = $this->xmlValidator->buildWildcardPattern($wildcardKey);
- foreach (array_keys($topicDefinitions) as $topicName) {
- if (preg_match($pattern, $topicName)) {
- if (isset($topics[$topicName])) {
- $topics[$topicName] = array_merge($topics[$topicName], $topics[$wildcardKey]);
- } else {
- $topics[$topicName] = $topics[$wildcardKey];
- }
- }
- }
- unset($topics[$wildcardKey]);
- }
- return $topics;
- }
- /**
- * Generate publishers
- *
- * @param array $topics
- * @return array
- */
- private function buildPublishers($topics)
- {
- $output = [];
- foreach ($topics as $topicConfig) {
- $publisherName = $topicConfig['type'] . '-' . $topicConfig['exchange'];
- $output[$publisherName] = [
- 'name' => $publisherName,
- 'connection' => $topicConfig['type'],
- 'exchange' => $topicConfig['exchange']
- ];
- }
- return $output;
- }
- /**
- * Generate binds
- *
- * @param array $topics
- * @return array
- */
- private function buildBinds($topics)
- {
- $output = [];
- foreach ($topics as $topicName => $topicConfig) {
- $queueNames = array_keys($topicConfig['queues']);
- foreach ($queueNames as $queueName) {
- $name = $topicName . '--' . $topicConfig['exchange']. '--' .$queueName;
- $output[$name] = [
- 'queue' => $queueName,
- 'exchange' => $topicConfig['exchange'],
- 'topic' => $topicName,
- ];
- }
- }
- return $output;
- }
- /**
- * Generate topic-to-queues map.
- *
- * @param array $topics
- * @return array
- */
- private function buildExchangeTopicToQueue($topics)
- {
- $output = [];
- foreach ($topics as $topicName => $topicConfig) {
- $key = $topicConfig['type'] . '-' . $topicConfig['exchange'] . '--' . $topicName;
- $queueNames = array_keys($topicConfig['queues']);
- foreach ($queueNames as $queueName) {
- $output[$key][] = $queueName;
- $output[$key] = array_unique($output[$key]);
- }
- }
- return $output;
- }
- /**
- * Extract topics configuration.
- *
- * @param \DOMDocument $config
- * @return array
- */
- private function extractTopics($config)
- {
- $output = [];
- /** @var $brokerNode \DOMElement */
- foreach ($config->getElementsByTagName('broker') as $brokerNode) {
- $topicName = $this->getAttributeValue($brokerNode, 'topic');
- $output[$topicName] = [
- ConfigInterface::TOPIC_NAME => $topicName,
- 'type' => $this->getAttributeValue($brokerNode, 'type', self::DEFAULT_TYPE),
- 'exchange' => $this->getAttributeValue($brokerNode, 'exchange', self::DEFAULT_EXCHANGE),
- 'consumerInstance' => $this->getAttributeValue($brokerNode, 'consumerInstance'),
- 'maxMessages' => $this->getAttributeValue($brokerNode, 'maxMessages'),
- 'queues' => $this->extractQueuesFromBroker($brokerNode, $topicName)
- ];
- }
- return $output;
- }
- /**
- * Extract queues configuration from the topic node.
- *
- * @param \DOMElement $brokerNode
- * @param string $topicName
- * @return array
- */
- protected function extractQueuesFromBroker(\DOMElement $brokerNode, $topicName)
- {
- $queues = [];
- $topicConfig = $this->communicationConfig->getTopic($topicName);
- /** @var $queueNode \DOMElement */
- foreach ($brokerNode->getElementsByTagName('queue') as $queueNode) {
- $handler = $this->getAttributeValue($queueNode, 'handler');
- $queueName = $this->getAttributeValue($queueNode, 'name');
- $queue = [
- 'name'=> $queueName,
- 'handlerName' => $this->getAttributeValue($queueNode, 'handlerName'),
- 'handlers' => $handler ? ['default' => $handler] : $topicConfig['handlers'],
- 'exchange' => $this->getAttributeValue($queueNode, 'exchange'),
- 'consumer' => $this->getAttributeValue($queueNode, 'consumer'),
- 'consumerInstance' => $this->getAttributeValue($queueNode, 'consumerInstance'),
- 'maxMessages' => $this->getAttributeValue($queueNode, 'maxMessages', null),
- 'type' => $this->getAttributeValue($queueNode, 'type')
- ];
- $queues[$queueName] = $queue;
- }
- return $queues;
- }
- /**
- * Get attribute value of the given node
- *
- * @param \DOMNode $node
- * @param string $attributeName
- * @param mixed $default
- * @return string|null
- */
- protected function getAttributeValue(\DOMNode $node, $attributeName, $default = null)
- {
- $item = $node->attributes->getNamedItem($attributeName);
- return $item ? $item->nodeValue : $default;
- }
- }
|