| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 | 
							- <?php
 
- /**
 
-  * Copyright © Magento, Inc. All rights reserved.
 
-  * See COPYING.txt for license details.
 
-  */
 
- namespace Magento\Framework\MessageQueue;
 
- use Magento\Framework\App\ResourceConnection;
 
- use Magento\Framework\Exception\LocalizedException;
 
- use Magento\Framework\Phrase;
 
- use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
 
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
 
- use Magento\Framework\MessageQueue\QueueRepository;
 
- use Psr\Log\LoggerInterface;
 
/**
 * Class Consumer used to process a single message, unlike batch consumer.
 *
 * This could be used for both synchronous and asynchronous processing, depending on topic.
 *
 * Only the invoker, encoder, resource connection, configuration and logger are injected through
 * the constructor; the remaining collaborators are resolved lazily through the ObjectManager by
 * the private getters at the bottom of the class.
 *
 * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
 */
class Consumer implements ConsumerInterface
{
    /**
     * @var ConsumerConfigurationInterface
     */
    private $configuration;

    /**
     * @var ResourceConnection
     */
    private $resource;

    /**
     * @var MessageEncoder
     */
    private $messageEncoder;

    /**
     * @var CallbackInvoker
     */
    private $invoker;

    /**
     * @var MessageController
     */
    private $messageController;

    /**
     * @var QueueRepository
     */
    private $queueRepository;

    /**
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * @var MessageValidator
     */
    private $messageValidator;

    /**
     * @var ConsumerConfig
     */
    private $consumerConfig;

    /**
     * @var CommunicationConfig
     */
    private $communicationConfig;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Initialize dependencies.
     *
     * @param CallbackInvoker $invoker
     * @param MessageEncoder $messageEncoder
     * @param ResourceConnection $resource
     * @param ConsumerConfigurationInterface $configuration
     * @param LoggerInterface $logger Optional; resolved through the ObjectManager when omitted.
     *
     * @SuppressWarnings(PHPMD.UnusedFormalParameter)
     */
    public function __construct(
        CallbackInvoker $invoker,
        MessageEncoder $messageEncoder,
        ResourceConnection $resource,
        ConsumerConfigurationInterface $configuration,
        LoggerInterface $logger = null
    ) {
        $this->invoker = $invoker;
        $this->messageEncoder = $messageEncoder;
        $this->resource = $resource;
        $this->configuration = $configuration;
        $this->logger = $logger ?: \Magento\Framework\App\ObjectManager::getInstance()->get(LoggerInterface::class);
    }

    /**
     * {@inheritdoc}
     *
     * Without a message limit the transaction callback is subscribed to the configured queue;
     * with a limit, the callback invoker is asked to run it for at most that many messages.
     */
    public function process($maxNumberOfMessages = null)
    {
        $queue = $this->configuration->getQueue();
        $callback = $this->getTransactionCallback($queue);

        if ($maxNumberOfMessages === null) {
            $queue->subscribe($callback);
            return;
        }

        $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
    }

    /**
     * Decode message and invoke callback method, return reply back for sync processing.
     *
     * Nothing is invoked (and null is returned) when the decoder yields null for the body.
     *
     * @param EnvelopeInterface $message
     * @param bool $isSync
     * @return string|null Encoded reply when one is produced, null otherwise.
     * @throws LocalizedException
     */
    private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
    {
        $properties = $message->getProperties();
        $topicName = $properties['topic_name'];
        $handlers = $this->configuration->getHandlers($topicName);
        $decodedMessage = $this->messageEncoder->decode($topicName, $message->getBody());

        if (!isset($decodedMessage)) {
            return null;
        }

        $messageSchemaType = $this->configuration->getMessageSchemaType($topicName);
        if ($messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) {
            // Method-schema topics: the decoded message is spread as the handler's argument list.
            // NOTE(review): this returns after the first handler regardless of $isSync; presumably
            // method-schema topics are always synchronous RPC calls - confirm before changing.
            foreach ($handlers as $callback) {
                $result = call_user_func_array($callback, $decodedMessage);
                return $this->processSyncResponse($topicName, $result);
            }
            return null;
        }

        // Any other schema type: the decoded message is passed as a single argument. In sync mode
        // the first handler's result is the reply; otherwise every handler is invoked in turn.
        foreach ($handlers as $callback) {
            $result = call_user_func($callback, $decodedMessage);
            if ($isSync) {
                return $this->processSyncResponse($topicName, $result);
            }
        }

        return null;
    }

    /**
     * Validate and encode synchronous handler output.
     *
     * @param string $topicName
     * @param mixed $result
     * @return string
     * @throws LocalizedException When the handler produced no result (null).
     */
    private function processSyncResponse($topicName, $result)
    {
        if (!isset($result)) {
            throw new LocalizedException(new Phrase('No reply message resulted in RPC.'));
        }

        $this->getMessageValidator()->validate($topicName, $result, false);

        return $this->messageEncoder->encode($topicName, $result, false);
    }

