queueManagement = $queueManagement;
        $this->envelopeFactory = $envelopeFactory;
        $this->queueName = $queueName;
        $this->interval = $interval;
        $this->maxNumberOfTrials = $maxNumberOfTrials;
        $this->logger = $logger;
    }

    /**
     * Read at most one message from this queue and wrap it in an envelope.
     *
     * The message body is separated from the remaining message fields; those
     * remaining fields become the envelope properties.
     *
     * {@inheritdoc}
     *
     * @return EnvelopeInterface|null The envelope, or null when no message was read.
     */
    public function dequeue()
    {
        $messages = $this->queueManagement->readMessages($this->queueName, 1);
        if (!isset($messages[0])) {
            return null;
        }

        $properties = $messages[0];
        $body = $properties[QueueManagement::MESSAGE_BODY];
        unset($properties[QueueManagement::MESSAGE_BODY]);

        return $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
    }

    /**
     * Mark the message carried by the envelope as complete.
     *
     * {@inheritdoc}
     *
     * @param EnvelopeInterface $envelope Envelope whose properties hold the queue relation id.
     */
    public function acknowledge(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        // Pass the id as a list so this call has the same shape as the one in reject().
        $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_COMPLETE);
    }

    /**
     * Poll the queue in an endless loop, handing every dequeued envelope to the callback.
     *
     * An envelope is acknowledged once the callback returns. If the callback or the
     * acknowledgement throws an \Exception, the envelope is rejected and the exception
     * message is forwarded as the rejection message. When no more envelopes are available
     * the loop sleeps for the configured interval and polls again; the loop has no exit
     * condition.
     *
     * {@inheritdoc}
     *
     * @param callable $callback Invoked with the envelope as its only argument.
     */
    public function subscribe($callback)
    {
        while (true) {
            while ($envelope = $this->dequeue()) {
                try {
                    call_user_func($callback, $envelope);
                    $this->acknowledge($envelope);
                } catch (\Exception $e) {
                    // Forward the failure reason: reject() logs it once the message can no
                    // longer be retried, instead of the cause being dropped silently.
                    $this->reject($envelope, true, $e->getMessage());
                }
            }
            sleep($this->interval);
        }
    }

    /**
     * Reject a message: either schedule it for another attempt or flag it as failed.
     *
     * The message is pushed back for retry only when its number of trials is below the
     * configured maximum and $requeue is truthy. Otherwise its status is changed to error
     * and, if a rejection message was supplied, that message is logged as critical.
     *
     * {@inheritdoc}
     *
     * @param EnvelopeInterface $envelope         Envelope of the message being rejected.
     * @param bool              $requeue          Whether a retry may be attempted.
     * @param string|null       $rejectionMessage Reason to log when the message is marked as error.
     */
    public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
    {
        $properties = $envelope->getProperties();
        $relationId = $properties[QueueManagement::MESSAGE_QUEUE_RELATION_ID];

        $canRetry = $properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials
            && $requeue;
        if ($canRetry) {
            $this->queueManagement->pushToQueueForRetry($relationId);
            return;
        }

        $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
        if ($rejectionMessage !== null) {
            $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
        }
    }

    /**
     * Add the envelope's body to this queue under the topic named in its properties.
     *
     * {@inheritDoc}
     *
     * @param EnvelopeInterface $envelope Envelope providing the topic property and the body.
     */
    public function push(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $this->queueManagement->addMessageToQueues(
            $properties[QueueManagement::MESSAGE_TOPIC],
            $envelope->getBody(),
            [$this->queueName]
        );
    }
}