123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\MysqlMq\Model\ResourceModel;
- use Magento\MysqlMq\Model\QueueManagement;
/**
 * Resource model for queue.
 *
 * Reads and writes three tables: 'queue' (declared queues), 'queue_message'
 * (message topic and body) and 'queue_message_status' (association of a message
 * with a queue together with its processing status).
 */
class Queue extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb
{
    /**
     * Model initialization.
     *
     * Binds the resource model to the 'queue' table with 'id' as its ID field.
     *
     * @return void
     */
    protected function _construct()
    {
        $this->_init('queue', 'id');
    }

    /**
     * Save message to 'queue_message' table.
     *
     * @param string $messageTopic
     * @param string $messageBody
     * @return int ID of the inserted record
     */
    public function saveMessage($messageTopic, $messageBody)
    {
        $connection = $this->getConnection();
        $messageTable = $this->getMessageTable();

        $connection->insert(
            $messageTable,
            ['topic_name' => $messageTopic, 'body' => $messageBody]
        );

        return $connection->lastInsertId($messageTable);
    }

    /**
     * Save messages in bulk to 'queue_message' table.
     *
     * All messages are stored under the same topic. When $messages is empty
     * nothing is written and an empty list is returned.
     *
     * @param string $messageTopic
     * @param array $messages Message bodies
     * @return array List of IDs of inserted records
     */
    public function saveMessages($messageTopic, array $messages)
    {
        // Nothing to insert: do not issue an INSERT statement without rows.
        if (empty($messages)) {
            return [];
        }

        $connection = $this->getConnection();
        $messageTable = $this->getMessageTable();

        $data = [];
        foreach ($messages as $message) {
            $data[] = ['topic_name' => $messageTopic, 'body' => $message];
        }

        $rowCount = $connection->insertMultiple($messageTable, $data);
        // NOTE(review): this assumes lastInsertId() reports the ID generated for the
        // FIRST row of the multi-row insert and that the generated IDs are
        // consecutive (MySQL behaviour) - confirm for the adapter in use.
        $firstId = $connection->lastInsertId($messageTable);

        // Explicit ordering makes LIMIT pick the lowest IDs starting at $firstId
        // instead of an arbitrary subset of the matching rows.
        $select = $connection->select()
            ->from(['qm' => $messageTable], ['id'])
            ->where('qm.id >= ?', $firstId)
            ->order('qm.id ASC')
            ->limit($rowCount);

        return $connection->fetchCol($select);
    }

    /**
     * Add associations between the specified message and queues.
     *
     * Convenience wrapper around linkMessagesWithQueues() for a single message.
     *
     * @param int $messageId
     * @param string[] $queueNames
     * @return $this
     */
    public function linkQueues($messageId, $queueNames)
    {
        return $this->linkMessagesWithQueues([$messageId], $queueNames);
    }

    /**
     * Add associations between the specified messages and queues.
     *
     * One 'queue_message_status' row with status "new" is created for every
     * (queue, message) pair. Queue names that do not resolve to a queue ID are
     * ignored.
     *
     * @param array $messageIds
     * @param string[] $queueNames
     * @return $this
     */
    public function linkMessagesWithQueues(array $messageIds, array $queueNames)
    {
        // Without messages or queues there are no pairs to create, so the
        // queue look-up query can be skipped entirely.
        if (empty($messageIds) || empty($queueNames)) {
            return $this;
        }

        $queueIds = $this->getQueueIdsByNames($queueNames);

        $data = [];
        foreach ($messageIds as $messageId) {
            foreach ($queueIds as $queueId) {
                // Value order must match the column list passed to insertArray() below.
                $data[] = [
                    $queueId,
                    $messageId,
                    QueueManagement::MESSAGE_STATUS_NEW
                ];
            }
        }

        if (!empty($data)) {
            $this->getConnection()->insertArray(
                $this->getMessageStatusTable(),
                ['queue_id', 'message_id', 'status'],
                $data
            );
        }

        return $this;
    }

    /**
     * Retrieve array of queue IDs corresponding to the specified array of queue names.
     *
     * @param string[] $queueNames
     * @return int[] Empty when $queueNames is empty or no queue matches
     */
    protected function getQueueIdsByNames($queueNames)
    {
        // An empty list cannot form a meaningful "IN (...)" condition.
        if (empty($queueNames)) {
            return [];
        }

        $connection = $this->getConnection();
        // Select the 'id' column only, so that fetchCol() (which returns the first
        // column of the result) does not depend on the table's column order.
        $select = $connection->select()
            ->from(['queue' => $this->getQueueTable()], ['id'])
            ->where('queue.name IN (?)', $queueNames);

        return $connection->fetchCol($select);
    }

