| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 | 
							- <?php
 
- /**
 
-  * Copyright © Magento, Inc. All rights reserved.
 
-  * See COPYING.txt for license details.
 
-  */
 
- declare(strict_types=1);
 
- namespace Magento\AsynchronousOperations\Model;
 
- use Magento\Framework\App\ResourceConnection;
 
- use Psr\Log\LoggerInterface;
 
- use Magento\Framework\MessageQueue\MessageLockException;
 
- use Magento\Framework\MessageQueue\ConnectionLostException;
 
- use Magento\Framework\Exception\NotFoundException;
 
- use Magento\Framework\MessageQueue\CallbackInvoker;
 
- use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
 
- use Magento\Framework\MessageQueue\EnvelopeInterface;
 
- use Magento\Framework\MessageQueue\QueueInterface;
 
- use Magento\Framework\MessageQueue\LockInterface;
 
- use Magento\Framework\MessageQueue\MessageController;
 
- use Magento\Framework\MessageQueue\ConsumerInterface;
 
/**
 * Queue consumer that passes the body of each received message to an OperationProcessor.
 *
 * Each message is locked before it is processed. Depending on the outcome the message is
 * acknowledged or rejected on the queue it was read from, and on failure the lock row is
 * removed again.
 *
 * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
 */
class MassConsumer implements ConsumerInterface
{
    /**
     * Invokes the message callback for a bounded number of messages.
     *
     * @var \Magento\Framework\MessageQueue\CallbackInvoker
     */
    private $invoker;

    /**
     * Provides the DB connection and table names used to delete lock rows.
     *
     * @var \Magento\Framework\App\ResourceConnection
     */
    private $resource;

    /**
     * Supplies the queue, the consumer name and the list of accepted topic names.
     *
     * @var \Magento\Framework\MessageQueue\ConsumerConfigurationInterface
     */
    private $configuration;

    /**
     * Creates the lock for a message before it is processed.
     *
     * @var \Magento\Framework\MessageQueue\MessageController
     */
    private $messageController;

    /**
     * Receives a warning when processing ends with a NotFoundException.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Processes the body of every accepted message.
     *
     * @var OperationProcessor
     */
    private $operationProcessor;

    /**
     * Initialize dependencies.
     *
     * The operation processor is created once, here, from the factory and receives the
     * same consumer configuration as this consumer.
     *
     * @param CallbackInvoker $invoker
     * @param ResourceConnection $resource
     * @param MessageController $messageController
     * @param ConsumerConfigurationInterface $configuration
     * @param OperationProcessorFactory $operationProcessorFactory
     * @param LoggerInterface $logger
     */
    public function __construct(
        CallbackInvoker $invoker,
        ResourceConnection $resource,
        MessageController $messageController,
        ConsumerConfigurationInterface $configuration,
        OperationProcessorFactory $operationProcessorFactory,
        LoggerInterface $logger
    ) {
        $this->invoker = $invoker;
        $this->resource = $resource;
        $this->messageController = $messageController;
        $this->configuration = $configuration;
        $this->operationProcessor = $operationProcessorFactory->create([
            'configuration' => $configuration
        ]);
        $this->logger = $logger;
    }

    /**
     * {@inheritdoc}
     *
     * Without a message limit the callback is subscribed to the queue; with a limit the
     * callback invoker is asked to handle at most that many messages.
     */
    public function process($maxNumberOfMessages = null)
    {
        $queue = $this->configuration->getQueue();
        $callback = $this->getTransactionCallback($queue);

        if ($maxNumberOfMessages === null) {
            $queue->subscribe($callback);
            return;
        }

        $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
    }

    /**
     * Get transaction callback. This handles the case of async.
     *
     * The returned closure locks the message, processes its body when the message topic is
     * one of the configured topic names, and acknowledges or rejects it on the given queue.
     *
     * @param QueueInterface $queue Queue on which messages are acknowledged or rejected.
     * @return \Closure Callback accepting a single EnvelopeInterface argument.
     */
    private function getTransactionCallback(QueueInterface $queue): \Closure
    {
        return function (EnvelopeInterface $message) use ($queue) {
            /** @var LockInterface|null $lock */
            $lock = null;
            try {
                $topicName = $message->getProperties()['topic_name'];
                $lock = $this->messageController->lock($message, $this->configuration->getConsumerName());

                // Strict comparison: topic names are strings and must match exactly,
                // without PHP's loose numeric-string / null coercion.
                if (!in_array($topicName, $this->configuration->getTopicNames(), true)) {
                    // NOTE(review): the lock taken above is not removed on this path - confirm intended.
                    $queue->reject($message);
                    return;
                }

                $this->operationProcessor->process($message->getBody());
                $queue->acknowledge($message);
            } catch (MessageLockException $exception) {
                // The lock could not be obtained: the message is acknowledged without processing.
                $queue->acknowledge($message);
            } catch (ConnectionLostException $e) {
                // The message is neither acknowledged nor rejected here; only the lock is removed.
                $this->releaseLock($lock);
            } catch (NotFoundException $e) {
                $queue->acknowledge($message);
                $this->logger->warning($e->getMessage());
            } catch (\Exception $e) {
                try {
                    // Reject without requeueing and pass the error text along.
                    $queue->reject($message, false, $e->getMessage());
                } finally {
                    // Remove the lock even when rejecting the message itself throws.
                    $this->releaseLock($lock);
                }
            }
        };
    }

    /**
     * Delete the 'queue_lock' row that belongs to the given lock, if a lock was obtained.
     *
     * @param LockInterface|null $lock Lock created for the current message, or null when none was created.
     * @return void
     */
    private function releaseLock($lock)
    {
        if ($lock === null) {
            return;
        }

        $this->resource->getConnection()->delete(
            $this->resource->getTableName('queue_lock'),
            ['id = ?' => $lock->getId()]
        );
    }
}
 
 
  |