Queue.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MysqlMq\Model\ResourceModel;
  7. use Magento\MysqlMq\Model\QueueManagement;
  8. /**
  9. * Resource model for queue.
  10. */
  11. class Queue extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb
  12. {
  13. /**
  14. * Model initialization
  15. *
  16. * @return void
  17. */
  18. protected function _construct()
  19. {
  20. $this->_init('queue', 'id');
  21. }
  22. /**
  23. * Save message to 'queue_message' table.
  24. *
  25. * @param string $messageTopic
  26. * @param string $messageBody
  27. * @return int ID of the inserted record
  28. */
  29. public function saveMessage($messageTopic, $messageBody)
  30. {
  31. $this->getConnection()->insert(
  32. $this->getMessageTable(),
  33. ['topic_name' => $messageTopic, 'body' => $messageBody]
  34. );
  35. return $this->getConnection()->lastInsertId($this->getMessageTable());
  36. }
  37. /**
  38. * Save messages in bulk to 'queue_message' table.
  39. *
  40. * @param string $messageTopic
  41. * @param array $messages
  42. * @return array List of IDs of inserted records
  43. */
  44. public function saveMessages($messageTopic, array $messages)
  45. {
  46. $data = [];
  47. foreach ($messages as $message) {
  48. $data[] = ['topic_name' => $messageTopic, 'body' => $message];
  49. }
  50. $rowCount = $this->getConnection()->insertMultiple($this->getMessageTable(), $data);
  51. $firstId = $this->getConnection()->lastInsertId($this->getMessageTable());
  52. $select = $this->getConnection()->select()
  53. ->from(['qm' => $this->getMessageTable()], ['id'])
  54. ->where('qm.id >= ?', $firstId)
  55. ->limit($rowCount);
  56. return $this->getConnection()->fetchCol($select);
  57. }
  58. /**
  59. * Add associations between the specified message and queues.
  60. *
  61. * @param int $messageId
  62. * @param string[] $queueNames
  63. * @return $this
  64. */
  65. public function linkQueues($messageId, $queueNames)
  66. {
  67. return $this->linkMessagesWithQueues([$messageId], $queueNames);
  68. }
  69. /**
  70. * Add associations between the specified messages and queues.
  71. *
  72. * @param array $messageIds
  73. * @param string[] $queueNames
  74. * @return $this
  75. */
  76. public function linkMessagesWithQueues(array $messageIds, array $queueNames)
  77. {
  78. $connection = $this->getConnection();
  79. $queueIds = $this->getQueueIdsByNames($queueNames);
  80. $data = [];
  81. foreach ($messageIds as $messageId) {
  82. foreach ($queueIds as $queueId) {
  83. $data[] = [
  84. $queueId,
  85. $messageId,
  86. QueueManagement::MESSAGE_STATUS_NEW
  87. ];
  88. }
  89. }
  90. if (!empty($data)) {
  91. $connection->insertArray(
  92. $this->getMessageStatusTable(),
  93. ['queue_id', 'message_id', 'status'],
  94. $data
  95. );
  96. }
  97. return $this;
  98. }
  99. /**
  100. * Retrieve array of queue IDs corresponding to the specified array of queue names.
  101. *
  102. * @param string[] $queueNames
  103. * @return int[]
  104. */
  105. protected function getQueueIdsByNames($queueNames)
  106. {
  107. $selectObject = $this->getConnection()->select();
  108. $selectObject->from(['queue' => $this->getQueueTable()])
  109. ->columns(['id'])
  110. ->where('queue.name IN (?)', $queueNames);
  111. return $this->getConnection()->fetchCol($selectObject);
  112. }
  113. /**
  114. * Retrieve messages from the specified queue.
  115. *
  116. * @param string $queueName
  117. * @param int|null $limit
  118. * @return array
  119. */
  120. public function getMessages($queueName, $limit = null)
  121. {
  122. $connection = $this->getConnection();
  123. $select = $connection->select()
  124. ->from(
  125. ['queue_message' => $this->getMessageTable()],
  126. [QueueManagement::MESSAGE_TOPIC => 'topic_name', QueueManagement::MESSAGE_BODY => 'body']
  127. )->join(
  128. ['queue_message_status' => $this->getMessageStatusTable()],
  129. 'queue_message.id = queue_message_status.message_id',
  130. [
  131. QueueManagement::MESSAGE_QUEUE_RELATION_ID => 'id',
  132. QueueManagement::MESSAGE_QUEUE_ID => 'queue_id',
  133. QueueManagement::MESSAGE_ID => 'message_id',
  134. QueueManagement::MESSAGE_STATUS => 'status',
  135. QueueManagement::MESSAGE_UPDATED_AT => 'updated_at',
  136. QueueManagement::MESSAGE_NUMBER_OF_TRIALS => 'number_of_trials'
  137. ]
  138. )->join(
  139. ['queue' => $this->getQueueTable()],
  140. 'queue.id = queue_message_status.queue_id',
  141. [QueueManagement::MESSAGE_QUEUE_NAME => 'name']
  142. )->where(
  143. 'queue_message_status.status IN (?)',
  144. [QueueManagement::MESSAGE_STATUS_NEW, QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED]
  145. )->where('queue.name = ?', $queueName)
  146. ->order('queue_message_status.updated_at ASC');
  147. if ($limit) {
  148. $select->limit($limit);
  149. }
  150. return $connection->fetchAll($select);
  151. }
  152. /**
  153. * Delete messages if there is no queue whrere the message is not in status TO BE DELETED
  154. *
  155. * @return void
  156. */
  157. public function deleteMarkedMessages()
  158. {
  159. $connection = $this->getConnection();
  160. $select = $connection->select()
  161. ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
  162. ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED)
  163. ->distinct();
  164. $messageIds = $connection->fetchCol($select);
  165. $condition = count($messageIds) > 0 ? ['id NOT IN (?)' => $messageIds] : null;
  166. $connection->delete($this->getMessageTable(), $condition);
  167. }
  168. /**
  169. * Mark specified messages with 'in progress' status.
  170. *
  171. * @param int[] $relationIds
  172. * @return int[] IDs of messages which should be taken in progress by current process.
  173. */
  174. public function takeMessagesInProgress($relationIds)
  175. {
  176. $takenMessagesRelationIds = [];
  177. foreach ($relationIds as $relationId) {
  178. $affectedRows = $this->getConnection()->update(
  179. $this->getMessageStatusTable(),
  180. ['status' => QueueManagement::MESSAGE_STATUS_IN_PROGRESS],
  181. ['id = ?' => $relationId]
  182. );
  183. if ($affectedRows) {
  184. /**
  185. * If status was set to 'in progress' by some other process (due to race conditions),
  186. * current process should not process the same message.
  187. * So message will be processed only if current process was able to change its status.
  188. */
  189. $takenMessagesRelationIds[] = $relationId;
  190. }
  191. }
  192. return $takenMessagesRelationIds;
  193. }
  194. /**
  195. * Set status of message to 'retry required' and increment number of processing trials.
  196. *
  197. * @param int $relationId
  198. * @return void
  199. */
  200. public function pushBackForRetry($relationId)
  201. {
  202. $this->getConnection()->update(
  203. $this->getMessageStatusTable(),
  204. [
  205. 'status' => QueueManagement::MESSAGE_STATUS_RETRY_REQUIRED,
  206. 'number_of_trials' => new \Zend_Db_Expr('number_of_trials+1')
  207. ],
  208. ['id = ?' => $relationId]
  209. );
  210. }
  211. /**
  212. * Change message status.
  213. *
  214. * @param int[] $relationIds
  215. * @param int $status
  216. * @return void
  217. */
  218. public function changeStatus($relationIds, $status)
  219. {
  220. $this->getConnection()->update(
  221. $this->getMessageStatusTable(),
  222. ['status' => $status],
  223. ['id IN (?)' => $relationIds]
  224. );
  225. }
  226. /**
  227. * Get name of table storing message statuses and associations to queues.
  228. *
  229. * @return string
  230. */
  231. protected function getMessageStatusTable()
  232. {
  233. return $this->getTable('queue_message_status');
  234. }
  235. /**
  236. * Get name of table storing declared queues.
  237. *
  238. * @return string
  239. */
  240. protected function getQueueTable()
  241. {
  242. return $this->getTable('queue');
  243. }
  244. /**
  245. * Get name of table storing message body and topic.
  246. *
  247. * @return string
  248. */
  249. protected function getMessageTable()
  250. {
  251. return $this->getTable('queue_message');
  252. }
  253. }