1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\MessageQueue;
- /**
- * Processing messages implementing MergedMessageInterface.
- */
- class MergedMessageProcessor implements MessageProcessorInterface
- {
- /**
- * @var \Magento\Framework\MessageQueue\MessageStatusProcessor
- */
- private $messageStatusProcessor;
- /**
- * @param MessageStatusProcessor $messageStatusProcessor
- */
- public function __construct(MessageStatusProcessor $messageStatusProcessor)
- {
- $this->messageStatusProcessor = $messageStatusProcessor;
- }
- /**
- * @inheritdoc
- */
- public function process(
- QueueInterface $queue,
- ConsumerConfigurationInterface $configuration,
- array $messages,
- array $messagesToAcknowledge,
- array $mergedMessages
- ) {
- $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge);
- $this->dispatchMessages($queue, $configuration, $mergedMessages, $messages);
- }
- /**
- * Processing decoded messages, invoking callbacks, changing statuses for messages.
- *
- * @param QueueInterface $queue
- * @param ConsumerConfigurationInterface $configuration
- * @param array $messageList
- * @param array $originalMessages
- */
- private function dispatchMessages(
- QueueInterface $queue,
- ConsumerConfigurationInterface $configuration,
- array $messageList,
- array $originalMessages
- ) {
- $originalMessagesIds = [];
- try {
- foreach ($messageList as $topicName => $messages) {
- foreach ($messages as $message) {
- /**
- * @var \Magento\Framework\MessageQueue\MergedMessageInterface $message
- */
- $callbacks = $configuration->getHandlers($topicName);
- $originalMessagesIds = $message->getOriginalMessagesIds();
- foreach ($callbacks as $callback) {
- call_user_func($callback, $message->getMergedMessage());
- }
- $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
- $this->messageStatusProcessor->acknowledgeMessages($queue, $originalMessages);
- }
- }
- } catch (\Exception $e) {
- $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
- $this->messageStatusProcessor->rejectMessages($queue, $originalMessages);
- }
- }
- /**
- * Get original messages by messages ids.
- *
- * @param array $messages
- * @param array $messagesIds
- * @return array
- */
- private function getOriginalMessages(array $messages, array $messagesIds)
- {
- $originalMessages = [];
- foreach ($messagesIds as $messageId) {
- if (isset($messages[$messageId])) {
- $originalMessages[] = $messages[$messageId];
- }
- }
- return $originalMessages;
- }
- }
|