PublisherPool.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
  8. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  9. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  /**
   * Pool of message queue publishers.
   *
   * Publishers are stored per mode (sync / async) and per connection type.
   * Publishing a topic is delegated to the pooled publisher that matches the
   * topic's mode and the type resolved for the topic's connection name.
   *
   * @api
   * @since 102.0.1
   */
  class PublisherPool implements PublisherInterface, BulkPublisherInterface
  {
      const MODE_SYNC = 'sync';

      const MODE_ASYNC = 'async';

      /**
       * @deprecated
       */
      const TYPE = 'type';

      /**
       * @deprecated
       */
      const CONNECTION_NAME = 'connectionName';

      /**
       * Pooled publishers, indexed first by mode and then by connection type.
       *
       * @var \Magento\Framework\MessageQueue\PublisherInterface[]
       * @since 102.0.1
       */
      protected $publishers = [];

      /**
       * Communication config, used to read the topic definition when publishing.
       *
       * @var CommunicationConfig
       * @since 102.0.1
       */
      protected $communicationConfig;

      /**
       * Publisher config; fetched on first use.
       *
       * @var PublisherConfig
       */
      private $publisherConfig;

      /**
       * Connection type resolver; fetched on first use.
       *
       * @var ConnectionTypeResolver
       */
      private $connectionTypeResolver;

      /**
       * Store the communication config and fill the pool from the given publishers.
       *
       * The $queueConfig argument is accepted but not used by this constructor.
       *
       * @param CommunicationConfig $communicationConfig
       * @param QueueConfig $queueConfig
       * @param string[] $publishers
       *
       * @SuppressWarnings(PHPMD.UnusedFormalParameter)
       */
      public function __construct(
          CommunicationConfig $communicationConfig,
          QueueConfig $queueConfig,
          array $publishers
      ) {
          $this->communicationConfig = $communicationConfig;
          $this->initializePublishers($publishers);
      }

      /**
       * {@inheritdoc}
       * @since 102.0.1
       */
      public function publish($topicName, $data)
      {
          // The topic's "is synchronous" flag selects the publisher mode.
          $topicDefinition = $this->communicationConfig->getTopic($topicName);
          $mode = self::MODE_ASYNC;
          if ($topicDefinition[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
              $mode = self::MODE_SYNC;
          }

          // The connection name comes from the publisher configured for this topic.
          $connection = $this->getPublisherConfig()->getPublisher($topicName)->getConnection();
          $delegate = $this->getPublisherForConnectionNameAndType($mode, $connection->getName());

          return $delegate->publish($topicName, $data);
      }

      /**
       * Fill the pool from the "async" and "sync" entries of the given array.
       *
       * Asynchronous publishers are registered first, then synchronous ones.
       * A missing entry for a mode is treated as an empty list.
       *
       * @param array $publishers
       * @return void
       */
      private function initializePublishers(array $publishers)
      {
          foreach ([self::MODE_ASYNC, self::MODE_SYNC] as $mode) {
              if (!isset($publishers[$mode])) {
                  continue;
              }
              foreach ($publishers[$mode] as $connectionType => $publisher) {
                  $this->addPublisherToPool($mode, $connectionType, $publisher);
              }
          }
      }

      /**
       * Register a publisher under the given mode and connection type.
       *
       * An existing entry for the same mode and connection type is overwritten.
       *
       * @param string $type
       * @param string $connectionType
       * @param PublisherInterface $publisher
       * @return $this
       */
      private function addPublisherToPool($type, $connectionType, PublisherInterface $publisher)
      {
          $this->publishers[$type][$connectionType] = $publisher;

          return $this;
      }

      /**
       * Look up the pooled publisher for a mode and a connection name.
       *
       * The connection name is first resolved to a connection type, which is the
       * key the pool is indexed by.
       *
       * @param string $type
       * @param string $connectionName
       * @return PublisherInterface
       * @throws \LogicException When no publisher is pooled for the resolved connection type.
       * @throws \InvalidArgumentException When no publishers are pooled for the given type.
       */
      private function getPublisherForConnectionNameAndType($type, $connectionName)
      {
          $connectionType = $this->getConnectionTypeResolver()->getConnectionType($connectionName);

          if (!isset($this->publishers[$type])) {
              throw new \InvalidArgumentException('Unknown publisher type ' . $type);
          }

          $candidates = $this->publishers[$type];
          if (isset($candidates[$connectionType])) {
              return $candidates[$connectionType];
          }

          $message = sprintf(
              'Could not find an implementation type for type "%s" and connection "%s".',
              $type,
              $connectionName
          );
          throw new \LogicException($message);
      }

      /**
       * Return the publisher config, fetching it from the object manager on first use.
       *
       * @return PublisherConfig
       *
       * @deprecated 102.0.1
       */
      private function getPublisherConfig()
      {
          if (null === $this->publisherConfig) {
              $objectManager = \Magento\Framework\App\ObjectManager::getInstance();
              $this->publisherConfig = $objectManager->get(PublisherConfig::class);
          }

          return $this->publisherConfig;
      }

      /**
       * Return the connection type resolver, fetching it from the object manager on first use.
       *
       * @return ConnectionTypeResolver
       *
       * @deprecated 102.0.1
       */
      private function getConnectionTypeResolver()
      {
          if (null === $this->connectionTypeResolver) {
              $objectManager = \Magento\Framework\App\ObjectManager::getInstance();
              $this->connectionTypeResolver = $objectManager->get(ConnectionTypeResolver::class);
          }

          return $this->connectionTypeResolver;
      }
  }