123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- declare(strict_types=1);
- namespace Magento\TestFramework\MessageQueue;
- use Magento\Framework\MessageQueue\PublisherInterface;
- use Magento\Framework\OsInfo;
- use Magento\TestFramework\Helper\Amqp;
/**
 * Publisher Consumer Controller
 *
 * Helper that prepares the message queue for a test run: it deletes the connections reported by
 * the Amqp helper, clears the "async.operations.all" queue, restarts the configured consumers as
 * background CLI processes and exposes the publisher.
 */
class PublisherConsumerController
{
    /**
     * Names of the consumers managed by this controller.
     *
     * @var string[]
     */
    private $consumers = [];

    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * Path of the test log file that is removed during initialization.
     *
     * @var string
     */
    private $logFilePath;

    /**
     * Value passed as --max-messages to every consumer; the option is omitted when this is empty.
     *
     * @var int|null
     */
    private $maxMessages = null;

    /**
     * @var OsInfo
     */
    private $osInfo;

    /**
     * Parameters serialized into the INTEGRATION_TEST_PARAMS environment variable of each consumer.
     *
     * @var array
     */
    private $appInitParams;

    /**
     * @var Amqp
     */
    private $amqpHelper;

    /**
     * PublisherConsumerController constructor.
     *
     * @param PublisherInterface $publisher
     * @param OsInfo $osInfo
     * @param Amqp $amqpHelper
     * @param string $logFilePath
     * @param array $consumers
     * @param array $appInitParams
     * @param null|int $maxMessages
     */
    public function __construct(
        PublisherInterface $publisher,
        OsInfo $osInfo,
        Amqp $amqpHelper,
        $logFilePath,
        $consumers,
        $appInitParams,
        $maxMessages = null
    ) {
        $this->consumers = $consumers;
        $this->publisher = $publisher;
        $this->logFilePath = $logFilePath;
        $this->maxMessages = $maxMessages;
        $this->osInfo = $osInfo;
        $this->appInitParams = $appInitParams;
        $this->amqpHelper = $amqpHelper;
    }

    /**
     * Initialize Environment and Consumers
     *
     * Validates the environment, drops existing AMQP connections, clears the
     * "async.operations.all" queue, restarts all configured consumers and removes the test log.
     *
     * @throws EnvironmentPreconditionException
     * @throws PreconditionFailedException
     */
    public function initialize()
    {
        $this->validateEnvironmentPreconditions();

        $connections = $this->amqpHelper->getConnections();
        foreach (array_keys($connections) as $connectionName) {
            $this->amqpHelper->deleteConnection($connectionName);
        }
        $this->amqpHelper->clearQueue("async.operations.all");

        $this->stopConsumers();
        $this->startConsumers();
        $this->removeStaleLogFile();
    }

    /**
     * Validate environment preconditions
     *
     * @throws EnvironmentPreconditionException when running on Windows
     * @throws PreconditionFailedException when the Amqp helper reports it is not available
     */
    private function validateEnvironmentPreconditions()
    {
        if ($this->osInfo->isWindows()) {
            throw new EnvironmentPreconditionException(
                "This test relies on *nix shell and should be skipped in Windows environment."
            );
        }

        if (!$this->amqpHelper->isAvailable()) {
            throw new PreconditionFailedException(
                'This test relies on RabbitMQ Management Plugin.'
            );
        }
    }

    /**
     * Stop Consumers
     *
     * Sends the default `kill` signal to every process found for each configured consumer.
     */
    public function stopConsumers()
    {
        foreach ($this->consumers as $consumer) {
            foreach ($this->getConsumerProcessIds($consumer) as $consumerProcessId) {
                exec("kill {$consumerProcessId}");
            }
        }
    }

    /**
     * Start every configured consumer that has no matching process, in the background.
     *
     * The five second pause is applied after each consumer, whether or not it had to be started.
     */
    private function startConsumers()
    {
        foreach ($this->consumers as $consumer) {
            // NOTE(review): `kill` returns before the process has exited, so a consumer that is
            // still shutting down may be detected here and then not restarted - confirm.
            if (!$this->getConsumerProcessIds($consumer)) {
                exec("{$this->getConsumerStartCommand($consumer, true)} > /dev/null &");
            }
            sleep(5);
        }
    }

    /**
     * Remove the test log file if it exists.
     *
     * @throws PreconditionFailedException when the file is still present after the removal attempt
     */
    private function removeStaleLogFile()
    {
        if (!file_exists($this->logFilePath)) {
            return;
        }

        // try to remove before failing the test
        unlink($this->logFilePath);
        if (file_exists($this->logFilePath)) {
            throw new PreconditionFailedException(
                "Precondition failed: test log ({$this->logFilePath}) cannot be deleted before test execution."
            );
        }
    }

    /**
     * Get Consumers ProcessIds
     *
     * @return array process id lists keyed by consumer name
     */
    public function getConsumersProcessIds()
    {
        $consumers = [];
        foreach ($this->consumers as $consumer) {
            $consumers[$consumer] = $this->getConsumerProcessIds($consumer);
        }

        return $consumers;
    }

    /**
     * Get Consumer ProcessIds
     *
     * Filters the `ps ax` listing by the consumer start command and returns the first column of
     * every matching line.
     *
     * @param string $consumer
     * @return string[]
     */
    private function getConsumerProcessIds($consumer)
    {
        // escapeshellarg() keeps the pattern a single shell word even if it contains a quote.
        $pattern = escapeshellarg($this->getConsumerStartCommand($consumer));

        // exec() appends to an existing array, so start from an empty one.
        $output = [];
        exec('ps ax | grep -v grep | grep ' . $pattern . ' | awk \'{print $1}\'', $output);

        return $output;
    }

    /**
     * Get CLI command for starting specified consumer.
     *
     * @param string $consumer
     * @param bool $withEnvVariables when true, the command is prefixed with the
     *                               INTEGRATION_TEST_PARAMS environment variable assignment
     * @return string
     */
    private function getConsumerStartCommand($consumer, $withEnvVariables = false)
    {
        $binDirectory = realpath(INTEGRATION_TESTS_DIR . '/bin/');
        $magentoCli = $binDirectory . '/magento';
        $consumerStartCommand = "php {$magentoCli} queue:consumers:start -vvv " . $consumer;
        if ($this->maxMessages) {
            $consumerStartCommand .= " --max-messages={$this->maxMessages}";
        }

        if ($withEnvVariables) {
            $params = $this->appInitParams;
            $params['MAGE_DIRS']['base']['path'] = BP;
            // The query string is decoded again so brackets and slashes stay readable in the value.
            $params = 'INTEGRATION_TEST_PARAMS="' . urldecode(http_build_query($params)) . '"';
            $consumerStartCommand = $params . ' ' . $consumerStartCommand;
        }

        return $consumerStartCommand;
    }

    /**
     * Wait for asynchronous result
     *
     * Sleeps one second, then evaluates the condition; repeats until the condition returns a
     * truthy value or 181 evaluations (roughly three minutes) have been made.
     *
     * @param callable $condition
     * @param array $params arguments passed to the condition on every evaluation
     * @throws PreconditionFailedException when the condition never became truthy
     */
    public function waitForAsynchronousResult(callable $condition, $params)
    {
        $i = 0;
        do {
            sleep(1);
            $assertion = call_user_func_array($condition, $params);
        } while (!$assertion && ($i++ < 180));

        if (!$assertion) {
            throw new PreconditionFailedException("No asynchronous messages were processed.");
        }
    }

    /**
     * Get publisher
     *
     * @return PublisherInterface
     */
    public function getPublisher()
    {
        return $this->publisher;
    }
}
|