Consumer.php 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\App\ResourceConnection;
  8. use Magento\Framework\Exception\LocalizedException;
  9. use Magento\Framework\Phrase;
  10. use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
  11. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  12. use Magento\Framework\MessageQueue\QueueRepository;
  13. use Psr\Log\LoggerInterface;
  /**
   * Queue consumer that processes messages one at a time, unlike the batch consumer.
   *
   * Whether a message is handled synchronously (a reply is pushed to the queue named by
   * its "reply_to" property) or asynchronously is decided per topic from the
   * communication config.
   *
   * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
   */
  class Consumer implements ConsumerInterface
  {
      /**
       * @var ConsumerConfigurationInterface
       */
      private $configuration;

      /**
       * @var ResourceConnection
       */
      private $resource;

      /**
       * @var MessageEncoder
       */
      private $messageEncoder;

      /**
       * @var CallbackInvoker
       */
      private $invoker;

      /**
       * Resolved lazily, see getMessageController().
       *
       * @var MessageController|null
       */
      private $messageController;

      /**
       * Resolved lazily, see getQueueRepository().
       *
       * @var QueueRepository|null
       */
      private $queueRepository;

      /**
       * Resolved lazily, see getEnvelopeFactory().
       *
       * @var EnvelopeFactory|null
       */
      private $envelopeFactory;

      /**
       * Resolved lazily, see getMessageValidator().
       *
       * @var MessageValidator|null
       */
      private $messageValidator;

      /**
       * Resolved lazily, see getConsumerConfig().
       *
       * @var ConsumerConfig|null
       */
      private $consumerConfig;

      /**
       * Resolved lazily, see getCommunicationConfig().
       *
       * @var CommunicationConfig|null
       */
      private $communicationConfig;

      /**
       * @var LoggerInterface
       */
      private $logger;

      /**
       * Initialize dependencies.
       *
       * @param CallbackInvoker $invoker
       * @param MessageEncoder $messageEncoder
       * @param ResourceConnection $resource
       * @param ConsumerConfigurationInterface $configuration
       * @param LoggerInterface|null $logger Falls back to the ObjectManager-provided logger when omitted.
       *
       * @SuppressWarnings(PHPMD.UnusedFormalParameter)
       */
      public function __construct(
          CallbackInvoker $invoker,
          MessageEncoder $messageEncoder,
          ResourceConnection $resource,
          ConsumerConfigurationInterface $configuration,
          LoggerInterface $logger = null
      ) {
          $this->invoker = $invoker;
          $this->messageEncoder = $messageEncoder;
          $this->resource = $resource;
          $this->configuration = $configuration;
          $this->logger = $logger ?: \Magento\Framework\App\ObjectManager::getInstance()->get(LoggerInterface::class);
      }

      /**
       * Consume messages from the configured queue.
       *
       * Without a limit the transaction callback is subscribed to the queue; with a limit
       * the callback invoker is asked to run it for at most that many messages.
       *
       * {@inheritdoc}
       */
      public function process($maxNumberOfMessages = null)
      {
          $queue = $this->configuration->getQueue();
          $callback = $this->getTransactionCallback($queue);

          if ($maxNumberOfMessages === null) {
              $queue->subscribe($callback);
              return;
          }

          $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
      }

      /**
       * Decode message and invoke callback method, return reply back for sync processing.
       *
       * @param EnvelopeInterface $message
       * @param boolean $isSync
       * @return string|null Encoded reply, or null when nothing was decoded or no reply is produced.
       * @throws LocalizedException
       */
      private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
      {
          $topicName = $message->getProperties()['topic_name'];
          $handlers = $this->configuration->getHandlers($topicName);
          $decodedMessage = $this->messageEncoder->decode($topicName, $message->getBody());

          if ($decodedMessage === null) {
              return null;
          }

          $messageSchemaType = $this->configuration->getMessageSchemaType($topicName);
          if ($messageSchemaType === CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) {
              // Method-type topics: the decoded message is the argument list. Only the first
              // handler is invoked and its result is always turned into a reply.
              foreach ($handlers as $callback) {
                  $result = call_user_func_array($callback, $decodedMessage);
                  return $this->processSyncResponse($topicName, $result);
              }

              return null;
          }

          // Other topics: every handler receives the decoded message; in sync mode the
          // first handler's result becomes the reply and the remaining handlers are skipped.
          foreach ($handlers as $callback) {
              $result = call_user_func($callback, $decodedMessage);
              if ($isSync) {
                  return $this->processSyncResponse($topicName, $result);
              }
          }

          return null;
      }

      /**
       * Validate and encode synchronous handler output.
       *
       * @param string $topicName
       * @param mixed $result
       * @return string
       * @throws LocalizedException When the handler produced no result.
       */
      private function processSyncResponse($topicName, $result)
      {
          if ($result === null) {
              throw new LocalizedException(new Phrase('No reply message resulted in RPC.'));
          }

          $this->getMessageValidator()->validate($topicName, $result, false);

          return $this->messageEncoder->encode($topicName, $result, false);
      }

      /**
       * Send RPC response message.
       *
       * The reply is pushed to the queue named by the envelope's "reply_to" property, on
       * the connection configured for this consumer.
       *
       * @param EnvelopeInterface $envelope
       * @return void
       */
      private function sendResponse(EnvelopeInterface $envelope)
      {
          $messageProperties = $envelope->getProperties();
          $connectionName = $this->getConsumerConfig()
              ->getConsumer($this->configuration->getConsumerName())
              ->getConnection();
          $replyQueue = $this->getQueueRepository()->get($connectionName, $messageProperties['reply_to']);
          $replyQueue->push($envelope);
      }

      /**
       * Get transaction callback. This handles the case of both sync and async.
       *
       * The returned closure locks the message, dispatches it, and then acknowledges or
       * rejects it on the given queue depending on the outcome.
       *
       * @param QueueInterface $queue
       * @return \Closure
       */
      private function getTransactionCallback(QueueInterface $queue)
      {
          return function (EnvelopeInterface $message) use ($queue) {
              /** @var LockInterface|null $lock */
              $lock = null;
              try {
                  $topicName = $message->getProperties()['topic_name'];
                  $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
                  $lock = $this->getMessageController()->lock($message, $this->configuration->getConsumerName());

                  if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
                      $responseBody = $this->dispatchMessage($message, true);
                      $responseMessage = $this->getEnvelopeFactory()->create(
                          ['body' => $responseBody, 'properties' => $message->getProperties()]
                      );
                      $this->sendResponse($responseMessage);
                  } elseif (in_array($topicName, $this->configuration->getTopicNames(), true)) {
                      // Strict comparison: loose matching would treat numeric-looking topic
                      // names such as "1e3" and "1000" as equal.
                      $this->dispatchMessage($message);
                  } else {
                      // NOTE(review): the lock taken above is not released on this path;
                      // confirm that this is intended for topics this consumer does not serve.
                      $queue->reject($message);
                      return;
                  }

                  $queue->acknowledge($message);
              } catch (MessageLockException $exception) {
                  // Locking failed: acknowledge the message without dispatching it.
                  $queue->acknowledge($message);
              } catch (\Magento\Framework\MessageQueue\ConnectionLostException $e) {
                  // Neither acknowledged nor rejected here; only the lock row is removed.
                  $this->releaseLock($lock);
              } catch (\Magento\Framework\Exception\NotFoundException $e) {
                  $queue->acknowledge($message);
                  $this->logger->warning($e->getMessage());
              } catch (\Exception $e) {
                  try {
                      $queue->reject($message, false, $e->getMessage());
                  } finally {
                      // Remove the lock row even when the reject call itself fails.
                      $this->releaseLock($lock);
                  }
              }
          };
      }

      /**
       * Delete the "queue_lock" row that belongs to the given lock, if a lock was obtained.
       *
       * @param LockInterface|null $lock
       * @return void
       */
      private function releaseLock($lock)
      {
          if (!$lock) {
              return;
          }

          $this->resource->getConnection()
              ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
      }

      /**
       * Get consumer config.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return ConsumerConfig
       *
       * @deprecated 102.0.1
       */
      private function getConsumerConfig()
      {
          if ($this->consumerConfig === null) {
              $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
          }

          return $this->consumerConfig;
      }

      /**
       * Get communication config.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return CommunicationConfig
       *
       * @deprecated 102.0.1
       */
      private function getCommunicationConfig()
      {
          if ($this->communicationConfig === null) {
              $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
                  ->get(CommunicationConfig::class);
          }

          return $this->communicationConfig;
      }

      /**
       * Get queue repository.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return QueueRepository
       *
       * @deprecated 102.0.1
       */
      private function getQueueRepository()
      {
          if ($this->queueRepository === null) {
              $this->queueRepository = \Magento\Framework\App\ObjectManager::getInstance()->get(QueueRepository::class);
          }

          return $this->queueRepository;
      }

      /**
       * Get message controller.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return MessageController
       *
       * @deprecated 102.0.1
       */
      private function getMessageController()
      {
          if ($this->messageController === null) {
              $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
                  ->get(MessageController::class);
          }

          return $this->messageController;
      }

      /**
       * Get message validator.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return MessageValidator
       *
       * @deprecated 102.0.1
       */
      private function getMessageValidator()
      {
          if ($this->messageValidator === null) {
              $this->messageValidator = \Magento\Framework\App\ObjectManager::getInstance()
                  ->get(MessageValidator::class);
          }

          return $this->messageValidator;
      }

      /**
       * Get envelope factory.
       *
       * Fetched from the ObjectManager on first use and cached on the instance.
       *
       * @return EnvelopeFactory
       *
       * @deprecated 102.0.1
       */
      private function getEnvelopeFactory()
      {
          if ($this->envelopeFactory === null) {
              $this->envelopeFactory = \Magento\Framework\App\ObjectManager::getInstance()
                  ->get(EnvelopeFactory::class);
          }

          return $this->envelopeFactory;
      }
  }