BulkManagement.php 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\AsynchronousOperations\Model;
  7. use Magento\Framework\App\ObjectManager;
  8. use Magento\Framework\App\ResourceConnection;
  9. use Magento\AsynchronousOperations\Api\Data\BulkSummaryInterface;
  10. use Magento\AsynchronousOperations\Api\Data\BulkSummaryInterfaceFactory;
  11. use Magento\AsynchronousOperations\Api\Data\OperationInterface;
  12. use Magento\Framework\MessageQueue\BulkPublisherInterface;
  13. use Magento\Framework\EntityManager\EntityManager;
  14. use Magento\Framework\EntityManager\MetadataPool;
  15. use Magento\AsynchronousOperations\Model\ResourceModel\Operation\CollectionFactory;
  16. use Magento\Authorization\Model\UserContextInterface;
  17. /**
  18. * Class BulkManagement
  19. *
  20. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  21. */
  22. class BulkManagement implements \Magento\Framework\Bulk\BulkManagementInterface
  23. {
  24. /**
  25. * @var EntityManager
  26. */
  27. private $entityManager;
  28. /**
  29. * @var BulkSummaryInterfaceFactory
  30. */
  31. private $bulkSummaryFactory;
  32. /**
  33. * @var CollectionFactory
  34. */
  35. private $operationCollectionFactory;
  36. /**
  37. * @var BulkPublisherInterface
  38. */
  39. private $publisher;
  40. /**
  41. * @var MetadataPool
  42. */
  43. private $metadataPool;
  44. /**
  45. * @var ResourceConnection
  46. */
  47. private $resourceConnection;
  48. /**
  49. * @var \Magento\Authorization\Model\UserContextInterface
  50. */
  51. private $userContext;
  52. /**
  53. * @var \Psr\Log\LoggerInterface
  54. */
  55. private $logger;
  56. /**
  57. * BulkManagement constructor.
  58. * @param EntityManager $entityManager
  59. * @param BulkSummaryInterfaceFactory $bulkSummaryFactory
  60. * @param CollectionFactory $operationCollectionFactory
  61. * @param BulkPublisherInterface $publisher
  62. * @param MetadataPool $metadataPool
  63. * @param ResourceConnection $resourceConnection
  64. * @param \Psr\Log\LoggerInterface $logger
  65. * @param UserContextInterface $userContext
  66. */
  67. public function __construct(
  68. EntityManager $entityManager,
  69. BulkSummaryInterfaceFactory $bulkSummaryFactory,
  70. CollectionFactory $operationCollectionFactory,
  71. BulkPublisherInterface $publisher,
  72. MetadataPool $metadataPool,
  73. ResourceConnection $resourceConnection,
  74. \Psr\Log\LoggerInterface $logger,
  75. UserContextInterface $userContext = null
  76. ) {
  77. $this->entityManager = $entityManager;
  78. $this->bulkSummaryFactory= $bulkSummaryFactory;
  79. $this->operationCollectionFactory = $operationCollectionFactory;
  80. $this->metadataPool = $metadataPool;
  81. $this->resourceConnection = $resourceConnection;
  82. $this->publisher = $publisher;
  83. $this->logger = $logger;
  84. $this->userContext = $userContext ?: ObjectManager::getInstance()->get(UserContextInterface::class);
  85. }
  86. /**
  87. * @inheritDoc
  88. */
  89. public function scheduleBulk($bulkUuid, array $operations, $description, $userId = null)
  90. {
  91. $metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
  92. $connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
  93. // save bulk summary and related operations
  94. $connection->beginTransaction();
  95. $userType = $this->userContext->getUserType();
  96. if ($userType === null) {
  97. $userType = UserContextInterface::USER_TYPE_ADMIN;
  98. }
  99. try {
  100. /** @var \Magento\AsynchronousOperations\Api\Data\BulkSummaryInterface $bulkSummary */
  101. $bulkSummary = $this->bulkSummaryFactory->create();
  102. $this->entityManager->load($bulkSummary, $bulkUuid);
  103. $bulkSummary->setBulkId($bulkUuid);
  104. $bulkSummary->setDescription($description);
  105. $bulkSummary->setUserId($userId);
  106. $bulkSummary->setUserType($userType);
  107. $bulkSummary->setOperationCount((int)$bulkSummary->getOperationCount() + count($operations));
  108. $this->entityManager->save($bulkSummary);
  109. $connection->commit();
  110. } catch (\Exception $exception) {
  111. $connection->rollBack();
  112. $this->logger->critical($exception->getMessage());
  113. return false;
  114. }
  115. $this->publishOperations($operations);
  116. return true;
  117. }
  118. /**
  119. * Retry bulk operations that failed due to given errors.
  120. *
  121. * @param string $bulkUuid target bulk UUID
  122. * @param array $errorCodes list of corresponding error codes
  123. * @return int number of affected bulk operations
  124. */
  125. public function retryBulk($bulkUuid, array $errorCodes)
  126. {
  127. $metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
  128. $connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
  129. /** @var \Magento\AsynchronousOperations\Model\ResourceModel\Operation[] $retriablyFailedOperations */
  130. $retriablyFailedOperations = $this->operationCollectionFactory->create()
  131. ->addFieldToFilter('error_code', ['in' => $errorCodes])
  132. ->addFieldToFilter('bulk_uuid', ['eq' => $bulkUuid])
  133. ->getItems();
  134. // remove corresponding operations from database (i.e. move them to 'open' status)
  135. $connection->beginTransaction();
  136. try {
  137. $operationIds = [];
  138. $currentBatchSize = 0;
  139. $maxBatchSize = 10000;
  140. /** @var OperationInterface $operation */
  141. foreach ($retriablyFailedOperations as $operation) {
  142. if ($currentBatchSize === $maxBatchSize) {
  143. $connection->delete(
  144. $this->resourceConnection->getTableName('magento_operation'),
  145. $connection->quoteInto('id IN (?)', $operationIds)
  146. );
  147. $operationIds = [];
  148. $currentBatchSize = 0;
  149. }
  150. $currentBatchSize++;
  151. $operationIds[] = $operation->getId();
  152. // Rescheduled operations must be put in queue in 'open' state (i.e. without ID)
  153. $operation->setId(null);
  154. }
  155. // remove operations from the last batch
  156. if (!empty($operationIds)) {
  157. $connection->delete(
  158. $this->resourceConnection->getTableName('magento_operation'),
  159. $connection->quoteInto('id IN (?)', $operationIds)
  160. );
  161. }
  162. $connection->commit();
  163. } catch (\Exception $exception) {
  164. $connection->rollBack();
  165. $this->logger->critical($exception->getMessage());
  166. return 0;
  167. }
  168. $this->publishOperations($retriablyFailedOperations);
  169. return count($retriablyFailedOperations);
  170. }
  171. /**
  172. * Publish list of operations to the corresponding message queues.
  173. *
  174. * @param array $operations
  175. * @return void
  176. */
  177. private function publishOperations(array $operations)
  178. {
  179. $operationsByTopics = [];
  180. foreach ($operations as $operation) {
  181. $operationsByTopics[$operation->getTopicName()][] = $operation;
  182. }
  183. foreach ($operationsByTopics as $topicName => $operations) {
  184. $this->publisher->publish($topicName, $operations);
  185. }
  186. }
  187. /**
  188. * @inheritDoc
  189. */
  190. public function deleteBulk($bulkId)
  191. {
  192. return $this->entityManager->delete(
  193. $this->entityManager->load(
  194. $this->bulkSummaryFactory->create(),
  195. $bulkId
  196. )
  197. );
  198. }
  199. }