123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- namespace Magento\MysqlMq\Model;
- use Magento\Framework\MessageQueue\PublisherInterface;
- /**
- * Test for MySQL publisher class.
- *
- * @magentoDbIsolation disabled
- */
- class PublisherConsumerTest extends \PHPUnit\Framework\TestCase
- {
- const MAX_NUMBER_OF_TRIALS = 3;
- /**
- * @var \Magento\Framework\MessageQueue\PublisherInterface
- */
- protected $publisher;
- /**
- * @var \Magento\Framework\ObjectManagerInterface
- */
- protected $objectManager;
- protected function setUp()
- {
- $this->markTestIncomplete('Should be converted to queue config v2.');
- $this->objectManager = \Magento\TestFramework\Helper\Bootstrap::getObjectManager();
- $configPath = __DIR__ . '/../etc/queue.xml';
- $fileResolverMock = $this->createMock(\Magento\Framework\Config\FileResolverInterface::class);
- $fileResolverMock->expects($this->any())
- ->method('get')
- ->willReturn([$configPath => file_get_contents(($configPath))]);
- /** @var \Magento\Framework\MessageQueue\Config\Reader\Xml $xmlReader */
- $xmlReader = $this->objectManager->create(
- \Magento\Framework\MessageQueue\Config\Reader\Xml::class,
- ['fileResolver' => $fileResolverMock]
- );
- $newData = $xmlReader->read();
- /** @var \Magento\Framework\MessageQueue\Config\Data $configData */
- $configData = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
- $configData->reset();
- $configData->merge($newData);
- $this->publisher = $this->objectManager->create(\Magento\Framework\MessageQueue\PublisherInterface::class);
- }
- protected function tearDown()
- {
- $this->markTestIncomplete('Should be converted to queue config v2.');
- $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX);
- $this->consumeMessages('demoConsumerQueueTwo', PHP_INT_MAX);
- $this->consumeMessages('demoConsumerQueueThree', PHP_INT_MAX);
- $this->consumeMessages('demoConsumerQueueFour', PHP_INT_MAX);
- $this->consumeMessages('demoConsumerQueueFive', PHP_INT_MAX);
- $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX);
- $objectManagerConfiguration = [\Magento\Framework\MessageQueue\Config\Reader\Xml::class => [
- 'arguments' => [
- 'fileResolver' => ['instance' => \Magento\Framework\Config\FileResolverInterface::class],
- ],
- ],
- ];
- $this->objectManager->configure($objectManagerConfiguration);
- /** @var \Magento\Framework\MessageQueue\Config\Data $queueConfig */
- $queueConfig = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
- $queueConfig->reset();
- }
- /**
- * @magentoDataFixture Magento/MysqlMq/_files/queues.php
- */
- public function testPublishConsumeFlow()
- {
- /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
- $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
- /** @var \Magento\MysqlMq\Model\DataObject $object */
- $object = $objectFactory->create();
- for ($i = 0; $i < 10; $i++) {
- $object->setName('Object name ' . $i)->setEntityId($i);
- $this->publisher->publish('demo.object.created', $object);
- }
- for ($i = 0; $i < 5; $i++) {
- $object->setName('Object name ' . $i)->setEntityId($i);
- $this->publisher->publish('demo.object.updated', $object);
- }
- for ($i = 0; $i < 3; $i++) {
- $object->setName('Object name ' . $i)->setEntityId($i);
- $this->publisher->publish('demo.object.custom.created', $object);
- }
- $outputPattern = '/(Processed \d+\s)/';
- /** There are total of 10 messages in the first queue, total expected consumption is 7, 3 then 0 */
- $this->consumeMessages('demoConsumerQueueOne', 7, 7, $outputPattern);
- /** Consumer all messages which left in this queue */
- $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 3, $outputPattern);
- $this->consumeMessages('demoConsumerQueueOne', 7, 0, $outputPattern);
- /** Verify that messages were added correctly to second queue for update and create topics */
- $this->consumeMessages('demoConsumerQueueTwo', 20, 15, $outputPattern);
- /** Verify that messages were NOT added to fourth queue */
- $this->consumeMessages('demoConsumerQueueFour', 11, 0, $outputPattern);
- /** Verify that messages were added correctly by '*' pattern in bind config to third queue */
- $this->consumeMessages('demoConsumerQueueThree', 20, 15, $outputPattern);
- /** Verify that messages were added correctly by '#' pattern in bind config to fifth queue */
- $this->consumeMessages('demoConsumerQueueFive', 20, 18, $outputPattern);
- }
- /**
- * @magentoDataFixture Magento/MysqlMq/_files/queues.php
- */
- public function testPublishAndConsumeWithFailedJobs()
- {
- /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
- $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
- /** @var \Magento\MysqlMq\Model\DataObject $object */
- /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
- $object = $objectFactory->create();
- for ($i = 0; $i < 5; $i++) {
- $object->setName('Object name ' . $i)->setEntityId($i);
- $this->publisher->publish('demo.object.created', $object);
- }
- $outputPattern = '/(Processed \d+\s)/';
- for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS; $i++) {
- $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
- }
- $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
- /** Try consume messages for MAX_NUMBER_OF_TRIALS+1 and then consumer them without exception */
- for ($i = 0; $i < 5; $i++) {
- $object->setName('Object name ' . $i)->setEntityId($i);
- $this->publisher->publish('demo.object.created', $object);
- }
- /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
- for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS + 1; $i++) {
- $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
- }
- /** Make sure that messages are not accessible anymore after number of trials is exceeded */
- $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
- }
- /**
- * @magentoDataFixture Magento/MysqlMq/_files/queues.php
- */
- public function testPublishAndConsumeSchemaDefinedByMethod()
- {
- /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
- $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
- /** @var \Magento\MysqlMq\Model\DataObject $object */
- $object = $objectFactory->create();
- $id = 33;
- $object->setName('Object name ' . $id)->setEntityId($id);
- $requiredStringParam = 'Required value';
- $optionalIntParam = 44;
- $this->publisher->publish('test.schema.defined.by.method', [$object, $requiredStringParam, $optionalIntParam]);
- $outputPattern = "/Processed '{$object->getEntityId()}'; "
- . "Required param '{$requiredStringParam}'; Optional param '{$optionalIntParam}'/";
- $this->consumeMessages('delayedOperationConsumer', PHP_INT_MAX, 1, $outputPattern);
- }
- /**
- * Make sure that consumers consume correct number of messages.
- *
- * @param string $consumerName
- * @param int|null $messagesToProcess
- * @param int|null $expectedNumberOfProcessedMessages
- * @param string|null $outputPattern
- */
- protected function consumeMessages(
- $consumerName,
- $messagesToProcess,
- $expectedNumberOfProcessedMessages = null,
- $outputPattern = null
- ) {
- /** @var \Magento\Framework\MessageQueue\ConsumerFactory $consumerFactory */
- $consumerFactory = $this->objectManager->create(\Magento\Framework\MessageQueue\ConsumerFactory::class);
- $consumer = $consumerFactory->get($consumerName);
- ob_start();
- $consumer->process($messagesToProcess);
- $consumersOutput = ob_get_contents();
- ob_end_clean();
- if ($outputPattern) {
- $this->assertEquals(
- $expectedNumberOfProcessedMessages,
- preg_match_all($outputPattern, $consumersOutput)
- );
- }
- }
- }
|