MergedMessageProcessor.php 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. /**
  8. * Processing messages implementing MergedMessageInterface.
  9. */
  10. class MergedMessageProcessor implements MessageProcessorInterface
  11. {
  12. /**
  13. * @var \Magento\Framework\MessageQueue\MessageStatusProcessor
  14. */
  15. private $messageStatusProcessor;
  16. /**
  17. * @param MessageStatusProcessor $messageStatusProcessor
  18. */
  19. public function __construct(MessageStatusProcessor $messageStatusProcessor)
  20. {
  21. $this->messageStatusProcessor = $messageStatusProcessor;
  22. }
  23. /**
  24. * @inheritdoc
  25. */
  26. public function process(
  27. QueueInterface $queue,
  28. ConsumerConfigurationInterface $configuration,
  29. array $messages,
  30. array $messagesToAcknowledge,
  31. array $mergedMessages
  32. ) {
  33. $this->messageStatusProcessor->acknowledgeMessages($queue, $messagesToAcknowledge);
  34. $this->dispatchMessages($queue, $configuration, $mergedMessages, $messages);
  35. }
  36. /**
  37. * Processing decoded messages, invoking callbacks, changing statuses for messages.
  38. *
  39. * @param QueueInterface $queue
  40. * @param ConsumerConfigurationInterface $configuration
  41. * @param array $messageList
  42. * @param array $originalMessages
  43. */
  44. private function dispatchMessages(
  45. QueueInterface $queue,
  46. ConsumerConfigurationInterface $configuration,
  47. array $messageList,
  48. array $originalMessages
  49. ) {
  50. $originalMessagesIds = [];
  51. try {
  52. foreach ($messageList as $topicName => $messages) {
  53. foreach ($messages as $message) {
  54. /**
  55. * @var \Magento\Framework\MessageQueue\MergedMessageInterface $message
  56. */
  57. $callbacks = $configuration->getHandlers($topicName);
  58. $originalMessagesIds = $message->getOriginalMessagesIds();
  59. foreach ($callbacks as $callback) {
  60. call_user_func($callback, $message->getMergedMessage());
  61. }
  62. $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
  63. $this->messageStatusProcessor->acknowledgeMessages($queue, $originalMessages);
  64. }
  65. }
  66. } catch (\Exception $e) {
  67. $originalMessages = $this->getOriginalMessages($originalMessages, $originalMessagesIds);
  68. $this->messageStatusProcessor->rejectMessages($queue, $originalMessages);
  69. }
  70. }
  71. /**
  72. * Get original messages by messages ids.
  73. *
  74. * @param array $messages
  75. * @param array $messagesIds
  76. * @return array
  77. */
  78. private function getOriginalMessages(array $messages, array $messagesIds)
  79. {
  80. $originalMessages = [];
  81. foreach ($messagesIds as $messageId) {
  82. if (isset($messages[$messageId])) {
  83. $originalMessages[] = $messages[$messageId];
  84. }
  85. }
  86. return $originalMessages;
  87. }
  88. }