Exchange.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\Amqp;
  7. use Magento\Framework\Exception\LocalizedException;
  8. use Magento\Framework\MessageQueue\EnvelopeInterface;
  9. use Magento\Framework\MessageQueue\ExchangeInterface;
  10. use PhpAmqpLib\Message\AMQPMessage;
  11. use Magento\Framework\Communication\ConfigInterface as CommunicationConfigInterface;
  12. use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
  13. use Magento\Framework\MessageQueue\Rpc\ResponseQueueNameBuilder;
  /**
   * AMQP message exchange.
   *
   * Publishes envelopes to the exchange configured for a topic. For topics flagged as
   * synchronous it additionally consumes the reply queue and blocks until the matching
   * response arrives (RPC style).
   *
   * @api
   * @since 102.0.1
   */
  class Exchange implements ExchangeInterface
  {
      /**
       * Default timeout, in seconds, for a single wait on the reply queue.
       */
      const RPC_CONNECTION_TIMEOUT = 30;

      /**
       * @var Config
       */
      private $amqpConfig;

      /**
       * @var CommunicationConfigInterface
       */
      private $communicationConfig;

      /**
       * @var int
       */
      private $rpcConnectionTimeout;

      /**
       * @var PublisherConfig
       */
      private $publisherConfig;

      /**
       * @var ResponseQueueNameBuilder
       */
      private $responseQueueNameBuilder;

      /**
       * Initialize dependencies.
       *
       * @param Config $amqpConfig Provides the AMQP channel used for publishing and consuming.
       * @param PublisherConfig $publisherConfig Resolves the exchange name for a topic.
       * @param ResponseQueueNameBuilder $responseQueueNameBuilder Builds the fallback reply queue name.
       * @param CommunicationConfigInterface $communicationConfig Tells whether a topic is synchronous.
       * @param int $rpcConnectionTimeout Timeout passed to every wait on the reply queue.
       */
      public function __construct(
          Config $amqpConfig,
          PublisherConfig $publisherConfig,
          ResponseQueueNameBuilder $responseQueueNameBuilder,
          CommunicationConfigInterface $communicationConfig,
          $rpcConnectionTimeout = self::RPC_CONNECTION_TIMEOUT
      ) {
          $this->amqpConfig = $amqpConfig;
          $this->communicationConfig = $communicationConfig;
          $this->rpcConnectionTimeout = $rpcConnectionTimeout;
          $this->publisherConfig = $publisherConfig;
          $this->responseQueueNameBuilder = $responseQueueNameBuilder;
      }

      /**
       * Publish an envelope to the exchange configured for the topic.
       *
       * For an asynchronous topic the message is published and null is returned.
       * For a synchronous topic the method subscribes to the reply queue, publishes the
       * message and blocks until a response carrying the envelope's correlation id is
       * received; the body of that response is returned.
       *
       * @param string $topic
       * @param EnvelopeInterface $envelope
       * @return string|null Response body for synchronous topics, null otherwise.
       * @throws LocalizedException When a wait on the reply queue times out.
       * @since 102.0.1
       */
      public function enqueue($topic, EnvelopeInterface $envelope)
      {
          $topicData = $this->communicationConfig->getTopic($topic);
          $isSync = $topicData[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];

          $channel = $this->amqpConfig->getChannel();
          $exchange = $this->publisherConfig->getPublisher($topic)->getConnection()->getExchange();
          $properties = $envelope->getProperties();
          $msg = new AMQPMessage($envelope->getBody(), $properties);

          if (!$isSync) {
              $channel->basic_publish($msg, $exchange, $topic);
              return null;
          }

          $responseBody = null;
          // A synchronous request is expected to carry a correlation id; it is read
          // unguarded on purpose so that a missing id is not silently hidden.
          $correlationId = $properties['correlation_id'];

          /**
           * Accept the response that belongs to this request, hand back any other one.
           *
           * @param AMQPMessage $response
           */
          $callback = function ($response) use ($correlationId, &$responseBody, $channel) {
              if ($response->get('correlation_id') == $correlationId) {
                  $responseBody = $response->body;
                  $channel->basic_ack($response->get('delivery_tag'));
              } else {
                  // Not ours: push the message back to the queue.
                  $channel->basic_reject($response->get('delivery_tag'), true);
              }
          };

          // Prefer the reply queue named in the envelope. The key may be absent, so it is
          // tested with empty() rather than read directly (avoids an undefined-index error).
          $replyTo = !empty($properties['reply_to'])
              ? $properties['reply_to']
              : $this->responseQueueNameBuilder->getQueueName($topic);

          // NOTE(review): the consumer registered here is never cancelled, so it stays on
          // the channel after this call returns - confirm whether that is intended.
          $channel->basic_consume(
              $replyTo,
              '',
              false,
              false,
              false,
              false,
              $callback
          );
          $channel->basic_publish($msg, $exchange, $topic);

          // The timeout applies to each wait separately; a delivery that does not match
          // the correlation id leaves $responseBody null and starts another wait.
          while ($responseBody === null) {
              try {
                  $channel->wait(null, false, $this->rpcConnectionTimeout);
              } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                  // Keep the original exception as the cause so the stack trace is not lost.
                  throw new LocalizedException(
                      new \Magento\Framework\Phrase(
                          "The RPC (Remote Procedure Call) failed. The connection timed out after %time_out. "
                          . "Please try again later.",
                          ['time_out' => $this->rpcConnectionTimeout]
                      ),
                      $e
                  );
              }
          }

          return $responseBody;
      }
  }