topologyConfig = $topologyConfig;
        $this->exchangeInstaller = $exchangeInstaller;
        $this->configPool = $configPool;
        $this->queueInstaller = $queueInstaller;
        $this->connectionTypeResolver = $connectionTypeResolver;
        $this->logger = $logger;
    }

    /**
     * Install the AMQP queues and exchanges declared in the topology configuration.
     *
     * Queues are processed first, then exchanges. An item is skipped when its
     * connection does not resolve to the 'amqp' connection type. For every
     * remaining item the channel of the matching pooled config is handed to
     * the corresponding installer together with the item itself.
     * NOTE(review): bindings are presumably created by the exchange installer;
     * that is not visible from this method - confirm.
     *
     * Both loops share one try block: the first exception aborts the rest of
     * the installation, is logged as an error (message and stack trace) and is
     * NOT rethrown, so callers never see a failure from this method.
     *
     * @return void
     */
    public function install()
    {
        try {
            foreach ($this->topologyConfig->getQueues() as $queue) {
                // Read the connection name once; it drives both the type check and the pool lookup.
                $connectionName = $queue->getConnection();
                // Strict comparison: only the exact string 'amqp' selects this installer.
                if ($this->connectionTypeResolver->getConnectionType($connectionName) !== 'amqp') {
                    continue;
                }
                $amqpConfig = $this->configPool->get($connectionName);
                $this->queueInstaller->install($amqpConfig->getChannel(), $queue);
            }

            foreach ($this->topologyConfig->getExchanges() as $exchange) {
                $connectionName = $exchange->getConnection();
                if ($this->connectionTypeResolver->getConnectionType($connectionName) !== 'amqp') {
                    continue;
                }
                $amqpConfig = $this->configPool->get($connectionName);
                $this->exchangeInstaller->install($amqpConfig->getChannel(), $exchange);
            }
        } catch (\Exception $e) {
            // Deliberate best effort: report the failure and return normally.
            $this->logger->error(
                "AMQP topology installation failed: {$e->getMessage()}\n{$e->getTraceAsString()}"
            );
        }
    }
}