    /**
     * Send RPC response message.
     *
     * The reply is pushed to the queue named by the envelope's 'reply_to' property, on the
     * connection configured for this consumer.
     *
     * @param EnvelopeInterface $envelope
     * @return void
     */
    private function sendResponse(EnvelopeInterface $envelope)
    {
        $messageProperties = $envelope->getProperties();
        $connectionName = $this->getConsumerConfig()
            ->getConsumer($this->configuration->getConsumerName())
            ->getConnection();
        $queue = $this->getQueueRepository()->get($connectionName, $messageProperties['reply_to']);
        $queue->push($envelope);
    }

    /**
     * Get transaction callback. This handles the case of both sync and async.
     *
     * The returned closure locks the message, dispatches it, and then acknowledges or rejects it
     * on the given queue depending on the outcome.
     *
     * @param QueueInterface $queue
     * @return \Closure
     */
    private function getTransactionCallback(QueueInterface $queue)
    {
        return function (EnvelopeInterface $message) use ($queue) {
            /** @var LockInterface $lock */
            $lock = null;
            try {
                $topicName = $message->getProperties()['topic_name'];
                $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
                $lock = $this->getMessageController()->lock($message, $this->configuration->getConsumerName());

                if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
                    // Synchronous topic: wrap the handler's reply in an envelope carrying the
                    // request's properties and push it back to the reply queue.
                    $responseBody = $this->dispatchMessage($message, true);
                    $responseMessage = $this->getEnvelopeFactory()->create(
                        ['body' => $responseBody, 'properties' => $message->getProperties()]
                    );
                    $this->sendResponse($responseMessage);
                } else {
                    // Comparison is intentionally left loose: the allowed-topic list comes from
                    // configuration and its element types are not visible here.
                    $allowedTopics = $this->configuration->getTopicNames();
                    if (!in_array($topicName, $allowedTopics)) {
                        $queue->reject($message);
                        return;
                    }
                    $this->dispatchMessage($message);
                }

                $queue->acknowledge($message);
            } catch (MessageLockException $exception) {
                // Locking failed; the message is acknowledged without being dispatched.
                $queue->acknowledge($message);
            } catch (\Magento\Framework\MessageQueue\ConnectionLostException $e) {
                // Nothing is acknowledged or rejected here; only the lock record is removed.
                $this->releaseLock($lock);
            } catch (\Magento\Framework\Exception\NotFoundException $e) {
                $queue->acknowledge($message);
                $this->logger->warning($e->getMessage());
            } catch (\Exception $e) {
                $queue->reject($message, false, $e->getMessage());
                $this->releaseLock($lock);
            }
        };
    }

    /**
     * Delete the queue_lock record of the given lock, if a lock was acquired.
     *
     * @param LockInterface|null $lock
     * @return void
     */
    private function releaseLock($lock)
    {
        if (!$lock) {
            return;
        }

        $this->resource->getConnection()
            ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
    }

    /**
     * Get consumer config.
     *
     * @return ConsumerConfig
     *
     * @deprecated 102.0.1
     */
    private function getConsumerConfig()
    {
        if ($this->consumerConfig === null) {
            $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
        }
        return $this->consumerConfig;
    }

    /**
     * Get communication config.
     *
     * @return CommunicationConfig
     *
     * @deprecated 102.0.1
     */
    private function getCommunicationConfig()
    {
        if ($this->communicationConfig === null) {
            $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(CommunicationConfig::class);
        }
        return $this->communicationConfig;
    }

    /**
     * Get queue repository.
     *
     * @return QueueRepository
     *
     * @deprecated 102.0.1
     */
    private function getQueueRepository()
    {
        if ($this->queueRepository === null) {
            $this->queueRepository = \Magento\Framework\App\ObjectManager::getInstance()->get(QueueRepository::class);
        }
        return $this->queueRepository;
    }

    /**
     * Get message controller.
     *
     * @return MessageController
     *
     * @deprecated 102.0.1
     */
    private function getMessageController()
    {
        if ($this->messageController === null) {
            $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageController::class);
        }
        return $this->messageController;
    }

    /**
     * Get message validator.
     *
     * @return MessageValidator
     *
     * @deprecated 102.0.1
     */
    private function getMessageValidator()
    {
        if ($this->messageValidator === null) {
            $this->messageValidator = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageValidator::class);
        }
        return $this->messageValidator;
    }

    /**
     * Get envelope factory.
     *
     * @return EnvelopeFactory
     *
     * @deprecated 102.0.1
     */
    private function getEnvelopeFactory()
    {
        if ($this->envelopeFactory === null) {
            $this->envelopeFactory = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(EnvelopeFactory::class);
        }
        return $this->envelopeFactory;
    }
}
 
 
  |