Publisher.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue\Bulk\Rpc;
  7. use Magento\Framework\MessageQueue\PublisherInterface;
  8. use Magento\Framework\MessageQueue\EnvelopeFactory;
  9. use Magento\Framework\MessageQueue\Bulk\ExchangeRepository;
  10. use PhpAmqpLib\Message\AMQPMessage;
  11. use Magento\Framework\MessageQueue\MessageEncoder;
  12. use Magento\Framework\MessageQueue\MessageValidator;
  13. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  14. use Magento\Framework\MessageQueue\Rpc\ResponseQueueNameBuilder;
  15. /**
  16. * A MessageQueue Publisher to handle publishing a message.
  17. */
  18. class Publisher implements PublisherInterface
  19. {
  20. /**
  21. * @var ExchangeRepository
  22. */
  23. private $exchangeRepository;
  24. /**
  25. * @var EnvelopeFactory
  26. */
  27. private $envelopeFactory;
  28. /**
  29. * @var MessageEncoder
  30. */
  31. private $messageEncoder;
  32. /**
  33. * @var MessageValidator
  34. */
  35. private $messageValidator;
  36. /**
  37. * @var ResponseQueueNameBuilder
  38. */
  39. private $responseQueueNameBuilder;
  40. /**
  41. * @var PublisherConfig
  42. */
  43. private $publisherConfig;
  44. /**
  45. * @var \Magento\Framework\MessageQueue\MessageIdGeneratorInterface
  46. */
  47. private $messageIdGenerator;
  48. /**
  49. * @param ExchangeRepository $exchangeRepository
  50. * @param EnvelopeFactory $envelopeFactory
  51. * @param MessageEncoder $messageEncoder
  52. * @param MessageValidator $messageValidator
  53. * @param ResponseQueueNameBuilder $responseQueueNameBuilder
  54. * @param PublisherConfig $publisherConfig
  55. * @param \Magento\Framework\MessageQueue\MessageIdGeneratorInterface $messageIdGenerator
  56. */
  57. public function __construct(
  58. ExchangeRepository $exchangeRepository,
  59. EnvelopeFactory $envelopeFactory,
  60. MessageEncoder $messageEncoder,
  61. MessageValidator $messageValidator,
  62. ResponseQueueNameBuilder $responseQueueNameBuilder,
  63. PublisherConfig $publisherConfig,
  64. \Magento\Framework\MessageQueue\MessageIdGeneratorInterface $messageIdGenerator
  65. ) {
  66. $this->exchangeRepository = $exchangeRepository;
  67. $this->envelopeFactory = $envelopeFactory;
  68. $this->messageEncoder = $messageEncoder;
  69. $this->messageValidator = $messageValidator;
  70. $this->responseQueueNameBuilder = $responseQueueNameBuilder;
  71. $this->publisherConfig = $publisherConfig;
  72. $this->messageIdGenerator = $messageIdGenerator;
  73. }
  74. /**
  75. * @inheritdoc
  76. */
  77. public function publish($topicName, $data)
  78. {
  79. $envelopes = [];
  80. $replyTo = $this->responseQueueNameBuilder->getQueueName($topicName);
  81. foreach ($data as $message) {
  82. $this->messageValidator->validate($topicName, $message);
  83. $message = $this->messageEncoder->encode($topicName, $message);
  84. $envelope = $this->envelopeFactory->create(
  85. [
  86. 'body' => $message,
  87. 'properties' => [
  88. 'reply_to' => $replyTo,
  89. 'delivery_mode' => 2,
  90. 'correlation_id' => rand(),
  91. 'message_id' => $this->messageIdGenerator->generate($topicName),
  92. ]
  93. ]
  94. );
  95. $envelopes[] = $envelope;
  96. }
  97. $publisher = $this->publisherConfig->getPublisher($topicName);
  98. $connectionName = $publisher->getConnection()->getName();
  99. $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
  100. return $exchange->enqueue($topicName, $envelopes);
  101. }
  102. }