MessageProcessor.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\App\ResourceConnection;
  8. /**
  9. * Processes any type of messages except messages implementing MergedMessageInterface.
  10. */
  11. class MessageProcessor implements MessageProcessorInterface
  12. {
  13. /**
  14. * Maximum number of transaction retries
  15. */
  16. const MAX_TRANSACTION_RETRIES = 10;
  17. /**
  18. * @var \Magento\Framework\MessageQueue\MessageStatusProcessor
  19. */
  20. private $messageStatusProcessor;
  21. /**
  22. * @var \Magento\Framework\App\ResourceConnection
  23. */
  24. private $resource;
  25. /**
  26. * @var int
  27. */
  28. private $retryCount = 0;
  29. /**
  30. * @param MessageStatusProcessor $messageStatusProcessor
  31. * @param ResourceConnection $resource
  32. */
  33. public function __construct(
  34. MessageStatusProcessor $messageStatusProcessor,
  35. ResourceConnection $resource
  36. ) {
  37. $this->messageStatusProcessor = $messageStatusProcessor;
  38. $this->resource = $resource;
  39. }
  40. /**
  41. * @inheritdoc
  42. */
  43. public function process(
  44. QueueInterface $queue,
  45. ConsumerConfigurationInterface $configuration,
  46. array $messages,
  47. array $messagesToAcknowledge,
  48. array $mergedMessages
  49. ) {
  50. try {
  51. $this->resource->getConnection()->beginTransaction();
  52. $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge);
  53. $this->dispatchMessages($configuration, $mergedMessages);
  54. $this->resource->getConnection()->commit();
  55. $this->messageStatusProcessor->acknowledgeMessages($queue, $messages);
  56. } catch (ConnectionLostException $e) {
  57. $this->resource->getConnection()->rollBack();
  58. } catch (\Exception $e) {
  59. $retry = false;
  60. $this->resource->getConnection()->rollBack();
  61. if (strpos($e->getMessage(), 'Error while sending QUERY packet') !== false
  62. && $this->retryCount < self::MAX_TRANSACTION_RETRIES
  63. ) {
  64. $retry = true;
  65. $this->retryCount++;
  66. $this->resource->closeConnection();
  67. $this->process($queue, $configuration, $messages, $messagesToAcknowledge, $mergedMessages);
  68. }
  69. if (!$retry) {
  70. $this->messageStatusProcessor->rejectMessages($queue, $messages);
  71. }
  72. }
  73. }
  74. /**
  75. * Processes decoded messages, invokes callbacks, changes statuses for messages.
  76. *
  77. * @param ConsumerConfigurationInterface $configuration
  78. * @param array $messageList
  79. */
  80. private function dispatchMessages(ConsumerConfigurationInterface $configuration, array $messageList)
  81. {
  82. foreach ($messageList as $topicName => $messages) {
  83. foreach ($messages as $message) {
  84. $callbacks = $configuration->getHandlers($topicName);
  85. foreach ($callbacks as $callback) {
  86. call_user_func($callback, $message);
  87. }
  88. }
  89. }
  90. }
  91. }