amqpConfig = $amqpConfig;
        $this->communicationConfig = $communicationConfig;
        $this->publisherConfig = $publisherConfig;
        $this->exchange = $exchange;
    }

    /**
     * Sends a group of envelopes to the given topic.
     *
     * When the topic's communication config marks it as synchronous, every
     * envelope is handed individually to the wrapped exchange and the results
     * are returned as a list, in iteration order. Otherwise each envelope is
     * turned into an AMQPMessage, queued on the channel under the exchange of
     * the topic's publisher connection, and the whole batch is flushed with a
     * single publish_batch() call; null is returned in that case.
     *
     * @inheritdoc
     */
    public function enqueue($topic, array $envelopes)
    {
        $topicConfig = $this->communicationConfig->getTopic($topic);
        $synchronous = $topicConfig[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];

        if (!$synchronous) {
            // Channel is obtained before the publisher lookup, as in the original call order.
            $amqpChannel = $this->amqpConfig->getChannel();
            $exchangeName = $this->publisherConfig
                ->getPublisher($topic)
                ->getConnection()
                ->getExchange();

            foreach ($envelopes as $item) {
                $amqpChannel->batch_basic_publish(
                    new AMQPMessage($item->getBody(), $item->getProperties()),
                    $exchangeName,
                    $topic
                );
            }

            // Flush everything queued above in one go.
            $amqpChannel->publish_batch();

            return null;
        }

        // Synchronous topic: delegate one envelope at a time and gather the replies.
        $results = [];
        foreach ($envelopes as $item) {
            $results[] = $this->exchange->enqueue($topic, $item);
        }

        return $results;
    }
}