MassConsumer.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. declare(strict_types=1);
  7. namespace Magento\AsynchronousOperations\Model;
  8. use Magento\Framework\App\ResourceConnection;
  9. use Psr\Log\LoggerInterface;
  10. use Magento\Framework\MessageQueue\MessageLockException;
  11. use Magento\Framework\MessageQueue\ConnectionLostException;
  12. use Magento\Framework\Exception\NotFoundException;
  13. use Magento\Framework\MessageQueue\CallbackInvoker;
  14. use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
  15. use Magento\Framework\MessageQueue\EnvelopeInterface;
  16. use Magento\Framework\MessageQueue\QueueInterface;
  17. use Magento\Framework\MessageQueue\LockInterface;
  18. use Magento\Framework\MessageQueue\MessageController;
  19. use Magento\Framework\MessageQueue\ConsumerInterface;
  20. /**
  21. * Class Consumer used to process OperationInterface messages.
  22. *
  23. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  24. */
  25. class MassConsumer implements ConsumerInterface
  26. {
  27. /**
  28. * @var \Magento\Framework\MessageQueue\CallbackInvoker
  29. */
  30. private $invoker;
  31. /**
  32. * @var \Magento\Framework\App\ResourceConnection
  33. */
  34. private $resource;
  35. /**
  36. * @var \Magento\Framework\MessageQueue\ConsumerConfigurationInterface
  37. */
  38. private $configuration;
  39. /**
  40. * @var \Magento\Framework\MessageQueue\MessageController
  41. */
  42. private $messageController;
  43. /**
  44. * @var LoggerInterface
  45. */
  46. private $logger;
  47. /**
  48. * @var OperationProcessor
  49. */
  50. private $operationProcessor;
  51. /**
  52. * Initialize dependencies.
  53. *
  54. * @param CallbackInvoker $invoker
  55. * @param ResourceConnection $resource
  56. * @param MessageController $messageController
  57. * @param ConsumerConfigurationInterface $configuration
  58. * @param OperationProcessorFactory $operationProcessorFactory
  59. * @param LoggerInterface $logger
  60. */
  61. public function __construct(
  62. CallbackInvoker $invoker,
  63. ResourceConnection $resource,
  64. MessageController $messageController,
  65. ConsumerConfigurationInterface $configuration,
  66. OperationProcessorFactory $operationProcessorFactory,
  67. LoggerInterface $logger
  68. ) {
  69. $this->invoker = $invoker;
  70. $this->resource = $resource;
  71. $this->messageController = $messageController;
  72. $this->configuration = $configuration;
  73. $this->operationProcessor = $operationProcessorFactory->create([
  74. 'configuration' => $configuration
  75. ]);
  76. $this->logger = $logger;
  77. }
  78. /**
  79. * {@inheritdoc}
  80. */
  81. public function process($maxNumberOfMessages = null)
  82. {
  83. $queue = $this->configuration->getQueue();
  84. if (!isset($maxNumberOfMessages)) {
  85. $queue->subscribe($this->getTransactionCallback($queue));
  86. } else {
  87. $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
  88. }
  89. }
  90. /**
  91. * Get transaction callback. This handles the case of async.
  92. *
  93. * @param QueueInterface $queue
  94. * @return \Closure
  95. */
  96. private function getTransactionCallback(QueueInterface $queue)
  97. {
  98. return function (EnvelopeInterface $message) use ($queue) {
  99. /** @var LockInterface $lock */
  100. $lock = null;
  101. try {
  102. $topicName = $message->getProperties()['topic_name'];
  103. $lock = $this->messageController->lock($message, $this->configuration->getConsumerName());
  104. $allowedTopics = $this->configuration->getTopicNames();
  105. if (in_array($topicName, $allowedTopics)) {
  106. $this->operationProcessor->process($message->getBody());
  107. } else {
  108. $queue->reject($message);
  109. return;
  110. }
  111. $queue->acknowledge($message);
  112. } catch (MessageLockException $exception) {
  113. $queue->acknowledge($message);
  114. } catch (ConnectionLostException $e) {
  115. if ($lock) {
  116. $this->resource->getConnection()
  117. ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
  118. }
  119. } catch (NotFoundException $e) {
  120. $queue->acknowledge($message);
  121. $this->logger->warning($e->getMessage());
  122. } catch (\Exception $e) {
  123. $queue->reject($message, false, $e->getMessage());
  124. if ($lock) {
  125. $this->resource->getConnection()
  126. ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
  127. }
  128. }
  129. };
  130. }
  131. }