Queue.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MysqlMq\Model\Driver;
  7. use Magento\Framework\MessageQueue\EnvelopeInterface;
  8. use Magento\Framework\MessageQueue\QueueInterface;
  9. use Magento\MysqlMq\Model\QueueManagement;
  10. use Magento\Framework\MessageQueue\EnvelopeFactory;
  11. use Psr\Log\LoggerInterface;
  /**
   * Queue implementation of QueueInterface that delegates all message storage
   * and status handling to QueueManagement.
   */
  class Queue implements QueueInterface
  {
      /**
       * Gateway used to read, add and update queue messages.
       *
       * @var QueueManagement
       */
      private $queueManagement;

      /**
       * Factory used to wrap a raw message row in an envelope.
       *
       * @var EnvelopeFactory
       */
      private $envelopeFactory;

      /**
       * Name of the queue this instance reads from and pushes to.
       *
       * @var string
       */
      private $queueName;

      /**
       * Number of seconds subscribe() sleeps once the queue has been drained.
       *
       * @var int
       */
      private $interval;

      /**
       * Trial-count threshold: reject() only re-queues a message whose recorded
       * number of trials is still below this value.
       *
       * @var int
       */
      private $maxNumberOfTrials;

      /**
       * Logger used to report messages that were rejected for good.
       *
       * @var LoggerInterface
       */
      private $logger;

      /**
       * Queue constructor.
       *
       * @param QueueManagement $queueManagement
       * @param EnvelopeFactory $envelopeFactory
       * @param LoggerInterface $logger
       * @param string $queueName
       * @param int $interval Seconds to sleep between polling passes in subscribe()
       * @param int $maxNumberOfTrials Trial count at which reject() stops re-queueing
       */
      public function __construct(
          QueueManagement $queueManagement,
          EnvelopeFactory $envelopeFactory,
          LoggerInterface $logger,
          $queueName,
          $interval = 5,
          $maxNumberOfTrials = 3
      ) {
          $this->queueManagement = $queueManagement;
          $this->envelopeFactory = $envelopeFactory;
          $this->logger = $logger;
          $this->queueName = $queueName;
          $this->interval = $interval;
          $this->maxNumberOfTrials = $maxNumberOfTrials;
      }

      /**
       * Read one message from the queue and wrap it in an envelope.
       *
       * The message body is split out of the row returned by QueueManagement;
       * every remaining field of the row becomes the envelope's properties.
       *
       * @return EnvelopeInterface|null Envelope for the message, or null when nothing was read
       */
      public function dequeue()
      {
          // The second argument (1) is presumably the maximum number of messages to read.
          $messages = $this->queueManagement->readMessages($this->queueName, 1);
          if (!isset($messages[0])) {
              return null;
          }

          $properties = $messages[0];
          $body = $properties[QueueManagement::MESSAGE_BODY];
          // The body travels separately from the properties, so drop it from the row.
          unset($properties[QueueManagement::MESSAGE_BODY]);

          return $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
      }

      /**
       * Mark the message carried by the envelope as complete.
       *
       * @param EnvelopeInterface $envelope
       * @return void
       */
      public function acknowledge(EnvelopeInterface $envelope)
      {
          $properties = $envelope->getProperties();
          $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

          // Pass the id as a list, the same way reject() calls changeStatus().
          $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_COMPLETE);
      }

      /**
       * Poll the queue forever, handing every dequeued envelope to the callback.
       *
       * An envelope is acknowledged when the callback (and the acknowledgement
       * itself) completes without throwing an \Exception; otherwise it is
       * rejected, and the exception message is forwarded so that reject() can
       * log it once the message is no longer re-queued.
       * This method contains an unconditional loop and does not return.
       *
       * @param callable $callback Invoked with the EnvelopeInterface as its only argument
       * @return void
       */
      public function subscribe($callback)
      {
          while (true) {
              // Drain everything currently readable before going to sleep.
              while ($envelope = $this->dequeue()) {
                  try {
                      call_user_func($callback, $envelope);
                      $this->acknowledge($envelope);
                  } catch (\Exception $e) {
                      // Forward the failure reason instead of silently discarding it.
                      $this->reject($envelope, true, $e->getMessage());
                  }
              }
              sleep($this->interval);
          }
      }

      /**
       * Reject a message: push it back for a retry, or mark it as failed.
       *
       * The message is pushed back for retry only when $requeue is true and its
       * recorded number of trials is below the configured maximum. Otherwise its
       * status is set to MESSAGE_STATUS_ERROR and, if a rejection message was
       * supplied, that message is logged at critical level.
       *
       * @param EnvelopeInterface $envelope
       * @param bool $requeue Whether a retry may be attempted
       * @param string|null $rejectionMessage Reason to log when the message is not re-queued
       * @return void
       */
      public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
      {
          $properties = $envelope->getProperties();
          $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

          if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
              $this->queueManagement->pushToQueueForRetry($relationId);
          } else {
              $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
              if ($rejectionMessage !== null) {
                  $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
              }
          }
      }

      /**
       * Add the envelope's body to this queue under the envelope's topic.
       *
       * @param EnvelopeInterface $envelope
       * @return void
       */
      public function push(EnvelopeInterface $envelope)
      {
          $properties = $envelope->getProperties();
          $this->queueManagement->addMessageToQueues(
              $properties[QueueManagement::MESSAGE_TOPIC],
              $envelope->getBody(),
              [$this->queueName]
          );
      }
  }