Queue.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\Amqp;
  7. use Magento\Framework\MessageQueue\ConnectionLostException;
  8. use Magento\Framework\MessageQueue\EnvelopeInterface;
  9. use Magento\Framework\MessageQueue\QueueInterface;
  10. use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. use Magento\Framework\MessageQueue\EnvelopeFactory;
  13. use Psr\Log\LoggerInterface;
  14. /**
  15. * Class Queue
  16. *
  17. * @api
  18. * @since 102.0.1
  19. */
  20. class Queue implements QueueInterface
  21. {
  22. /**
  23. * @var Config
  24. */
  25. private $amqpConfig;
  26. /**
  27. * @var string
  28. */
  29. private $queueName;
  30. /**
  31. * @var EnvelopeFactory
  32. */
  33. private $envelopeFactory;
  34. /**
  35. * @var LoggerInterface
  36. */
  37. private $logger;
  38. /**
  39. * Initialize dependencies.
  40. *
  41. * @param Config $amqpConfig
  42. * @param EnvelopeFactory $envelopeFactory
  43. * @param string $queueName
  44. * @param LoggerInterface $logger
  45. */
  46. public function __construct(
  47. Config $amqpConfig,
  48. EnvelopeFactory $envelopeFactory,
  49. $queueName,
  50. LoggerInterface $logger
  51. ) {
  52. $this->amqpConfig = $amqpConfig;
  53. $this->queueName = $queueName;
  54. $this->envelopeFactory = $envelopeFactory;
  55. $this->logger = $logger;
  56. }
  57. /**
  58. * @inheritdoc
  59. * @since 102.0.1
  60. */
  61. public function dequeue()
  62. {
  63. $envelope = null;
  64. $channel = $this->amqpConfig->getChannel();
  65. // @codingStandardsIgnoreStart
  66. /** @var AMQPMessage $message */
  67. try {
  68. $message = $channel->basic_get($this->queueName);
  69. } catch (AMQPProtocolConnectionException $e) {
  70. throw new ConnectionLostException(
  71. $e->getMessage(),
  72. $e->getCode(),
  73. $e
  74. );
  75. }
  76. if ($message !== null) {
  77. $properties = array_merge(
  78. $message->get_properties(),
  79. [
  80. 'topic_name' => $message->delivery_info['routing_key'],
  81. 'delivery_tag' => $message->delivery_info['delivery_tag'],
  82. ]
  83. );
  84. $envelope = $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);
  85. }
  86. // @codingStandardsIgnoreEnd
  87. return $envelope;
  88. }
  89. /**
  90. * @inheritdoc
  91. * @since 102.0.1
  92. */
  93. public function acknowledge(EnvelopeInterface $envelope)
  94. {
  95. $properties = $envelope->getProperties();
  96. $channel = $this->amqpConfig->getChannel();
  97. // @codingStandardsIgnoreStart
  98. try {
  99. $channel->basic_ack($properties['delivery_tag']);
  100. } catch (AMQPProtocolConnectionException $e) {
  101. throw new ConnectionLostException(
  102. $e->getMessage(),
  103. $e->getCode(),
  104. $e
  105. );
  106. }
  107. // @codingStandardsIgnoreEnd
  108. }
  109. /**
  110. * @inheritdoc
  111. * @since 102.0.1
  112. */
  113. public function subscribe($callback)
  114. {
  115. $callbackConverter = function (AMQPMessage $message) use ($callback) {
  116. // @codingStandardsIgnoreStart
  117. $properties = array_merge(
  118. $message->get_properties(),
  119. [
  120. 'topic_name' => $message->delivery_info['routing_key'],
  121. 'delivery_tag' => $message->delivery_info['delivery_tag'],
  122. ]
  123. );
  124. // @codingStandardsIgnoreEnd
  125. $envelope = $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);
  126. if ($callback instanceof \Closure) {
  127. $callback($envelope);
  128. } else {
  129. call_user_func($callback, $envelope);
  130. }
  131. };
  132. $channel = $this->amqpConfig->getChannel();
  133. // @codingStandardsIgnoreStart
  134. $channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter);
  135. // @codingStandardsIgnoreEnd
  136. while (count($channel->callbacks)) {
  137. $channel->wait();
  138. }
  139. }
  140. /**
  141. * @inheritdoc
  142. * @since 102.0.1
  143. */
  144. public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
  145. {
  146. $properties = $envelope->getProperties();
  147. $channel = $this->amqpConfig->getChannel();
  148. // @codingStandardsIgnoreStart
  149. $channel->basic_reject($properties['delivery_tag'], $requeue);
  150. // @codingStandardsIgnoreEnd
  151. if ($rejectionMessage !== null) {
  152. $this->logger->critical(
  153. new \Magento\Framework\Phrase('Message has been rejected: %message', ['message' => $rejectionMessage])
  154. );
  155. }
  156. }
  157. /**
  158. * @inheritdoc
  159. * @since 102.0.1
  160. */
  161. public function push(EnvelopeInterface $envelope)
  162. {
  163. $messageProperties = $envelope->getProperties();
  164. $msg = new AMQPMessage(
  165. $envelope->getBody(),
  166. [
  167. 'correlation_id' => $messageProperties['correlation_id'],
  168. 'delivery_mode' => 2
  169. ]
  170. );
  171. $this->amqpConfig->getChannel()->basic_publish($msg, '', $this->queueName);
  172. }
  173. }