Exchange.php 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MysqlMq\Model\Driver\Bulk;
  7. use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
  8. use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
  9. use Magento\MysqlMq\Model\QueueManagement;
  10. /**
  11. * Used to send messages in bulk in MySQL queue.
  12. */
  13. class Exchange implements ExchangeInterface
  14. {
  15. /**
  16. * @var MessageQueueConfig
  17. */
  18. private $messageQueueConfig;
  19. /**
  20. * @var QueueManagement
  21. */
  22. private $queueManagement;
  23. /**
  24. * Initialize dependencies.
  25. *
  26. * @param MessageQueueConfig $messageQueueConfig
  27. * @param QueueManagement $queueManagement
  28. */
  29. public function __construct(MessageQueueConfig $messageQueueConfig, QueueManagement $queueManagement)
  30. {
  31. $this->messageQueueConfig = $messageQueueConfig;
  32. $this->queueManagement = $queueManagement;
  33. }
  34. /**
  35. * @inheritdoc
  36. */
  37. public function enqueue($topic, array $envelopes)
  38. {
  39. $queueNames = $this->messageQueueConfig->getQueuesByTopic($topic);
  40. $messages = array_map(
  41. function ($envelope) {
  42. return $envelope->getBody();
  43. },
  44. $envelopes
  45. );
  46. $this->queueManagement->addMessagesToQueues($topic, $messages, $queueNames);
  47. return null;
  48. }
  49. }