TopologyInstaller.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\Amqp;
  7. use Magento\Framework\Amqp\Topology\ExchangeInstaller;
  8. use Magento\Framework\Amqp\Topology\QueueInstaller;
  9. use Magento\Framework\MessageQueue\Topology\ConfigInterface;
  /**
   * Installs the AMQP part of the message queue topology.
   *
   * Reads the declared queues and exchanges from the topology configuration and,
   * for every item whose connection resolves to the "amqp" connection type,
   * delegates its creation to the queue / exchange installer using the channel
   * of the matching AMQP config from the pool.
   */
  class TopologyInstaller
  {
      /**
       * @var ConfigInterface
       */
      private $topologyConfig;

      /**
       * @var \Magento\Framework\Amqp\Topology\ExchangeInstaller
       */
      private $exchangeInstaller;

      /**
       * @var ConfigPool
       */
      private $configPool;

      /**
       * @var \Magento\Framework\Amqp\Topology\QueueInstaller
       */
      private $queueInstaller;

      /**
       * @var ConnectionTypeResolver
       */
      private $connectionTypeResolver;

      /**
       * @var \Psr\Log\LoggerInterface
       */
      protected $logger;

      /**
       * Initialize dependencies.
       *
       * @param ConfigInterface $topologyConfig Source of the declared queues and exchanges
       * @param ExchangeInstaller $exchangeInstaller Installs a single exchange on a channel
       * @param ConfigPool $configPool Provides the AMQP config for a connection name
       * @param QueueInstaller $queueInstaller Installs a single queue on a channel
       * @param ConnectionTypeResolver $connectionTypeResolver Maps a connection name to its connection type
       * @param \Psr\Log\LoggerInterface $logger Receives the error record when installation fails
       */
      public function __construct(
          ConfigInterface $topologyConfig,
          ExchangeInstaller $exchangeInstaller,
          ConfigPool $configPool,
          QueueInstaller $queueInstaller,
          ConnectionTypeResolver $connectionTypeResolver,
          \Psr\Log\LoggerInterface $logger
      ) {
          $this->topologyConfig = $topologyConfig;
          $this->exchangeInstaller = $exchangeInstaller;
          $this->configPool = $configPool;
          $this->queueInstaller = $queueInstaller;
          $this->connectionTypeResolver = $connectionTypeResolver;
          $this->logger = $logger;
      }

      /**
       * Install Amqp Exchanges, Queues and bind them
       *
       * Queues are processed first, then exchanges. Items on non-AMQP connections
       * are skipped. Any \Exception raised along the way is logged and not
       * rethrown; because a single try block wraps both loops, the first failure
       * stops the remaining installations.
       *
       * @return void
       */
      public function install()
      {
          try {
              foreach ($this->topologyConfig->getQueues() as $queue) {
                  $connectionName = $queue->getConnection();
                  if (!$this->isAmqpConnection($connectionName)) {
                      continue;
                  }
                  $this->queueInstaller->install($this->getChannel($connectionName), $queue);
              }

              foreach ($this->topologyConfig->getExchanges() as $exchange) {
                  $connectionName = $exchange->getConnection();
                  if (!$this->isAmqpConnection($connectionName)) {
                      continue;
                  }
                  $this->exchangeInstaller->install($this->getChannel($connectionName), $exchange);
              }
          } catch (\Exception $e) {
              // Deliberate best-effort: report the failure instead of propagating it to the caller.
              $this->logger->error("AMQP topology installation failed: {$e->getMessage()}\n{$e->getTraceAsString()}");
          }
      }

      /**
       * Tell whether the given connection resolves to the "amqp" connection type.
       *
       * Uses a strict comparison so that only the exact string 'amqp' matches.
       *
       * @param mixed $connectionName Connection name as returned by a queue/exchange config item
       * @return bool
       */
      private function isAmqpConnection($connectionName)
      {
          return $this->connectionTypeResolver->getConnectionType($connectionName) === 'amqp';
      }

      /**
       * Fetch the channel of the pooled AMQP config for the given connection.
       *
       * @param mixed $connectionName Connection name as returned by a queue/exchange config item
       * @return mixed Whatever the pooled config's getChannel() returns; passed through to the installers
       */
      private function getChannel($connectionName)
      {
          return $this->configPool->get($connectionName)->getChannel();
      }
  }