123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- <?php
- /**
- * Copyright © Magento, Inc. All rights reserved.
- * See COPYING.txt for license details.
- */
- declare(strict_types=1);
- namespace Magento\AsynchronousOperations\Model;
- use Magento\Framework\Serialize\Serializer\Json;
- use Magento\AsynchronousOperations\Api\Data\OperationInterface;
- use Magento\Framework\Bulk\OperationManagementInterface;
- use Magento\AsynchronousOperations\Model\ConfigInterface as AsyncConfig;
- use Magento\Framework\MessageQueue\MessageValidator;
- use Magento\Framework\MessageQueue\MessageEncoder;
- use Magento\Framework\Exception\NoSuchEntityException;
- use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
- use Psr\Log\LoggerInterface;
- use Magento\Framework\Exception\LocalizedException;
- use Magento\Framework\Exception\TemporaryStateExceptionInterface;
- use Magento\Framework\DB\Adapter\ConnectionException;
- use Magento\Framework\DB\Adapter\DeadlockException;
- use Magento\Framework\DB\Adapter\LockWaitException;
- use Magento\Framework\Webapi\ServiceOutputProcessor;
- use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
- /**
- * Class OperationProcessor
- *
- * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
- */
- class OperationProcessor
- {
- /**
- * @var Json
- */
- private $jsonHelper;
- /**
- * @var OperationManagementInterface
- */
- private $operationManagement;
- /**
- * @var MessageEncoder
- */
- private $messageEncoder;
- /**
- * @var MessageValidator
- */
- private $messageValidator;
- /**
- * @var ConsumerConfigurationInterface
- */
- private $configuration;
- /**
- * @var LoggerInterface
- */
- private $logger;
- /**
- * @var ServiceOutputProcessor
- */
- private $serviceOutputProcessor;
- /**
- * @var CommunicationConfig
- */
- private $communicationConfig;
- /**
- * OperationProcessor constructor.
- *
- * @param MessageValidator $messageValidator
- * @param MessageEncoder $messageEncoder
- * @param ConsumerConfigurationInterface $configuration
- * @param Json $jsonHelper
- * @param OperationManagementInterface $operationManagement
- * @param LoggerInterface $logger
- * @param \Magento\Framework\Webapi\ServiceOutputProcessor $serviceOutputProcessor
- * @param \Magento\Framework\Communication\ConfigInterface $communicationConfig
- */
- public function __construct(
- MessageValidator $messageValidator,
- MessageEncoder $messageEncoder,
- ConsumerConfigurationInterface $configuration,
- Json $jsonHelper,
- OperationManagementInterface $operationManagement,
- ServiceOutputProcessor $serviceOutputProcessor,
- CommunicationConfig $communicationConfig,
- LoggerInterface $logger
- ) {
- $this->messageValidator = $messageValidator;
- $this->messageEncoder = $messageEncoder;
- $this->configuration = $configuration;
- $this->jsonHelper = $jsonHelper;
- $this->operationManagement = $operationManagement;
- $this->logger = $logger;
- $this->serviceOutputProcessor = $serviceOutputProcessor;
- $this->communicationConfig = $communicationConfig;
- }
- /**
- * Process topic-based encoded message
- *
- * @param string $encodedMessage
- * @return void
- */
- public function process(string $encodedMessage)
- {
- $operation = $this->messageEncoder->decode(AsyncConfig::SYSTEM_TOPIC_NAME, $encodedMessage);
- $this->messageValidator->validate(AsyncConfig::SYSTEM_TOPIC_NAME, $operation);
- $status = OperationInterface::STATUS_TYPE_COMPLETE;
- $errorCode = null;
- $messages = [];
- $topicName = $operation->getTopicName();
- $handlers = $this->configuration->getHandlers($topicName);
- try {
- $data = $this->jsonHelper->unserialize($operation->getSerializedData());
- $entityParams = $this->messageEncoder->decode($topicName, $data['meta_information']);
- $this->messageValidator->validate($topicName, $entityParams);
- } catch (\Exception $e) {
- $this->logger->error($e->getMessage());
- $status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
- $errorCode = $e->getCode();
- $messages[] = $e->getMessage();
- }
- $outputData = null;
- if ($errorCode === null) {
- foreach ($handlers as $callback) {
- $result = $this->executeHandler($callback, $entityParams);
- $status = $result['status'];
- $errorCode = $result['error_code'];
- $messages = array_merge($messages, $result['messages']);
- $outputData = $result['output_data'];
- }
- }
- if (isset($outputData)) {
- try {
- $communicationConfig = $this->communicationConfig->getTopic($topicName);
- $asyncHandler =
- $communicationConfig[CommunicationConfig::TOPIC_HANDLERS][AsyncConfig::DEFAULT_HANDLER_NAME];
- $serviceClass = $asyncHandler[CommunicationConfig::HANDLER_TYPE];
- $serviceMethod = $asyncHandler[CommunicationConfig::HANDLER_METHOD];
- $outputData = $this->serviceOutputProcessor->process(
- $outputData,
- $serviceClass,
- $serviceMethod
- );
- $outputData = $this->jsonHelper->serialize($outputData);
- } catch (\Exception $e) {
- $messages[] = $e->getMessage();
- }
- }
- $serializedData = (isset($errorCode)) ? $operation->getSerializedData() : null;
- $this->operationManagement->changeOperationStatus(
- $operation->getId(),
- $status,
- $errorCode,
- implode('; ', $messages),
- $serializedData,
- $outputData
- );
- }
- /**
- * Execute topic handler
- *
- * @param $callback
- * @param $entityParams
- * @return array
- */
- private function executeHandler($callback, $entityParams)
- {
- $result = [
- 'status' => OperationInterface::STATUS_TYPE_COMPLETE,
- 'error_code' => null,
- 'messages' => [],
- 'output_data' => null
- ];
- try {
- $result['output_data'] = call_user_func_array($callback, $entityParams);
- $result['messages'][] = sprintf('Service execution success %s::%s', get_class($callback[0]), $callback[1]);
- } catch (\Zend_Db_Adapter_Exception $e) {
- $this->logger->critical($e->getMessage());
- if ($e instanceof LockWaitException
- || $e instanceof DeadlockException
- || $e instanceof ConnectionException
- ) {
- $result['status'] = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED;
- $result['error_code'] = $e->getCode();
- $result['messages'][] = __($e->getMessage());
- } else {
- $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
- $result['error_code'] = $e->getCode();
- $result['messages'][] =
- __('Sorry, something went wrong during product prices update. Please see log for details.');
- }
- } catch (NoSuchEntityException $e) {
- $this->logger->error($e->getMessage());
- $result['status'] = ($e instanceof TemporaryStateExceptionInterface) ?
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED :
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
- $result['error_code'] = $e->getCode();
- $result['messages'][] = $e->getMessage();
- } catch (LocalizedException $e) {
- $this->logger->error($e->getMessage());
- $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
- $result['error_code'] = $e->getCode();
- $result['messages'][] = $e->getMessage();
- } catch (\Exception $e) {
- $this->logger->error($e->getMessage());
- $result['status'] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
- $result['error_code'] = $e->getCode();
- $result['messages'][] = $e->getMessage();
- }
- return $result;
- }
- }
|