    /**
     * Retrieve messages from the specified queue.
     *
     * Only messages whose status for this queue is "new" or "retry required" are
     * returned, ordered by the status row's 'updated_at' ascending. Every row is
     * keyed by the QueueManagement::MESSAGE_* constants.
     *
     * @param string $queueName
     * @param int|null $limit Maximum number of rows; any falsy value means "no limit"
     * @return array
     */
    public function getMessages($queueName, $limit = null)
    {
        $connection = $this->getConnection();

        $messageColumns = [
            QueueManagement::MESSAGE_TOPIC => 'topic_name',
            QueueManagement::MESSAGE_BODY => 'body'
        ];
        $statusColumns = [
            QueueManagement::MESSAGE_QUEUE_RELATION_ID => 'id',
            QueueManagement::MESSAGE_QUEUE_ID => 'queue_id',
            QueueManagement::MESSAGE_ID => 'message_id',
            QueueManagement::MESSAGE_STATUS => 'status',
            QueueManagement::MESSAGE_UPDATED_AT => 'updated_at',
            QueueManagement::MESSAGE_NUMBER_OF_TRIALS => 'number_of_trials'
        ];
        $pendingStatuses = [
            QueueManagement::MESSAGE_STATUS_NEW,
            QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED
        ];

        $select = $connection->select()
            ->from(['queue_message' => $this->getMessageTable()], $messageColumns)
            ->join(
                ['queue_message_status' => $this->getMessageStatusTable()],
                'queue_message.id = queue_message_status.message_id',
                $statusColumns
            )
            ->join(
                ['queue' => $this->getQueueTable()],
                'queue.id = queue_message_status.queue_id',
                [QueueManagement::MESSAGE_QUEUE_NAME => 'name']
            )
            ->where('queue_message_status.status IN (?)', $pendingStatuses)
            ->where('queue.name = ?', $queueName)
            ->order('queue_message_status.updated_at ASC');

        if ($limit) {
            $select->limit($limit);
        }

        return $connection->fetchAll($select);
    }

    /**
     * Delete messages if there is no queue where the message is not in status TO BE DELETED.
     *
     * A message is kept as long as at least one of its status rows has a status
     * other than "to be deleted". When no such status row exists at all, the
     * delete runs without a condition and removes every row of the message table.
     *
     * @return void
     */
    public function deleteMarkedMessages()
    {
        $connection = $this->getConnection();

        // IDs of messages that are still needed by at least one queue.
        $select = $connection->select()
            ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
            ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED)
            ->distinct();
        $messageIds = $connection->fetchCol($select);

        $condition = count($messageIds) > 0 ? ['id NOT IN (?)' => $messageIds] : null;
        $connection->delete($this->getMessageTable(), $condition);
    }

    /**
     * Mark specified messages with 'in progress' status.
     *
     * Each relation is updated with its own statement so that the outcome can be
     * checked per relation.
     *
     * @param int[] $relationIds
     * @return int[] Relation IDs which were switched to 'in progress' by the current process.
     */
    public function takeMessagesInProgress($relationIds)
    {
        $connection = $this->getConnection();
        $statusTable = $this->getMessageStatusTable();

        $takenMessagesRelationIds = [];
        foreach ($relationIds as $relationId) {
            $affectedRows = $connection->update(
                $statusTable,
                ['status' => QueueManagement::MESSAGE_STATUS_IN_PROGRESS],
                ['id = ?' => $relationId]
            );
            if ($affectedRows) {
                /**
                 * If status was set to 'in progress' by some other process (due to race conditions),
                 * current process should not process the same message.
                 * So message will be processed only if current process was able to change its status.
                 *
                 * NOTE(review): this relies on update() reporting zero affected rows when the
                 * stored status already equals 'in progress' - confirm for the adapter in use.
                 */
                $takenMessagesRelationIds[] = $relationId;
            }
        }

        return $takenMessagesRelationIds;
    }

    /**
     * Set status of message to 'retry required' and increment number of processing trials.
     *
     * @param int $relationId
     * @return void
     */
    public function pushBackForRetry($relationId)
    {
        $this->getConnection()->update(
            $this->getMessageStatusTable(),
            [
                'status' => QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED,
                // Incremented by the database itself rather than read-modify-write in PHP.
                'number_of_trials' => new \Zend_Db_Expr('number_of_trials+1')
            ],
            ['id = ?' => $relationId]
        );
    }

    /**
     * Change message status.
     *
     * Does nothing when $relationIds is empty.
     *
     * @param int[] $relationIds
     * @param int $status
     * @return void
     */
    public function changeStatus($relationIds, $status)
    {
        // An empty list cannot form a meaningful "IN (...)" condition.
        if (empty($relationIds)) {
            return;
        }

        $this->getConnection()->update(
            $this->getMessageStatusTable(),
            ['status' => $status],
            ['id IN (?)' => $relationIds]
        );
    }

    /**
     * Get name of table storing message statuses and associations to queues.
     *
     * @return string
     */
    protected function getMessageStatusTable()
    {
        return $this->getTable('queue_message_status');
    }

    /**
     * Get name of table storing declared queues.
     *
     * @return string
     */
    protected function getQueueTable()
    {
        return $this->getTable('queue');
    }

    /**
     * Get name of table storing message body and topic.
     *
     * @return string
     */
    protected function getMessageTable()
    {
        return $this->getTable('queue_message');
    }
}
|