BatchConsumer.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
  8. use Magento\Framework\App\ResourceConnection;
  9. use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
  10. /**
  11. * Class BatchConsumer
  12. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  13. */
  14. class BatchConsumer implements ConsumerInterface
  15. {
  16. /**
  17. * @var ConsumerConfigurationInterface
  18. */
  19. private $configuration;
  20. /**
  21. * @var MessageEncoder
  22. */
  23. private $messageEncoder;
  24. /**
  25. * @var QueueRepository
  26. */
  27. private $queueRepository;
  28. /**
  29. * @var MergerFactory
  30. */
  31. private $mergerFactory;
  32. /**
  33. * @var int
  34. */
  35. private $interval;
  36. /**
  37. * @var int
  38. */
  39. private $batchSize;
  40. /**
  41. * @var MessageProcessorLoader
  42. */
  43. private $messageProcessorLoader;
  44. /**
  45. * @var Resource
  46. */
  47. private $resource;
  48. /**
  49. * @var MessageController
  50. */
  51. private $messageController;
  52. /**
  53. * @var ConsumerConfig
  54. */
  55. private $consumerConfig;
  56. /**
  57. * @param ConfigInterface $messageQueueConfig
  58. * @param MessageEncoder $messageEncoder
  59. * @param QueueRepository $queueRepository
  60. * @param MergerFactory $mergerFactory
  61. * @param ResourceConnection $resource
  62. * @param ConsumerConfigurationInterface $configuration
  63. * @param int $interval [optional]
  64. * @param int $batchSize [optional]
  65. * @param MessageProcessorLoader $messageProcessorLoader [optional]
  66. * @param MessageController $messageController [optional]
  67. * @param ConsumerConfig $consumerConfig [optional]
  68. *
  69. * @SuppressWarnings(PHPMD.UnusedFormalParameter)
  70. */
  71. public function __construct(
  72. MessageQueueConfig $messageQueueConfig,
  73. MessageEncoder $messageEncoder,
  74. QueueRepository $queueRepository,
  75. MergerFactory $mergerFactory,
  76. ResourceConnection $resource,
  77. ConsumerConfigurationInterface $configuration,
  78. $interval = 5,
  79. $batchSize = 0,
  80. MessageProcessorLoader $messageProcessorLoader = null
  81. ) {
  82. $this->messageEncoder = $messageEncoder;
  83. $this->queueRepository = $queueRepository;
  84. $this->mergerFactory = $mergerFactory;
  85. $this->interval = $interval;
  86. $this->batchSize = $batchSize;
  87. $this->resource = $resource;
  88. $this->configuration = $configuration;
  89. $this->messageProcessorLoader = $messageProcessorLoader
  90. ?: \Magento\Framework\App\ObjectManager::getInstance()->get(MessageProcessorLoader::class);
  91. }
  92. /**
  93. * {@inheritdoc}
  94. */
  95. public function process($maxNumberOfMessages = null)
  96. {
  97. $queueName = $this->configuration->getQueueName();
  98. $consumerName = $this->configuration->getConsumerName();
  99. $connectionName = $this->getConsumerConfig()->getConsumer($consumerName)->getConnection();
  100. $queue = $this->queueRepository->get($connectionName, $queueName);
  101. $merger = $this->mergerFactory->create($consumerName);
  102. if (!isset($maxNumberOfMessages)) {
  103. $this->runDaemonMode($queue, $merger);
  104. } else {
  105. $this->run($queue, $merger, $maxNumberOfMessages);
  106. }
  107. }
  108. /**
  109. * Run process in a daemon mode.
  110. *
  111. * @param QueueInterface $queue
  112. * @param MergerInterface $merger
  113. * @return void
  114. */
  115. private function runDaemonMode(QueueInterface $queue, MergerInterface $merger)
  116. {
  117. $transactionCallback = $this->getTransactionCallback($queue, $merger);
  118. while (true) {
  119. $messages = $this->batchSize > 0
  120. ? $this->getMessages($queue, $this->batchSize)
  121. : $this->getAllMessages($queue);
  122. $transactionCallback($messages);
  123. sleep($this->interval);
  124. }
  125. }
  126. /**
  127. * Run short running process.
  128. *
  129. * @param QueueInterface $queue
  130. * @param MergerInterface $merger
  131. * @param int $maxNumberOfMessages
  132. * @return void
  133. */
  134. private function run(QueueInterface $queue, MergerInterface $merger, $maxNumberOfMessages)
  135. {
  136. $count = $maxNumberOfMessages
  137. ? $maxNumberOfMessages
  138. : $this->configuration->getMaxMessages() ?: 1;
  139. $transactionCallback = $this->getTransactionCallback($queue, $merger);
  140. if ($this->batchSize) {
  141. while ($count > 0) {
  142. $messages = $this->getMessages($queue, $count > $this->batchSize ? $this->batchSize : $count);
  143. $transactionCallback($messages);
  144. $count -= $this->batchSize;
  145. }
  146. } else {
  147. $messages = $this->getMessages($queue, $count);
  148. $transactionCallback($messages);
  149. }
  150. }
  151. /**
  152. * Get all messages from a queue.
  153. *
  154. * @param QueueInterface $queue
  155. * @return EnvelopeInterface[]
  156. */
  157. private function getAllMessages(QueueInterface $queue)
  158. {
  159. $messages = [];
  160. while ($message = $queue->dequeue()) {
  161. $messages[] = $message;
  162. }
  163. return $messages;
  164. }
  165. /**
  166. * Get $count messages from a queue.
  167. *
  168. * @param QueueInterface $queue
  169. * @param int $count
  170. * @return EnvelopeInterface[]
  171. */
  172. private function getMessages(QueueInterface $queue, $count)
  173. {
  174. $messages = [];
  175. for ($i = $count; $i > 0; $i--) {
  176. $message = $queue->dequeue();
  177. if ($message === null) {
  178. break;
  179. }
  180. $messages[] = $message;
  181. }
  182. return $messages;
  183. }
  184. /**
  185. * Decode provided messages.
  186. *
  187. * @param EnvelopeInterface[] $messages
  188. * @return object[]
  189. */
  190. private function decodeMessages(array $messages)
  191. {
  192. $decodedMessages = [];
  193. foreach ($messages as $messageId => $message) {
  194. $properties = $message->getProperties();
  195. $topicName = $properties['topic_name'];
  196. $decodedMessages[$topicName][$messageId] = $this->messageEncoder->decode($topicName, $message->getBody());
  197. }
  198. return $decodedMessages;
  199. }
  200. /**
  201. * Get transaction callback.
  202. *
  203. * @param QueueInterface $queue
  204. * @param MergerInterface $merger
  205. * @return \Closure
  206. */
  207. private function getTransactionCallback(QueueInterface $queue, MergerInterface $merger)
  208. {
  209. return function (array $messages) use ($queue, $merger) {
  210. list($messages, $messagesToAcknowledge) = $this->lockMessages($messages);
  211. $decodedMessages = $this->decodeMessages($messages);
  212. $mergedMessages = $merger->merge($decodedMessages);
  213. $messageProcessor = $this->messageProcessorLoader->load($mergedMessages);
  214. $messageProcessor->process(
  215. $queue,
  216. $this->configuration,
  217. $messages,
  218. $messagesToAcknowledge,
  219. $mergedMessages
  220. );
  221. };
  222. }
  223. /**
  224. * Create lock for the messages.
  225. *
  226. * @param array $messages
  227. * @return array
  228. */
  229. private function lockMessages(array $messages)
  230. {
  231. $toProcess = [];
  232. $toAcknowledge = [];
  233. foreach ($messages as $message) {
  234. try {
  235. $this->getMessageController()->lock($message, $this->configuration->getConsumerName());
  236. $toProcess[] = $message;
  237. } catch (MessageLockException $exception) {
  238. $toAcknowledge[] = $message;
  239. }
  240. }
  241. return [$toProcess, $toAcknowledge];
  242. }
  243. /**
  244. * Get consumer config.
  245. *
  246. * This getter serves as a workaround to add this dependency to this class without breaking constructor structure
  247. *
  248. * @return ConsumerConfig
  249. *
  250. * @deprecated 102.0.1
  251. */
  252. private function getConsumerConfig()
  253. {
  254. if ($this->consumerConfig === null) {
  255. $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
  256. }
  257. return $this->consumerConfig;
  258. }
  259. /**
  260. * Get message controller.
  261. *
  262. * This getter serves as a workaround to add this dependency to this class without breaking constructor structure
  263. *
  264. * @return MessageController
  265. *
  266. * @deprecated 102.0.1
  267. */
  268. private function getMessageController()
  269. {
  270. if ($this->messageController === null) {
  271. $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
  272. ->get(\Magento\Framework\MessageQueue\MessageController::class);
  273. }
  274. return $this->messageController;
  275. }
  276. }