| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 | 
							- <?php
 
- /**
 
-  * Copyright © Magento, Inc. All rights reserved.
 
-  * See COPYING.txt for license details.
 
-  */
 
- namespace Magento\Framework\MessageQueue;
 
- use Magento\Framework\App\ResourceConnection;
 
- /**
 
-  * Processes any type of messages except messages implementing MergedMessageInterface.
 
-  */
 
- class MessageProcessor implements MessageProcessorInterface
 
- {
 
-     /**
 
-      * Maximum number of transaction retries
 
-      */
 
-     const MAX_TRANSACTION_RETRIES = 10;
 
-     /**
 
-      * @var \Magento\Framework\MessageQueue\MessageStatusProcessor
 
-      */
 
-     private $messageStatusProcessor;
 
-     /**
 
-      * @var \Magento\Framework\App\ResourceConnection
 
-      */
 
-     private $resource;
 
-     /**
 
-      * @var int
 
-      */
 
-     private $retryCount = 0;
 
-     /**
 
-      * @param MessageStatusProcessor $messageStatusProcessor
 
-      * @param ResourceConnection $resource
 
-      */
 
-     public function __construct(
 
-         MessageStatusProcessor $messageStatusProcessor,
 
-         ResourceConnection $resource
 
-     ) {
 
-         $this->messageStatusProcessor = $messageStatusProcessor;
 
-         $this->resource = $resource;
 
-     }
 
-     /**
 
-      * @inheritdoc
 
-      */
 
-     public function process(
 
-         QueueInterface $queue,
 
-         ConsumerConfigurationInterface $configuration,
 
-         array $messages,
 
-         array $messagesToAcknowledge,
 
-         array $mergedMessages
 
-     ) {
 
-         try {
 
-             $this->resource->getConnection()->beginTransaction();
 
-             $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge);
 
-             $this->dispatchMessages($configuration, $mergedMessages);
 
-             $this->resource->getConnection()->commit();
 
-             $this->messageStatusProcessor->acknowledgeMessages($queue, $messages);
 
-         } catch (ConnectionLostException $e) {
 
-             $this->resource->getConnection()->rollBack();
 
-         } catch (\Exception $e) {
 
-             $retry = false;
 
-             $this->resource->getConnection()->rollBack();
 
-             if (strpos($e->getMessage(), 'Error while sending QUERY packet') !== false
 
-                 && $this->retryCount < self::MAX_TRANSACTION_RETRIES
 
-             ) {
 
-                 $retry = true;
 
-                 $this->retryCount++;
 
-                 $this->resource->closeConnection();
 
-                 $this->process($queue, $configuration, $messages, $messagesToAcknowledge, $mergedMessages);
 
-             }
 
-             if (!$retry) {
 
-                 $this->messageStatusProcessor->rejectMessages($queue, $messages);
 
-             }
 
-         }
 
-     }
 
-     /**
 
-      * Processes decoded messages, invokes callbacks, changes statuses for messages.
 
-      *
 
-      * @param ConsumerConfigurationInterface $configuration
 
-      * @param array $messageList
 
-      */
 
-     private function dispatchMessages(ConsumerConfigurationInterface $configuration, array $messageList)
 
-     {
 
-         foreach ($messageList as $topicName => $messages) {
 
-             foreach ($messages as $message) {
 
-                 $callbacks = $configuration->getHandlers($topicName);
 
-                 foreach ($callbacks as $callback) {
 
-                     call_user_func($callback, $message);
 
-                 }
 
-             }
 
-         }
 
-     }
 
- }
 
 
  |