MassPublisher.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. declare(strict_types=1);
  7. namespace Magento\AsynchronousOperations\Model;
  8. use Magento\Framework\MessageQueue\MessageValidator;
  9. use Magento\Framework\MessageQueue\MessageEncoder;
  10. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  11. use Magento\Framework\MessageQueue\Bulk\ExchangeRepository;
  12. use Magento\Framework\MessageQueue\EnvelopeFactory;
  13. use Magento\AsynchronousOperations\Model\ConfigInterface as AsyncConfig;
  14. use Magento\Framework\MessageQueue\PublisherInterface;
  15. use Magento\Framework\MessageQueue\MessageIdGeneratorInterface;
  16. /**
  17. * Class MassPublisher used for encoding topic entities to OperationInterface and publish them.
  18. */
  19. class MassPublisher implements PublisherInterface
  20. {
  21. /**
  22. * @var \Magento\Framework\MessageQueue\Bulk\ExchangeRepository
  23. */
  24. private $exchangeRepository;
  25. /**
  26. * @var \Magento\Framework\MessageQueue\EnvelopeFactory
  27. */
  28. private $envelopeFactory;
  29. /**
  30. * @var \Magento\Framework\MessageQueue\MessageEncoder
  31. */
  32. private $messageEncoder;
  33. /**
  34. * @var \Magento\Framework\MessageQueue\MessageValidator
  35. */
  36. private $messageValidator;
  37. /**
  38. * @var \Magento\Framework\MessageQueue\Publisher\ConfigInterface
  39. */
  40. private $publisherConfig;
  41. /**
  42. * @var \Magento\Framework\MessageQueue\MessageIdGeneratorInterface
  43. */
  44. private $messageIdGenerator;
  45. /**
  46. * Initialize dependencies.
  47. *
  48. * @param \Magento\Framework\MessageQueue\Bulk\ExchangeRepository $exchangeRepository
  49. * @param \Magento\Framework\MessageQueue\EnvelopeFactory $envelopeFactory
  50. * @param \Magento\Framework\MessageQueue\MessageEncoder $messageEncoder
  51. * @param \Magento\Framework\MessageQueue\MessageValidator $messageValidator
  52. * @param \Magento\Framework\MessageQueue\Publisher\ConfigInterface $publisherConfig
  53. * @param \Magento\Framework\MessageQueue\MessageIdGeneratorInterface $messageIdGenerator
  54. */
  55. public function __construct(
  56. ExchangeRepository $exchangeRepository,
  57. EnvelopeFactory $envelopeFactory,
  58. MessageEncoder $messageEncoder,
  59. MessageValidator $messageValidator,
  60. PublisherConfig $publisherConfig,
  61. MessageIdGeneratorInterface $messageIdGenerator
  62. ) {
  63. $this->exchangeRepository = $exchangeRepository;
  64. $this->envelopeFactory = $envelopeFactory;
  65. $this->messageEncoder = $messageEncoder;
  66. $this->messageValidator = $messageValidator;
  67. $this->publisherConfig = $publisherConfig;
  68. $this->messageIdGenerator = $messageIdGenerator;
  69. }
  70. /**
  71. * {@inheritdoc}
  72. */
  73. public function publish($topicName, $data)
  74. {
  75. $envelopes = [];
  76. foreach ($data as $message) {
  77. $this->messageValidator->validate(AsyncConfig::SYSTEM_TOPIC_NAME, $message);
  78. $message = $this->messageEncoder->encode(AsyncConfig::SYSTEM_TOPIC_NAME, $message);
  79. $envelopes[] = $this->envelopeFactory->create(
  80. [
  81. 'body' => $message,
  82. 'properties' => [
  83. 'delivery_mode' => 2,
  84. 'message_id' => $this->messageIdGenerator->generate($topicName),
  85. ]
  86. ]
  87. );
  88. }
  89. $publisher = $this->publisherConfig->getPublisher($topicName);
  90. $connectionName = $publisher->getConnection()->getName();
  91. $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
  92. $exchange->enqueue($topicName, $envelopes);
  93. return null;
  94. }
  95. }