123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\MessageQueue;
- use Magento\Framework\App\ResourceConnection;
- use Magento\Framework\Exception\LocalizedException;
- use Magento\Framework\Phrase;
- use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
- use Magento\Framework\MessageQueue\QueueRepository;
- use Psr\Log\LoggerInterface;
/**
 * Class Consumer used to process a single message, unlike batch consumer.
 *
 * This could be used for both synchronous and asynchronous processing, depending on topic.
 *
 * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
 */
class Consumer implements ConsumerInterface
{
    /**
     * @var ConsumerConfigurationInterface
     */
    private $configuration;

    /**
     * @var ResourceConnection
     */
    private $resource;

    /**
     * @var MessageEncoder
     */
    private $messageEncoder;

    /**
     * @var CallbackInvoker
     */
    private $invoker;

    /**
     * Resolved lazily, see getMessageController().
     *
     * @var MessageController
     */
    private $messageController;

    /**
     * Resolved lazily, see getQueueRepository().
     *
     * @var QueueRepository
     */
    private $queueRepository;

    /**
     * Resolved lazily, see getEnvelopeFactory().
     *
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * Resolved lazily, see getMessageValidator().
     *
     * @var MessageValidator
     */
    private $messageValidator;

    /**
     * Resolved lazily, see getConsumerConfig().
     *
     * @var ConsumerConfig
     */
    private $consumerConfig;

    /**
     * Resolved lazily, see getCommunicationConfig().
     *
     * @var CommunicationConfig
     */
    private $communicationConfig;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Initialize dependencies.
     *
     * When no logger is passed, one is fetched from the object manager.
     *
     * @param CallbackInvoker $invoker
     * @param MessageEncoder $messageEncoder
     * @param ResourceConnection $resource
     * @param ConsumerConfigurationInterface $configuration
     * @param LoggerInterface $logger
     *
     * @SuppressWarnings(PHPMD.UnusedFormalParameter)
     */
    public function __construct(
        CallbackInvoker $invoker,
        MessageEncoder $messageEncoder,
        ResourceConnection $resource,
        ConsumerConfigurationInterface $configuration,
        LoggerInterface $logger = null
    ) {
        $this->invoker = $invoker;
        $this->messageEncoder = $messageEncoder;
        $this->resource = $resource;
        $this->configuration = $configuration;
        $this->logger = $logger ?: \Magento\Framework\App\ObjectManager::getInstance()->get(LoggerInterface::class);
    }

    /**
     * {@inheritdoc}
     */
    public function process($maxNumberOfMessages = null)
    {
        $queue = $this->configuration->getQueue();
        $callback = $this->getTransactionCallback($queue);

        if (!isset($maxNumberOfMessages)) {
            // No limit given: hand the callback to the queue's own subscription mechanism.
            $queue->subscribe($callback);
        } else {
            // A limit was given: let the invoker drive the callback for that many messages.
            $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
        }
    }

    /**
     * Decode message and invoke callback method, return reply back for sync processing.
     *
     * Returns null when the decoded message is not set, when no handler produced a reply
     * (no handlers configured), or when a non-method topic is processed asynchronously.
     *
     * @param EnvelopeInterface $message
     * @param boolean $isSync
     * @return string|null
     * @throws LocalizedException
     */
    private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
    {
        $properties = $message->getProperties();
        $topicName = $properties['topic_name'];
        $handlers = $this->configuration->getHandlers($topicName);
        $decodedMessage = $this->messageEncoder->decode($topicName, $message->getBody());

        if (!isset($decodedMessage)) {
            return null;
        }

        $messageSchemaType = $this->configuration->getMessageSchemaType($topicName);
        if ($messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) {
            foreach ($handlers as $callback) {
                // array_values() drops string keys so the arguments are always passed positionally;
                // on PHP 8 string keys would otherwise be interpreted as named arguments.
                $result = call_user_func_array($callback, array_values($decodedMessage));

                // NOTE(review): returns after the first handler regardless of $isSync, so any further
                // handlers are never invoked for method-type topics. Kept as-is to preserve behavior.
                return $this->processSyncResponse($topicName, $result);
            }
        } else {
            foreach ($handlers as $callback) {
                $result = call_user_func($callback, $decodedMessage);
                if ($isSync) {
                    // Synchronous processing replies with the first handler's result.
                    return $this->processSyncResponse($topicName, $result);
                }
            }
        }

        return null;
    }

    /**
     * Validate and encode synchronous handler output.
     *
     * @param string $topicName
     * @param mixed $result
     * @return string
     * @throws LocalizedException When the handler result is not set.
     */
    private function processSyncResponse($topicName, $result)
    {
        if (!isset($result)) {
            throw new LocalizedException(new Phrase('No reply message resulted in RPC.'));
        }

        $this->getMessageValidator()->validate($topicName, $result, false);

        return $this->messageEncoder->encode($topicName, $result, false);
    }

