Publisher.php 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue\Bulk;
  7. use Magento\Framework\MessageQueue\EnvelopeFactory;
  8. use Magento\Framework\MessageQueue\PublisherInterface;
  9. use Magento\Framework\MessageQueue\MessageEncoder;
  10. use Magento\Framework\MessageQueue\MessageValidator;
  11. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  12. /**
  13. * A MessageQueue Publisher to handle publishing messages in bulk.
  14. */
  15. class Publisher implements PublisherInterface
  16. {
  17. /**
  18. * @var ExchangeRepository
  19. */
  20. private $exchangeRepository;
  21. /**
  22. * @var EnvelopeFactory
  23. */
  24. private $envelopeFactory;
  25. /**
  26. * @var MessageEncoder
  27. */
  28. private $messageEncoder;
  29. /**
  30. * @var MessageValidator
  31. */
  32. private $messageValidator;
  33. /**
  34. * @var PublisherConfig
  35. */
  36. private $publisherConfig;
  37. /**
  38. * @var \Magento\Framework\MessageQueue\MessageIdGeneratorInterface
  39. */
  40. private $messageIdGenerator;
  41. /**
  42. * @param ExchangeRepository $exchangeRepository
  43. * @param EnvelopeFactory $envelopeFactory
  44. * @param MessageEncoder $messageEncoder
  45. * @param MessageValidator $messageValidator
  46. * @param PublisherConfig $publisherConfig
  47. * @param \Magento\Framework\MessageQueue\MessageIdGeneratorInterface $messageIdGenerator
  48. */
  49. public function __construct(
  50. ExchangeRepository $exchangeRepository,
  51. EnvelopeFactory $envelopeFactory,
  52. MessageEncoder $messageEncoder,
  53. MessageValidator $messageValidator,
  54. PublisherConfig $publisherConfig,
  55. \Magento\Framework\MessageQueue\MessageIdGeneratorInterface $messageIdGenerator
  56. ) {
  57. $this->exchangeRepository = $exchangeRepository;
  58. $this->envelopeFactory = $envelopeFactory;
  59. $this->messageEncoder = $messageEncoder;
  60. $this->messageValidator = $messageValidator;
  61. $this->publisherConfig = $publisherConfig;
  62. $this->messageIdGenerator = $messageIdGenerator;
  63. }
  64. /**
  65. * @inheritdoc
  66. */
  67. public function publish($topicName, $data)
  68. {
  69. $envelopes = [];
  70. foreach ($data as $message) {
  71. $this->messageValidator->validate($topicName, $message);
  72. $message = $this->messageEncoder->encode($topicName, $message);
  73. $envelopes[] = $this->envelopeFactory->create(
  74. [
  75. 'body' => $message,
  76. 'properties' => [
  77. 'delivery_mode' => 2,
  78. 'message_id' => $this->messageIdGenerator->generate($topicName),
  79. ]
  80. ]
  81. );
  82. }
  83. $publisher = $this->publisherConfig->getPublisher($topicName);
  84. $connectionName = $publisher->getConnection()->getName();
  85. $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
  86. $exchange->enqueue($topicName, $envelopes);
  87. return null;
  88. }
  89. }