PublisherConsumerTest.php 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\MysqlMq\Model;
  7. use Magento\Framework\MessageQueue\PublisherInterface;
  8. /**
  9. * Test for MySQL publisher class.
  10. *
  11. * @magentoDbIsolation disabled
  12. */
  13. class PublisherConsumerTest extends \PHPUnit\Framework\TestCase
  14. {
  15. const MAX_NUMBER_OF_TRIALS = 3;
  16. /**
  17. * @var \Magento\Framework\MessageQueue\PublisherInterface
  18. */
  19. protected $publisher;
  20. /**
  21. * @var \Magento\Framework\ObjectManagerInterface
  22. */
  23. protected $objectManager;
  24. protected function setUp()
  25. {
  26. $this->markTestIncomplete('Should be converted to queue config v2.');
  27. $this->objectManager = \Magento\TestFramework\Helper\Bootstrap::getObjectManager();
  28. $configPath = __DIR__ . '/../etc/queue.xml';
  29. $fileResolverMock = $this->createMock(\Magento\Framework\Config\FileResolverInterface::class);
  30. $fileResolverMock->expects($this->any())
  31. ->method('get')
  32. ->willReturn([$configPath => file_get_contents(($configPath))]);
  33. /** @var \Magento\Framework\MessageQueue\Config\Reader\Xml $xmlReader */
  34. $xmlReader = $this->objectManager->create(
  35. \Magento\Framework\MessageQueue\Config\Reader\Xml::class,
  36. ['fileResolver' => $fileResolverMock]
  37. );
  38. $newData = $xmlReader->read();
  39. /** @var \Magento\Framework\MessageQueue\Config\Data $configData */
  40. $configData = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
  41. $configData->reset();
  42. $configData->merge($newData);
  43. $this->publisher = $this->objectManager->create(\Magento\Framework\MessageQueue\PublisherInterface::class);
  44. }
  45. protected function tearDown()
  46. {
  47. $this->markTestIncomplete('Should be converted to queue config v2.');
  48. $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX);
  49. $this->consumeMessages('demoConsumerQueueTwo', PHP_INT_MAX);
  50. $this->consumeMessages('demoConsumerQueueThree', PHP_INT_MAX);
  51. $this->consumeMessages('demoConsumerQueueFour', PHP_INT_MAX);
  52. $this->consumeMessages('demoConsumerQueueFive', PHP_INT_MAX);
  53. $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX);
  54. $objectManagerConfiguration = [\Magento\Framework\MessageQueue\Config\Reader\Xml::class => [
  55. 'arguments' => [
  56. 'fileResolver' => ['instance' => \Magento\Framework\Config\FileResolverInterface::class],
  57. ],
  58. ],
  59. ];
  60. $this->objectManager->configure($objectManagerConfiguration);
  61. /** @var \Magento\Framework\MessageQueue\Config\Data $queueConfig */
  62. $queueConfig = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
  63. $queueConfig->reset();
  64. }
  65. /**
  66. * @magentoDataFixture Magento/MysqlMq/_files/queues.php
  67. */
  68. public function testPublishConsumeFlow()
  69. {
  70. /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
  71. $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
  72. /** @var \Magento\MysqlMq\Model\DataObject $object */
  73. $object = $objectFactory->create();
  74. for ($i = 0; $i < 10; $i++) {
  75. $object->setName('Object name ' . $i)->setEntityId($i);
  76. $this->publisher->publish('demo.object.created', $object);
  77. }
  78. for ($i = 0; $i < 5; $i++) {
  79. $object->setName('Object name ' . $i)->setEntityId($i);
  80. $this->publisher->publish('demo.object.updated', $object);
  81. }
  82. for ($i = 0; $i < 3; $i++) {
  83. $object->setName('Object name ' . $i)->setEntityId($i);
  84. $this->publisher->publish('demo.object.custom.created', $object);
  85. }
  86. $outputPattern = '/(Processed \d+\s)/';
  87. /** There are total of 10 messages in the first queue, total expected consumption is 7, 3 then 0 */
  88. $this->consumeMessages('demoConsumerQueueOne', 7, 7, $outputPattern);
  89. /** Consumer all messages which left in this queue */
  90. $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 3, $outputPattern);
  91. $this->consumeMessages('demoConsumerQueueOne', 7, 0, $outputPattern);
  92. /** Verify that messages were added correctly to second queue for update and create topics */
  93. $this->consumeMessages('demoConsumerQueueTwo', 20, 15, $outputPattern);
  94. /** Verify that messages were NOT added to fourth queue */
  95. $this->consumeMessages('demoConsumerQueueFour', 11, 0, $outputPattern);
  96. /** Verify that messages were added correctly by '*' pattern in bind config to third queue */
  97. $this->consumeMessages('demoConsumerQueueThree', 20, 15, $outputPattern);
  98. /** Verify that messages were added correctly by '#' pattern in bind config to fifth queue */
  99. $this->consumeMessages('demoConsumerQueueFive', 20, 18, $outputPattern);
  100. }
  101. /**
  102. * @magentoDataFixture Magento/MysqlMq/_files/queues.php
  103. */
  104. public function testPublishAndConsumeWithFailedJobs()
  105. {
  106. /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
  107. $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
  108. /** @var \Magento\MysqlMq\Model\DataObject $object */
  109. /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
  110. $object = $objectFactory->create();
  111. for ($i = 0; $i < 5; $i++) {
  112. $object->setName('Object name ' . $i)->setEntityId($i);
  113. $this->publisher->publish('demo.object.created', $object);
  114. }
  115. $outputPattern = '/(Processed \d+\s)/';
  116. for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS; $i++) {
  117. $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
  118. }
  119. $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
  120. /** Try consume messages for MAX_NUMBER_OF_TRIALS+1 and then consumer them without exception */
  121. for ($i = 0; $i < 5; $i++) {
  122. $object->setName('Object name ' . $i)->setEntityId($i);
  123. $this->publisher->publish('demo.object.created', $object);
  124. }
  125. /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
  126. for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS + 1; $i++) {
  127. $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
  128. }
  129. /** Make sure that messages are not accessible anymore after number of trials is exceeded */
  130. $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
  131. }
  132. /**
  133. * @magentoDataFixture Magento/MysqlMq/_files/queues.php
  134. */
  135. public function testPublishAndConsumeSchemaDefinedByMethod()
  136. {
  137. /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
  138. $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
  139. /** @var \Magento\MysqlMq\Model\DataObject $object */
  140. $object = $objectFactory->create();
  141. $id = 33;
  142. $object->setName('Object name ' . $id)->setEntityId($id);
  143. $requiredStringParam = 'Required value';
  144. $optionalIntParam = 44;
  145. $this->publisher->publish('test.schema.defined.by.method', [$object, $requiredStringParam, $optionalIntParam]);
  146. $outputPattern = "/Processed '{$object->getEntityId()}'; "
  147. . "Required param '{$requiredStringParam}'; Optional param '{$optionalIntParam}'/";
  148. $this->consumeMessages('delayedOperationConsumer', PHP_INT_MAX, 1, $outputPattern);
  149. }
  150. /**
  151. * Make sure that consumers consume correct number of messages.
  152. *
  153. * @param string $consumerName
  154. * @param int|null $messagesToProcess
  155. * @param int|null $expectedNumberOfProcessedMessages
  156. * @param string|null $outputPattern
  157. */
  158. protected function consumeMessages(
  159. $consumerName,
  160. $messagesToProcess,
  161. $expectedNumberOfProcessedMessages = null,
  162. $outputPattern = null
  163. ) {
  164. /** @var \Magento\Framework\MessageQueue\ConsumerFactory $consumerFactory */
  165. $consumerFactory = $this->objectManager->create(\Magento\Framework\MessageQueue\ConsumerFactory::class);
  166. $consumer = $consumerFactory->get($consumerName);
  167. ob_start();
  168. $consumer->process($messagesToProcess);
  169. $consumersOutput = ob_get_contents();
  170. ob_end_clean();
  171. if ($outputPattern) {
  172. $this->assertEquals(
  173. $expectedNumberOfProcessedMessages,
  174. preg_match_all($outputPattern, $consumersOutput)
  175. );
  176. }
  177. }
  178. }