    /**
     * Send RPC response message.
     *
     * The envelope is pushed to the queue named by its 'reply_to' property, looked up on the
     * connection configured for this consumer.
     *
     * @param EnvelopeInterface $envelope
     * @return void
     */
    private function sendResponse(EnvelopeInterface $envelope)
    {
        $messageProperties = $envelope->getProperties();
        $connectionName = $this->getConsumerConfig()
            ->getConsumer($this->configuration->getConsumerName())
            ->getConnection();
        $queue = $this->getQueueRepository()->get($connectionName, $messageProperties['reply_to']);
        $queue->push($envelope);
    }

    /**
     * Get transaction callback. This handles the case of both sync and async.
     *
     * The returned closure locks the message, dispatches it, and then acknowledges or rejects it
     * on the given queue depending on the outcome.
     *
     * @param QueueInterface $queue
     * @return \Closure
     */
    private function getTransactionCallback(QueueInterface $queue)
    {
        return function (EnvelopeInterface $message) use ($queue) {
            /** @var LockInterface $lock */
            $lock = null;
            try {
                $topicName = $message->getProperties()['topic_name'];
                $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
                $lock = $this->getMessageController()->lock($message, $this->configuration->getConsumerName());

                if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
                    // Synchronous topic: dispatch, then send the encoded reply back.
                    $responseBody = $this->dispatchMessage($message, true);
                    $responseMessage = $this->getEnvelopeFactory()->create(
                        ['body' => $responseBody, 'properties' => $message->getProperties()]
                    );
                    $this->sendResponse($responseMessage);
                } else {
                    $allowedTopics = $this->configuration->getTopicNames();
                    // Strict comparison: topic names must match exactly, without type juggling.
                    if (!in_array($topicName, $allowedTopics, true)) {
                        $queue->reject($message);
                        return;
                    }
                    $this->dispatchMessage($message);
                }

                $queue->acknowledge($message);
            } catch (MessageLockException $exception) {
                // The lock could not be obtained; the message is acknowledged without dispatching.
                $queue->acknowledge($message);
            } catch (\Magento\Framework\MessageQueue\ConnectionLostException $e) {
                // The message is neither acknowledged nor rejected here; only the lock is removed.
                $this->releaseLock($lock);
            } catch (\Magento\Framework\Exception\NotFoundException $e) {
                $queue->acknowledge($message);
                $this->logger->warning($e->getMessage());
            } catch (\Exception $e) {
                $queue->reject($message, false, $e->getMessage());
                $this->releaseLock($lock);
            }
        };
    }

    /**
     * Delete the row of the given lock from the 'queue_lock' table; does nothing when no lock was taken.
     *
     * @param LockInterface|null $lock
     * @return void
     */
    private function releaseLock($lock)
    {
        if ($lock) {
            $this->resource->getConnection()
                ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
        }
    }

    /**
     * Get consumer config.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return ConsumerConfig
     *
     * @deprecated 102.0.1
     */
    private function getConsumerConfig()
    {
        if ($this->consumerConfig === null) {
            $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
        }
        return $this->consumerConfig;
    }

    /**
     * Get communication config.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return CommunicationConfig
     *
     * @deprecated 102.0.1
     */
    private function getCommunicationConfig()
    {
        if ($this->communicationConfig === null) {
            $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(CommunicationConfig::class);
        }
        return $this->communicationConfig;
    }

    /**
     * Get queue repository.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return QueueRepository
     *
     * @deprecated 102.0.1
     */
    private function getQueueRepository()
    {
        if ($this->queueRepository === null) {
            $this->queueRepository = \Magento\Framework\App\ObjectManager::getInstance()->get(QueueRepository::class);
        }
        return $this->queueRepository;
    }

    /**
     * Get message controller.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return MessageController
     *
     * @deprecated 102.0.1
     */
    private function getMessageController()
    {
        if ($this->messageController === null) {
            $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageController::class);
        }
        return $this->messageController;
    }

    /**
     * Get message validator.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return MessageValidator
     *
     * @deprecated 102.0.1
     */
    private function getMessageValidator()
    {
        if ($this->messageValidator === null) {
            $this->messageValidator = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageValidator::class);
        }
        return $this->messageValidator;
    }

    /**
     * Get envelope factory.
     *
     * Fetched from the object manager on first use and cached on the instance.
     *
     * @return EnvelopeFactory
     *
     * @deprecated 102.0.1
     */
    private function getEnvelopeFactory()
    {
        if ($this->envelopeFactory === null) {
            $this->envelopeFactory = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(EnvelopeFactory::class);
        }
        return $this->envelopeFactory;
    }
}
|