123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\MessageQueue;
- use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
- use Magento\Framework\App\ResourceConnection;
- use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
- /**
- * Class BatchConsumer
- * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
- */
- class BatchConsumer implements ConsumerInterface
- {
- /**
- * @var ConsumerConfigurationInterface
- */
- private $configuration;
- /**
- * @var MessageEncoder
- */
- private $messageEncoder;
- /**
- * @var QueueRepository
- */
- private $queueRepository;
- /**
- * @var MergerFactory
- */
- private $mergerFactory;
- /**
- * @var int
- */
- private $interval;
- /**
- * @var int
- */
- private $batchSize;
- /**
- * @var MessageProcessorLoader
- */
- private $messageProcessorLoader;
- /**
- * @var Resource
- */
- private $resource;
- /**
- * @var MessageController
- */
- private $messageController;
- /**
- * @var ConsumerConfig
- */
- private $consumerConfig;
- /**
- * @param ConfigInterface $messageQueueConfig
- * @param MessageEncoder $messageEncoder
- * @param QueueRepository $queueRepository
- * @param MergerFactory $mergerFactory
- * @param ResourceConnection $resource
- * @param ConsumerConfigurationInterface $configuration
- * @param int $interval [optional]
- * @param int $batchSize [optional]
- * @param MessageProcessorLoader $messageProcessorLoader [optional]
- * @param MessageController $messageController [optional]
- * @param ConsumerConfig $consumerConfig [optional]
- *
- * @SuppressWarnings(PHPMD.UnusedFormalParameter)
- */
- public function __construct(
- MessageQueueConfig $messageQueueConfig,
- MessageEncoder $messageEncoder,
- QueueRepository $queueRepository,
- MergerFactory $mergerFactory,
- ResourceConnection $resource,
- ConsumerConfigurationInterface $configuration,
- $interval = 5,
- $batchSize = 0,
- MessageProcessorLoader $messageProcessorLoader = null
- ) {
- $this->messageEncoder = $messageEncoder;
- $this->queueRepository = $queueRepository;
- $this->mergerFactory = $mergerFactory;
- $this->interval = $interval;
- $this->batchSize = $batchSize;
- $this->resource = $resource;
- $this->configuration = $configuration;
- $this->messageProcessorLoader = $messageProcessorLoader
- ?: \Magento\Framework\App\ObjectManager::getInstance()->get(MessageProcessorLoader::class);
- }
- /**
- * {@inheritdoc}
- */
- public function process($maxNumberOfMessages = null)
- {
- $queueName = $this->configuration->getQueueName();
- $consumerName = $this->configuration->getConsumerName();
- $connectionName = $this->getConsumerConfig()->getConsumer($consumerName)->getConnection();
- $queue = $this->queueRepository->get($connectionName, $queueName);
- $merger = $this->mergerFactory->create($consumerName);
- if (!isset($maxNumberOfMessages)) {
- $this->runDaemonMode($queue, $merger);
- } else {
- $this->run($queue, $merger, $maxNumberOfMessages);
- }
- }
- /**
- * Run process in a daemon mode.
- *
- * @param QueueInterface $queue
- * @param MergerInterface $merger
- * @return void
- */
- private function runDaemonMode(QueueInterface $queue, MergerInterface $merger)
- {
- $transactionCallback = $this->getTransactionCallback($queue, $merger);
- while (true) {
- $messages = $this->batchSize > 0
- ? $this->getMessages($queue, $this->batchSize)
- : $this->getAllMessages($queue);
- $transactionCallback($messages);
- sleep($this->interval);
- }
- }
- /**
- * Run short running process.
- *
- * @param QueueInterface $queue
- * @param MergerInterface $merger
- * @param int $maxNumberOfMessages
- * @return void
- */
- private function run(QueueInterface $queue, MergerInterface $merger, $maxNumberOfMessages)
- {
- $count = $maxNumberOfMessages
- ? $maxNumberOfMessages
- : $this->configuration->getMaxMessages() ?: 1;
- $transactionCallback = $this->getTransactionCallback($queue, $merger);
- if ($this->batchSize) {
- while ($count > 0) {
- $messages = $this->getMessages($queue, $count > $this->batchSize ? $this->batchSize : $count);
- $transactionCallback($messages);
- $count -= $this->batchSize;
- }
- } else {
- $messages = $this->getMessages($queue, $count);
- $transactionCallback($messages);
- }
- }
- /**
- * Get all messages from a queue.
- *
- * @param QueueInterface $queue
- * @return EnvelopeInterface[]
- */
- private function getAllMessages(QueueInterface $queue)
- {
- $messages = [];
- while ($message = $queue->dequeue()) {
- $messages[] = $message;
- }
- return $messages;
- }
- /**
- * Get $count messages from a queue.
- *
- * @param QueueInterface $queue
- * @param int $count
- * @return EnvelopeInterface[]
- */
- private function getMessages(QueueInterface $queue, $count)
- {
- $messages = [];
- for ($i = $count; $i > 0; $i--) {
- $message = $queue->dequeue();
- if ($message === null) {
- break;
- }
- $messages[] = $message;
- }
- return $messages;
- }
- /**
- * Decode provided messages.
- *
- * @param EnvelopeInterface[] $messages
- * @return object[]
- */
- private function decodeMessages(array $messages)
- {
- $decodedMessages = [];
- foreach ($messages as $messageId => $message) {
- $properties = $message->getProperties();
- $topicName = $properties['topic_name'];
- $decodedMessages[$topicName][$messageId] = $this->messageEncoder->decode($topicName, $message->getBody());
- }
- return $decodedMessages;
- }
- /**
- * Get transaction callback.
- *
- * @param QueueInterface $queue
- * @param MergerInterface $merger
- * @return \Closure
- */
- private function getTransactionCallback(QueueInterface $queue, MergerInterface $merger)
- {
- return function (array $messages) use ($queue, $merger) {
- list($messages, $messagesToAcknowledge) = $this->lockMessages($messages);
- $decodedMessages = $this->decodeMessages($messages);
- $mergedMessages = $merger->merge($decodedMessages);
- $messageProcessor = $this->messageProcessorLoader->load($mergedMessages);
- $messageProcessor->process(
- $queue,
- $this->configuration,
- $messages,
- $messagesToAcknowledge,
- $mergedMessages
- );
- };
- }
- /**
- * Create lock for the messages.
- *
- * @param array $messages
- * @return array
- */
- private function lockMessages(array $messages)
- {
- $toProcess = [];
- $toAcknowledge = [];
- foreach ($messages as $message) {
- try {
- $this->getMessageController()->lock($message, $this->configuration->getConsumerName());
- $toProcess[] = $message;
- } catch (MessageLockException $exception) {
- $toAcknowledge[] = $message;
- }
- }
- return [$toProcess, $toAcknowledge];
- }
- /**
- * Get consumer config.
- *
- * This getter serves as a workaround to add this dependency to this class without breaking constructor structure
- *
- * @return ConsumerConfig
- *
- * @deprecated 102.0.1
- */
- private function getConsumerConfig()
- {
- if ($this->consumerConfig === null) {
- $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
- }
- return $this->consumerConfig;
- }
- /**
- * Get message controller.
- *
- * This getter serves as a workaround to add this dependency to this class without breaking constructor structure
- *
- * @return MessageController
- *
- * @deprecated 102.0.1
- */
- private function getMessageController()
- {
- if ($this->messageController === null) {
- $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
- ->get(\Magento\Framework\MessageQueue\MessageController::class);
- }
- return $this->messageController;
- }
- }
|