QueueManagementTest.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MysqlMq\Model;
  7. use Magento\MysqlMq\Model\QueueManagement;
  8. /**
  9. * Test for Queue Management class.
  10. */
  11. class QueueManagementTest extends \PHPUnit\Framework\TestCase
  12. {
  13. /**
  14. * @var QueueManagement
  15. */
  16. protected $queueManagement;
  17. /**
  18. * @var \Magento\Framework\ObjectManagerInterface
  19. */
  20. protected $objectManager;
  21. protected function setUp()
  22. {
  23. $this->objectManager = \Magento\TestFramework\Helper\Bootstrap::getObjectManager();
  24. $this->queueManagement = $this->objectManager->create(\Magento\MysqlMq\Model\QueueManagement::class);
  25. }
  26. /**
  27. * @magentoDataFixture Magento/MysqlMq/_files/queues.php
  28. */
  29. public function testAllFlows()
  30. {
  31. $this->queueManagement->addMessageToQueues('topic1', 'messageBody1', ['queue1', 'queue2']);
  32. $this->queueManagement->addMessageToQueues('topic2', 'messageBody2', ['queue2', 'queue3']);
  33. $this->queueManagement->addMessageToQueues('topic3', 'messageBody3', ['queue1', 'queue3']);
  34. $this->queueManagement->addMessageToQueues('topic4', 'messageBody4', ['queue1', 'queue2', 'queue3']);
  35. $maxMessagesNumber = 2;
  36. $messages = $this->queueManagement->readMessages('queue3', $maxMessagesNumber);
  37. $this->assertCount($maxMessagesNumber, $messages);
  38. $firstMessage = array_shift($messages);
  39. $this->assertEquals('topic2', $firstMessage[QueueManagement::MESSAGE_TOPIC]);
  40. $this->assertEquals('messageBody2', $firstMessage[QueueManagement::MESSAGE_BODY]);
  41. $this->assertEquals('queue3', $firstMessage[QueueManagement::MESSAGE_QUEUE_NAME]);
  42. $this->assertEquals(
  43. QueueManagement::MESSAGE_STATUS_IN_PROGRESS,
  44. $firstMessage[QueueManagement::MESSAGE_STATUS]
  45. );
  46. $this->assertTrue(is_numeric($firstMessage[QueueManagement::MESSAGE_QUEUE_ID]));
  47. $this->assertTrue(is_numeric($firstMessage[QueueManagement::MESSAGE_ID]));
  48. $this->assertTrue(is_numeric($firstMessage[QueueManagement::MESSAGE_QUEUE_RELATION_ID]));
  49. $this->assertEquals(0, $firstMessage[QueueManagement::MESSAGE_NUMBER_OF_TRIALS]);
  50. $this->assertCount(12, date_parse($firstMessage[QueueManagement::MESSAGE_UPDATED_AT]));
  51. $secondMessage = array_shift($messages);
  52. $this->assertEquals('topic3', $secondMessage[QueueManagement::MESSAGE_TOPIC]);
  53. $this->assertEquals('messageBody3', $secondMessage[QueueManagement::MESSAGE_BODY]);
  54. $this->assertEquals('queue3', $secondMessage[QueueManagement::MESSAGE_QUEUE_NAME]);
  55. $this->assertEquals(
  56. QueueManagement::MESSAGE_STATUS_IN_PROGRESS,
  57. $secondMessage[QueueManagement::MESSAGE_STATUS]
  58. );
  59. $this->assertTrue(is_numeric($secondMessage[QueueManagement::MESSAGE_QUEUE_ID]));
  60. $this->assertTrue(is_numeric($secondMessage[QueueManagement::MESSAGE_ID]));
  61. $this->assertTrue(is_numeric($secondMessage[QueueManagement::MESSAGE_QUEUE_RELATION_ID]));
  62. $this->assertEquals(0, $secondMessage[QueueManagement::MESSAGE_NUMBER_OF_TRIALS]);
  63. $this->assertCount(12, date_parse($secondMessage[QueueManagement::MESSAGE_UPDATED_AT]));
  64. /** Mark one message as complete or failed and make sure it is not displayed in the list of read messages */
  65. $this->queueManagement->changeStatus(
  66. [
  67. $secondMessage[QueueManagement::MESSAGE_QUEUE_RELATION_ID]
  68. ],
  69. QueueManagement::MESSAGE_STATUS_COMPLETE
  70. );
  71. $messages = $this->queueManagement->readMessages('queue3', $maxMessagesNumber);
  72. $this->assertCount(1, $messages);
  73. $this->queueManagement->changeStatus(
  74. [
  75. $firstMessage[QueueManagement::MESSAGE_QUEUE_RELATION_ID]
  76. ],
  77. QueueManagement::MESSAGE_STATUS_ERROR
  78. );
  79. $messages = $this->queueManagement->readMessages('queue3', $maxMessagesNumber);
  80. $this->assertCount(0, $messages);
  81. /** Ensure that message for retry is still accessible when reading messages from the queue */
  82. $messages = $this->queueManagement->readMessages('queue2', 1);
  83. $message = array_shift($messages);
  84. $messageRelationId = $message[QueueManagement::MESSAGE_QUEUE_RELATION_ID];
  85. for ($i = 0; $i < 2; $i++) {
  86. $this->assertEquals($i, $message[QueueManagement::MESSAGE_NUMBER_OF_TRIALS]);
  87. $this->queueManagement->pushToQueueForRetry($message[QueueManagement::MESSAGE_QUEUE_RELATION_ID]);
  88. $messages = $this->queueManagement->readMessages('queue2', 1);
  89. $message = array_shift($messages);
  90. $this->assertEquals($messageRelationId, $message[QueueManagement::MESSAGE_QUEUE_RELATION_ID]);
  91. }
  92. }
  93. }