| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 | 
							- <?php
 
- /**
 
-  * Copyright © Magento, Inc. All rights reserved.
 
-  * See COPYING.txt for license details.
 
-  */
 
- namespace Magento\Framework\MessageQueue;
 
- use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
 
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
 
- use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
 
- /**
 
-  * Publishers pool.
 
-  *
 
-  * @api
 
-  * @since 102.0.1
 
-  */
 
- class PublisherPool implements PublisherInterface, BulkPublisherInterface
 
- {
 
-     const MODE_SYNC = 'sync';
 
-     const MODE_ASYNC = 'async';
 
-     /**
 
-      * @deprecated
 
-      */
 
-     const TYPE = 'type';
 
-     /**
 
-      * @deprecated
 
-      */
 
-     const CONNECTION_NAME = 'connectionName';
 
-     /**
 
-      * Publisher objects pool.
 
-      *
 
-      * @var \Magento\Framework\MessageQueue\PublisherInterface[]
 
-      * @since 102.0.1
 
-      */
 
-     protected $publishers = [];
 
-     /**
 
-      * Communication config.
 
-      *
 
-      * @var CommunicationConfig
 
-      * @since 102.0.1
 
-      */
 
-     protected $communicationConfig;
 
-     /**
 
-      * @var PublisherConfig
 
-      */
 
-     private $publisherConfig;
 
-     /**
 
-      * @var ConnectionTypeResolver
 
-      */
 
-     private $connectionTypeResolver;
 
-     /**
 
-      * Initialize dependencies.
 
-      *
 
-      * @param CommunicationConfig $communicationConfig
 
-      * @param QueueConfig $queueConfig
 
-      * @param string[] $publishers
 
-      *
 
-      * @SuppressWarnings(PHPMD.UnusedFormalParameter)
 
-      */
 
-     public function __construct(
 
-         CommunicationConfig $communicationConfig,
 
-         QueueConfig $queueConfig,
 
-         array $publishers
 
-     ) {
 
-         $this->communicationConfig = $communicationConfig;
 
-         $this->initializePublishers($publishers);
 
-     }
 
-     /**
 
-      * {@inheritdoc}
 
-      * @since 102.0.1
 
-      */
 
-     public function publish($topicName, $data)
 
-     {
 
-         $publisherType = $this->communicationConfig->getTopic($topicName)[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]
 
-             ? self::MODE_SYNC
 
-             : self::MODE_ASYNC;
 
-         $connectionName = $this->getPublisherConfig()->getPublisher($topicName)->getConnection()->getName();
 
-         $publisher = $this->getPublisherForConnectionNameAndType($publisherType, $connectionName);
 
-         return $publisher->publish($topicName, $data);
 
-     }
 
-     /**
 
-      * Initialize publisher objects pool.
 
-      *
 
-      * @param array $publishers
 
-      * @return void
 
-      */
 
-     private function initializePublishers(array $publishers)
 
-     {
 
-         $asyncPublishers = isset($publishers[self::MODE_ASYNC]) ? $publishers[self::MODE_ASYNC] : [];
 
-         $syncPublishers = isset($publishers[self::MODE_SYNC]) ? $publishers[self::MODE_SYNC] : [];
 
-         foreach ($asyncPublishers as $connectionType => $publisher) {
 
-             $this->addPublisherToPool(
 
-                 self::MODE_ASYNC,
 
-                 $connectionType,
 
-                 $publisher
 
-             );
 
-         }
 
-         foreach ($syncPublishers as $connectionType => $publisher) {
 
-             $this->addPublisherToPool(
 
-                 self::MODE_SYNC,
 
-                 $connectionType,
 
-                 $publisher
 
-             );
 
-         }
 
-     }
 
-     /**
 
-      * Add publisher.
 
-      *
 
-      * @param string $type
 
-      * @param string $connectionType
 
-      * @param PublisherInterface $publisher
 
-      * @return $this
 
-      */
 
-     private function addPublisherToPool($type, $connectionType, PublisherInterface $publisher)
 
-     {
 
-         $this->publishers[$type][$connectionType] = $publisher;
 
-         return $this;
 
-     }
 
-     /**
 
-      * Return an instance of a publisher for a connection name.
 
-      *
 
-      * @param string $type
 
-      * @param string $connectionName
 
-      * @return PublisherInterface
 
-      * @throws \LogicException
 
-      * @throws \InvalidArgumentException
 
-      */
 
-     private function getPublisherForConnectionNameAndType($type, $connectionName)
 
-     {
 
-         $connectionType = $this->getConnectionTypeResolver()->getConnectionType($connectionName);
 
-         if (!isset($this->publishers[$type])) {
 
-             throw new \InvalidArgumentException('Unknown publisher type ' . $type);
 
-         }
 
-         if (!isset($this->publishers[$type][$connectionType])) {
 
-             throw new \LogicException(
 
-                 sprintf(
 
-                     'Could not find an implementation type for type "%s" and connection "%s".',
 
-                     $type,
 
-                     $connectionName
 
-                 )
 
-             );
 
-         }
 
-         return $this->publishers[$type][$connectionType];
 
-     }
 
-     /**
 
-      * Get publisher config.
 
-      *
 
-      * @return PublisherConfig
 
-      *
 
-      * @deprecated 102.0.1
 
-      */
 
-     private function getPublisherConfig()
 
-     {
 
-         if ($this->publisherConfig === null) {
 
-             $this->publisherConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(PublisherConfig::class);
 
-         }
 
-         return $this->publisherConfig;
 
-     }
 
-     /**
 
-      * Get connection type resolver.
 
-      *
 
-      * @return ConnectionTypeResolver
 
-      *
 
-      * @deprecated 102.0.1
 
-      */
 
-     private function getConnectionTypeResolver()
 
-     {
 
-         if ($this->connectionTypeResolver === null) {
 
-             $this->connectionTypeResolver = \Magento\Framework\App\ObjectManager::getInstance()
 
-                 ->get(ConnectionTypeResolver::class);
 
-         }
 
-         return $this->connectionTypeResolver;
 
-     }
 
- }
 
 
  |