OperationProcessor.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. declare(strict_types=1);
  7. namespace Magento\AsynchronousOperations\Model;
  8. use Magento\Framework\Serialize\Serializer\Json;
  9. use Magento\AsynchronousOperations\Api\Data\OperationInterface;
  10. use Magento\Framework\Bulk\OperationManagementInterface;
  11. use Magento\AsynchronousOperations\Model\ConfigInterface as AsyncConfig;
  12. use Magento\Framework\MessageQueue\MessageValidator;
  13. use Magento\Framework\MessageQueue\MessageEncoder;
  14. use Magento\Framework\Exception\NoSuchEntityException;
  15. use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
  16. use Psr\Log\LoggerInterface;
  17. use Magento\Framework\Exception\LocalizedException;
  18. use Magento\Framework\Exception\TemporaryStateExceptionInterface;
  19. use Magento\Framework\DB\Adapter\ConnectionException;
  20. use Magento\Framework\DB\Adapter\DeadlockException;
  21. use Magento\Framework\DB\Adapter\LockWaitException;
  22. use Magento\Framework\Webapi\ServiceOutputProcessor;
  23. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  24. /**
  25. * Class OperationProcessor
  26. *
  27. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  28. */
  29. class OperationProcessor
  30. {
  31. /**
  32. * @var Json
  33. */
  34. private $jsonHelper;
  35. /**
  36. * @var OperationManagementInterface
  37. */
  38. private $operationManagement;
  39. /**
  40. * @var MessageEncoder
  41. */
  42. private $messageEncoder;
  43. /**
  44. * @var MessageValidator
  45. */
  46. private $messageValidator;
  47. /**
  48. * @var ConsumerConfigurationInterface
  49. */
  50. private $configuration;
  51. /**
  52. * @var LoggerInterface
  53. */
  54. private $logger;
  55. /**
  56. * @var ServiceOutputProcessor
  57. */
  58. private $serviceOutputProcessor;
  59. /**
  60. * @var CommunicationConfig
  61. */
  62. private $communicationConfig;
  63. /**
  64. * OperationProcessor constructor.
  65. *
  66. * @param MessageValidator $messageValidator
  67. * @param MessageEncoder $messageEncoder
  68. * @param ConsumerConfigurationInterface $configuration
  69. * @param Json $jsonHelper
  70. * @param OperationManagementInterface $operationManagement
  71. * @param LoggerInterface $logger
  72. * @param \Magento\Framework\Webapi\ServiceOutputProcessor $serviceOutputProcessor
  73. * @param \Magento\Framework\Communication\ConfigInterface $communicationConfig
  74. */
  75. public function __construct(
  76. MessageValidator $messageValidator,
  77. MessageEncoder $messageEncoder,
  78. ConsumerConfigurationInterface $configuration,
  79. Json $jsonHelper,
  80. OperationManagementInterface $operationManagement,
  81. ServiceOutputProcessor $serviceOutputProcessor,
  82. CommunicationConfig $communicationConfig,
  83. LoggerInterface $logger
  84. ) {
  85. $this->messageValidator = $messageValidator;
  86. $this->messageEncoder = $messageEncoder;
  87. $this->configuration = $configuration;
  88. $this->jsonHelper = $jsonHelper;
  89. $this->operationManagement = $operationManagement;
  90. $this->logger = $logger;
  91. $this->serviceOutputProcessor = $serviceOutputProcessor;
  92. $this->communicationConfig = $communicationConfig;
  93. }
  94. /**
  95. * Process topic-based encoded message
  96. *
  97. * @param string $encodedMessage
  98. * @return void
  99. */
  100. public function process(string $encodedMessage)
  101. {
  102. $operation = $this->messageEncoder->decode(AsyncConfig::SYSTEM_TOPIC_NAME, $encodedMessage);
  103. $this->messageValidator->validate(AsyncConfig::SYSTEM_TOPIC_NAME, $operation);
  104. $status = OperationInterface::STATUS_TYPE_COMPLETE;
  105. $errorCode = null;
  106. $messages = [];
  107. $topicName = $operation->getTopicName();
  108. $handlers = $this->configuration->getHandlers($topicName);
  109. try {
  110. $data = $this->jsonHelper->unserialize($operation->getSerializedData());
  111. $entityParams = $this->messageEncoder->decode($topicName, $data['meta_information']);
  112. $this->messageValidator->validate($topicName, $entityParams);
  113. } catch (\Exception $e) {
  114. $this->logger->error($e->getMessage());
  115. $status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
  116. $errorCode = $e->getCode();
  117. $messages[] = $e->getMessage();
  118. }
  119. $outputData = null;
  120. if ($errorCode === null) {
  121. foreach ($handlers as $callback) {
  122. $result = $this->executeHandler($callback, $entityParams);
  123. $status = $result['status'];
  124. $errorCode = $result['error_code'];
  125. $messages = array_merge($messages, $result['messages']);
  126. $outputData = $result['output_data'];
  127. }
  128. }
  129. if (isset($outputData)) {
  130. try {
  131. $communicationConfig = $this->communicationConfig->getTopic($topicName);
  132. $asyncHandler =
  133. $communicationConfig[CommunicationConfig::TOPIC_HANDLERS][AsyncConfig::DEFAULT_HANDLER_NAME];
  134. $serviceClass = $asyncHandler[CommunicationConfig::HANDLER_TYPE];
  135. $serviceMethod = $asyncHandler[CommunicationConfig::HANDLER_METHOD];
  136. $outputData = $this->serviceOutputProcessor->process(
  137. $outputData,
  138. $serviceClass,
  139. $serviceMethod
  140. );
  141. $outputData = $this->jsonHelper->serialize($outputData);
  142. } catch (\Exception $e) {
  143. $messages[] = $e->getMessage();
  144. }
  145. }
  146. $serializedData = (isset($errorCode)) ? $operation->getSerializedData() : null;
  147. $this->operationManagement->changeOperationStatus(
  148. $operation->getId(),
  149. $status,
  150. $errorCode,
  151. implode('; ', $messages),
  152. $serializedData,
  153. $outputData
  154. );
  155. }
  156. /**
  157. * Execute topic handler
  158. *
  159. * @param $callback
  160. * @param $entityParams
  161. * @return array
  162. */
  163. private function executeHandler($callback, $entityParams)
  164. {
  165. $result = [
  166. 'status' => OperationInterface::STATUS_TYPE_COMPLETE,
  167. 'error_code' => null,
  168. 'messages' => [],
  169. 'output_data' => null
  170. ];
  171. try {
  172. $result['output_data'] = call_user_func_array($callback, $entityParams);
  173. $result['messages'][] = sprintf('Service execution success %s::%s', get_class($callback[0]), $callback[1]);
  174. } catch (\Zend_Db_Adapter_Exception $e) {
  175. $this->logger->critical($e->getMessage());
  176. if ($e instanceof LockWaitException
  177. || $e instanceof DeadlockException
  178. || $e instanceof ConnectionException
  179. ) {
  180. $result['status'] = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED;
  181. $result['error_code'] = $e->getCode();
  182. $result['messages'][] = __($e->getMessage());
  183. } else {
  184. $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
  185. $result['error_code'] = $e->getCode();
  186. $result['messages'][] =
  187. __('Sorry, something went wrong during product prices update. Please see log for details.');
  188. }
  189. } catch (NoSuchEntityException $e) {
  190. $this->logger->error($e->getMessage());
  191. $result['status'] = ($e instanceof TemporaryStateExceptionInterface) ?
  192. OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED :
  193. OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
  194. $result['error_code'] = $e->getCode();
  195. $result['messages'][] = $e->getMessage();
  196. } catch (LocalizedException $e) {
  197. $this->logger->error($e->getMessage());
  198. $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
  199. $result['error_code'] = $e->getCode();
  200. $result['messages'][] = $e->getMessage();
  201. } catch (\Exception $e) {
  202. $this->logger->error($e->getMessage());
  203. $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
  204. $result['error_code'] = $e->getCode();
  205. $result['messages'][] = $e->getMessage();
  206. }
  207. return $result;
  208. }
  209. }