PublisherConsumerController.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. declare(strict_types=1);
  7. namespace Magento\TestFramework\MessageQueue;
  8. use Magento\Framework\MessageQueue\PublisherInterface;
  9. use Magento\Framework\OsInfo;
  10. use Magento\TestFramework\Helper\Amqp;
  11. /**
  12. * Publisher Consumer Controller
  13. */
  14. class PublisherConsumerController
  15. {
  16. /**
  17. * @var string[]
  18. */
  19. private $consumers = [];
  20. /**
  21. * @var PublisherInterface
  22. */
  23. private $publisher;
  24. /**
  25. * @var string
  26. */
  27. private $logFilePath;
  28. /**
  29. * @var int|null
  30. */
  31. private $maxMessages = null;
  32. /**
  33. * @var OsInfo
  34. */
  35. private $osInfo;
  36. /**
  37. * @var array
  38. */
  39. private $appInitParams;
  40. /**
  41. * @var Amqp
  42. */
  43. private $amqpHelper;
  44. /**
  45. * PublisherConsumerController constructor.
  46. * @param PublisherInterface $publisher
  47. * @param OsInfo $osInfo
  48. * @param Amqp $amqpHelper
  49. * @param string $logFilePath
  50. * @param array $consumers
  51. * @param array $appInitParams
  52. * @param null|int $maxMessages
  53. */
  54. public function __construct(
  55. PublisherInterface $publisher,
  56. OsInfo $osInfo,
  57. Amqp $amqpHelper,
  58. $logFilePath,
  59. $consumers,
  60. $appInitParams,
  61. $maxMessages = null
  62. ) {
  63. $this->consumers = $consumers;
  64. $this->publisher = $publisher;
  65. $this->logFilePath = $logFilePath;
  66. $this->maxMessages = $maxMessages;
  67. $this->osInfo = $osInfo;
  68. $this->appInitParams = $appInitParams;
  69. $this->amqpHelper = $amqpHelper;
  70. }
  71. /**
  72. * Initialize Environment and Consumers
  73. *
  74. * @throws EnvironmentPreconditionException
  75. * @throws PreconditionFailedException
  76. */
  77. public function initialize()
  78. {
  79. $this->validateEnvironmentPreconditions();
  80. $connections = $this->amqpHelper->getConnections();
  81. foreach (array_keys($connections) as $connectionName) {
  82. $this->amqpHelper->deleteConnection($connectionName);
  83. }
  84. $this->amqpHelper->clearQueue("async.operations.all");
  85. foreach ($this->consumers as $consumer) {
  86. foreach ($this->getConsumerProcessIds($consumer) as $consumerProcessId) {
  87. exec("kill {$consumerProcessId}");
  88. }
  89. }
  90. foreach ($this->consumers as $consumer) {
  91. if (!$this->getConsumerProcessIds($consumer)) {
  92. exec("{$this->getConsumerStartCommand($consumer, true)} > /dev/null &");
  93. }
  94. sleep(5);
  95. }
  96. if (file_exists($this->logFilePath)) {
  97. // try to remove before failing the test
  98. unlink($this->logFilePath);
  99. if (file_exists($this->logFilePath)) {
  100. throw new PreconditionFailedException(
  101. "Precondition failed: test log ({$this->logFilePath}) cannot be deleted before test execution."
  102. );
  103. }
  104. }
  105. }
  106. /**
  107. * Validate environment preconditions
  108. *
  109. * @throws EnvironmentPreconditionException
  110. * @throws PreconditionFailedException
  111. */
  112. private function validateEnvironmentPreconditions()
  113. {
  114. if ($this->osInfo->isWindows()) {
  115. throw new EnvironmentPreconditionException(
  116. "This test relies on *nix shell and should be skipped in Windows environment."
  117. );
  118. }
  119. if (!$this->amqpHelper->isAvailable()) {
  120. throw new PreconditionFailedException(
  121. 'This test relies on RabbitMQ Management Plugin.'
  122. );
  123. }
  124. }
  125. /**
  126. * Stop Consumers
  127. */
  128. public function stopConsumers()
  129. {
  130. foreach ($this->consumers as $consumer) {
  131. foreach ($this->getConsumerProcessIds($consumer) as $consumerProcessId) {
  132. exec("kill {$consumerProcessId}");
  133. }
  134. }
  135. }
  136. /**
  137. * Get Consumers ProcessIds
  138. *
  139. * @return array
  140. */
  141. public function getConsumersProcessIds()
  142. {
  143. $consumers = [];
  144. foreach ($this->consumers as $consumer) {
  145. $consumers[$consumer] = $this->getConsumerProcessIds($consumer);
  146. }
  147. return $consumers;
  148. }
  149. /**
  150. * Get Consumer ProcessIds
  151. *
  152. * @param string $consumer
  153. * @return string[]
  154. */
  155. private function getConsumerProcessIds($consumer)
  156. {
  157. exec("ps ax | grep -v grep | grep '{$this->getConsumerStartCommand($consumer)}' | awk '{print $1}'", $output);
  158. return $output;
  159. }
  160. /**
  161. * Get CLI command for starting specified consumer.
  162. *
  163. * @param string $consumer
  164. * @param bool $withEnvVariables
  165. * @return string
  166. */
  167. private function getConsumerStartCommand($consumer, $withEnvVariables = false)
  168. {
  169. $binDirectory = realpath(INTEGRATION_TESTS_DIR . '/bin/');
  170. $magentoCli = $binDirectory . '/magento';
  171. $consumerStartCommand = "php {$magentoCli} queue:consumers:start -vvv " . $consumer;
  172. if ($this->maxMessages) {
  173. $consumerStartCommand .= " --max-messages={$this->maxMessages}";
  174. }
  175. if ($withEnvVariables) {
  176. $params = $this->appInitParams;
  177. $params['MAGE_DIRS']['base']['path'] = BP;
  178. $params = 'INTEGRATION_TEST_PARAMS="' . urldecode(http_build_query($params)) . '"';
  179. $consumerStartCommand = $params . ' ' . $consumerStartCommand;
  180. }
  181. return $consumerStartCommand;
  182. }
  183. /**
  184. * Wait for asynchronous result
  185. *
  186. * @param callable $condition
  187. * @param array $params
  188. * @throws PreconditionFailedException
  189. */
  190. public function waitForAsynchronousResult(callable $condition, $params)
  191. {
  192. $i = 0;
  193. do {
  194. sleep(1);
  195. $assertion = call_user_func_array($condition, $params);
  196. } while (!$assertion && ($i++ < 180));
  197. if (!$assertion) {
  198. throw new PreconditionFailedException("No asynchronous messages were processed.");
  199. }
  200. }
  201. /**
  202. * Get publisher
  203. *
  204. * @return PublisherInterface
  205. */
  206. public function getPublisher()
  207. {
  208. return $this->publisher;
  209. }
  210. }