123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\MysqlMq\Model\Driver;
- use Magento\Framework\MessageQueue\EnvelopeInterface;
- use Magento\Framework\MessageQueue\QueueInterface;
- use Magento\MysqlMq\Model\QueueManagement;
- use Magento\Framework\MessageQueue\EnvelopeFactory;
- use Psr\Log\LoggerInterface;
- /**
- * Queue based on MessageQueue protocol
- */
- class Queue implements QueueInterface
- {
- /**
- * @var QueueManagement
- */
- private $queueManagement;
- /**
- * @var EnvelopeFactory
- */
- private $envelopeFactory;
- /**
- * @var string
- */
- private $queueName;
- /**
- * @var int
- */
- private $interval;
- /**
- * @var int
- */
- private $maxNumberOfTrials;
- /**
- * @var LoggerInterface $logger
- */
- private $logger;
- /**
- * Queue constructor.
- *
- * @param QueueManagement $queueManagement
- * @param EnvelopeFactory $envelopeFactory
- * @param LoggerInterface $logger
- * @param string $queueName
- * @param int $interval
- * @param int $maxNumberOfTrials
- */
- public function __construct(
- QueueManagement $queueManagement,
- EnvelopeFactory $envelopeFactory,
- LoggerInterface $logger,
- $queueName,
- $interval = 5,
- $maxNumberOfTrials = 3
- ) {
- $this->queueManagement = $queueManagement;
- $this->envelopeFactory = $envelopeFactory;
- $this->queueName = $queueName;
- $this->interval = $interval;
- $this->maxNumberOfTrials = $maxNumberOfTrials;
- $this->logger = $logger;
- }
- /**
- * {@inheritdoc}
- */
- public function dequeue()
- {
- $envelope = null;
- $messages = $this->queueManagement->readMessages($this->queueName, 1);
- if (isset($messages[0])) {
- $properties = $messages[0];
- $body = $properties[QueueManagement::MESSAGE_BODY];
- unset($properties[QueueManagement::MESSAGE_BODY]);
- $envelope = $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
- }
- return $envelope;
- }
- /**
- * {@inheritdoc}
- */
- public function acknowledge(EnvelopeInterface $envelope)
- {
- $properties = $envelope->getProperties();
- $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];
- $this->queueManagement->changeStatus($relationId, QueueManagement::MESSAGE_STATUS_COMPLETE);
- }
- /**
- * {@inheritdoc}
- */
- public function subscribe($callback)
- {
- while (true) {
- while ($envelope = $this->dequeue()) {
- try {
- call_user_func($callback, $envelope);
- $this->acknowledge($envelope);
- } catch (\Exception $e) {
- $this->reject($envelope);
- }
- }
- sleep($this->interval);
- }
- }
- /**
- * {@inheritdoc}
- */
- public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
- {
- $properties = $envelope->getProperties();
- $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];
- if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
- $this->queueManagement->pushToQueueForRetry($relationId);
- } else {
- $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
- if ($rejectionMessage !== null) {
- $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
- }
- }
- }
- /**
- * {@inheritDoc}
- */
- public function push(EnvelopeInterface $envelope)
- {
- $properties = $envelope->getProperties();
- $this->queueManagement->addMessageToQueues(
- $properties[QueueManagement::MESSAGE_TOPIC],
- $envelope->getBody(),
- [$this->queueName]
- );
- }
- }
|