123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\MessageQueue;
- use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
- use Magento\Framework\Exception\LocalizedException;
- use Magento\Framework\ObjectManagerInterface;
- use Magento\Framework\Phrase;
- use Magento\Framework\MessageQueue\ConsumerInterface;
- use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
- use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
- /**
- * Class which creates Consumers
- * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
- */
- class ConsumerFactory
- {
- /**
- * Object Manager instance
- *
- * @var ObjectManagerInterface
- */
- private $objectManager = null;
- /**
- * @var ConsumerConfig
- */
- private $consumerConfig;
- /**
- * @var CommunicationConfig
- */
- private $communicationConfig;
- /**
- * Initialize dependencies.
- *
- * @param QueueConfig $queueConfig
- * @param ObjectManagerInterface $objectManager
- *
- * @SuppressWarnings(PHPMD.UnusedFormalParameter)
- */
- public function __construct(
- QueueConfig $queueConfig,
- ObjectManagerInterface $objectManager
- ) {
- $this->objectManager = $objectManager;
- }
- /**
- * Return the actual Consumer implementation for the given consumer name.
- *
- * @param string $consumerName
- * @param int $batchSize [optional]
- * @return ConsumerInterface
- * @throws LocalizedException
- */
- public function get($consumerName, $batchSize = 0)
- {
- $consumerConfig = $this->getConsumerConfig()->getConsumer($consumerName);
- if ($consumerConfig === null) {
- throw new LocalizedException(
- new Phrase('Specified consumer "%consumer" is not declared.', ['consumer' => $consumerName])
- );
- }
- return $this->objectManager->create(
- $consumerConfig->getConsumerInstance(),
- [
- 'configuration' => $this->createConsumerConfiguration($consumerConfig),
- 'batchSize' => $batchSize,
- ]
- );
- }
- /**
- * Creates the objects necessary for the ConsumerConfigurationInterface to configure a Consumer.
- *
- * @param ConsumerConfigItemInterface $consumerConfigItem
- * @return ConsumerConfigurationInterface
- */
- private function createConsumerConfiguration($consumerConfigItem)
- {
- $customConsumerHandlers = [];
- foreach ($consumerConfigItem->getHandlers() as $handlerConfig) {
- $customConsumerHandlers[] = [
- $this->objectManager->create($handlerConfig->getType()),
- $handlerConfig->getMethod()
- ];
- }
- $topics = [];
- foreach ($this->getCommunicationConfig()->getTopics() as $topicConfig) {
- $topicName = $topicConfig[CommunicationConfig::TOPIC_NAME];
- $topics[$topicName] = [
- ConsumerConfigurationInterface::TOPIC_HANDLERS => $customConsumerHandlers
- ?: $this->getHandlersFromCommunicationConfig($topicName),
- ConsumerConfigurationInterface::TOPIC_TYPE => $topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]
- ? ConsumerConfiguration::TYPE_SYNC
- : ConsumerConfiguration::TYPE_ASYNC
- ];
- }
- $configData = [
- ConsumerConfigurationInterface::CONSUMER_NAME => $consumerConfigItem->getName(),
- ConsumerConfigurationInterface::QUEUE_NAME => $consumerConfigItem->getQueue(),
- ConsumerConfigurationInterface::TOPICS => $topics,
- ConsumerConfigurationInterface::MAX_MESSAGES => $consumerConfigItem->getMaxMessages(),
- ];
- return $this->objectManager->create(
- \Magento\Framework\MessageQueue\ConsumerConfiguration::class,
- ['data' => $configData]
- );
- }
- /**
- * Get consumer config.
- *
- * @return ConsumerConfig
- *
- * @deprecated 102.0.1
- */
- private function getConsumerConfig()
- {
- if ($this->consumerConfig === null) {
- $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
- }
- return $this->consumerConfig;
- }
- /**
- * Get communication config.
- *
- * @return CommunicationConfig
- *
- * @deprecated 102.0.1
- */
- private function getCommunicationConfig()
- {
- if ($this->communicationConfig === null) {
- $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
- ->get(CommunicationConfig::class);
- }
- return $this->communicationConfig;
- }
- /**
- * Get handlers by topic based on communication config.
- *
- * @param string $topicName
- * @return array
- */
- private function getHandlersFromCommunicationConfig($topicName)
- {
- $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
- $handlers = [];
- foreach ($topicConfig[CommunicationConfig::TOPIC_HANDLERS] as $handlerConfig) {
- $handlers[] = [
- $this->objectManager->create($handlerConfig[CommunicationConfig::HANDLER_TYPE]),
- $handlerConfig[CommunicationConfig::HANDLER_METHOD]
- ];
- }
- return $handlers;
- }
- }
|