1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\Framework\Amqp\Bulk;
- use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
- use PhpAmqpLib\Message\AMQPMessage;
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfigInterface;
- use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
/**
 * Bulk exchange: pushes a whole set of envelopes for one topic to AMQP.
 */
class Exchange implements ExchangeInterface
{
    /**
     * Source of the AMQP channel used for batch publishing.
     *
     * @var \Magento\Framework\Amqp\Config
     */
    private $amqpConfig;

    /**
     * Looked up per topic to find out whether the topic is synchronous.
     *
     * @var CommunicationConfigInterface
     */
    private $communicationConfig;

    /**
     * Resolves the publisher (and through it the target exchange) of a topic.
     *
     * @var PublisherConfig
     */
    private $publisherConfig;

    /**
     * Single-message exchange that synchronous topics are delegated to.
     *
     * @var \Magento\Framework\Amqp\Exchange
     */
    private $exchange;

    /**
     * Store the collaborators this exchange works with.
     *
     * @param \Magento\Framework\Amqp\Config $amqpConfig
     * @param PublisherConfig $publisherConfig
     * @param CommunicationConfigInterface $communicationConfig
     * @param \Magento\Framework\Amqp\Exchange $exchange
     */
    public function __construct(
        \Magento\Framework\Amqp\Config $amqpConfig,
        PublisherConfig $publisherConfig,
        CommunicationConfigInterface $communicationConfig,
        \Magento\Framework\Amqp\Exchange $exchange
    ) {
        $this->exchange = $exchange;
        $this->publisherConfig = $publisherConfig;
        $this->communicationConfig = $communicationConfig;
        $this->amqpConfig = $amqpConfig;
    }

    /**
     * Send all given envelopes for a topic.
     *
     * When the topic is not flagged as synchronous, every envelope is added to
     * the channel's batch and the batch is published once at the end; nothing
     * is returned in that case. When the topic is synchronous, each envelope is
     * handed to the single-message exchange one at a time and the results are
     * collected in iteration order.
     *
     * @param mixed $topic Topic identifier; also passed as the third argument
     *                     of batch_basic_publish().
     * @param array $envelopes Objects providing getBody() and getProperties().
     * @return array|null List of per-envelope results for synchronous topics,
     *                    null otherwise.
     */
    public function enqueue($topic, array $envelopes)
    {
        $topicConfig = $this->communicationConfig->getTopic($topic);

        if (!$topicConfig[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS]) {
            $amqpChannel = $this->amqpConfig->getChannel();
            $targetExchange = $this->publisherConfig
                ->getPublisher($topic)
                ->getConnection()
                ->getExchange();

            foreach ($envelopes as $item) {
                $amqpChannel->batch_basic_publish(
                    new AMQPMessage($item->getBody(), $item->getProperties()),
                    $targetExchange,
                    $topic
                );
            }
            // Flush everything queued above in one go.
            $amqpChannel->publish_batch();

            return null;
        }

        // Synchronous topic: delegate envelope by envelope and gather results.
        $results = [];
        foreach ($envelopes as $item) {
            $results[] = $this->exchange->enqueue($topic, $item);
        }

        return $results;
    }
}
|