Exchange.php 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\Amqp\Bulk;
  7. use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
  8. use PhpAmqpLib\Message\AMQPMessage;
  9. use Magento\Framework\Communication\ConfigInterface as CommunicationConfigInterface;
  10. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  11. /**
  12. * Used to send messages in bulk in AMQP queue.
  13. */
  14. class Exchange implements ExchangeInterface
  15. {
  16. /**
  17. * @var \Magento\Framework\Amqp\Config
  18. */
  19. private $amqpConfig;
  20. /**
  21. * @var CommunicationConfigInterface
  22. */
  23. private $communicationConfig;
  24. /**
  25. * @var PublisherConfig
  26. */
  27. private $publisherConfig;
  28. /**
  29. * @var \Magento\Framework\Amqp\Exchange
  30. */
  31. private $exchange;
  32. /**
  33. * Initialize dependencies.
  34. *
  35. * @param \Magento\Framework\Amqp\Config $amqpConfig
  36. * @param PublisherConfig $publisherConfig
  37. * @param CommunicationConfigInterface $communicationConfig
  38. * @param \Magento\Framework\Amqp\Exchange $exchange
  39. */
  40. public function __construct(
  41. \Magento\Framework\Amqp\Config $amqpConfig,
  42. PublisherConfig $publisherConfig,
  43. CommunicationConfigInterface $communicationConfig,
  44. \Magento\Framework\Amqp\Exchange $exchange
  45. ) {
  46. $this->amqpConfig = $amqpConfig;
  47. $this->communicationConfig = $communicationConfig;
  48. $this->publisherConfig = $publisherConfig;
  49. $this->exchange = $exchange;
  50. }
  51. /**
  52. * @inheritdoc
  53. */
  54. public function enqueue($topic, array $envelopes)
  55. {
  56. $topicData = $this->communicationConfig->getTopic($topic);
  57. $isSync = $topicData[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];
  58. if ($isSync) {
  59. $responses = [];
  60. foreach ($envelopes as $envelope) {
  61. $responses[] = $this->exchange->enqueue($topic, $envelope);
  62. }
  63. return $responses;
  64. }
  65. $channel = $this->amqpConfig->getChannel();
  66. $publisher = $this->publisherConfig->getPublisher($topic);
  67. $exchange = $publisher->getConnection()->getExchange();
  68. foreach ($envelopes as $envelope) {
  69. $msg = new AMQPMessage($envelope->getBody(), $envelope->getProperties());
  70. $channel->batch_basic_publish($msg, $exchange, $topic);
  71. }
  72. $channel->publish_batch();
  73. return null;
  74. }
  75. }