MessageController.php 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\Exception\NotFoundException;
  8. use Magento\Framework\Phrase;
  9. class MessageController
  10. {
  11. /**
  12. * @var \Magento\Framework\MessageQueue\LockInterfaceFactory
  13. */
  14. private $lockFactory;
  15. /**
  16. * @var \Magento\Framework\MessageQueue\Lock\ReaderInterface
  17. */
  18. private $reader;
  19. /**
  20. * @var \Magento\Framework\MessageQueue\Lock\WriterInterface
  21. */
  22. private $writer;
  23. /**
  24. * Initialize dependencies.
  25. *
  26. * @param \Magento\Framework\MessageQueue\LockInterfaceFactory $lockFactory
  27. * @param Lock\ReaderInterface $reader
  28. * @param Lock\WriterInterface $writer
  29. */
  30. public function __construct(
  31. \Magento\Framework\MessageQueue\LockInterfaceFactory $lockFactory,
  32. \Magento\Framework\MessageQueue\Lock\ReaderInterface $reader,
  33. \Magento\Framework\MessageQueue\Lock\WriterInterface $writer
  34. ) {
  35. $this->lockFactory = $lockFactory;
  36. $this->reader = $reader;
  37. $this->writer = $writer;
  38. }
  39. /**
  40. * Create lock corresponding to the provided message. Throw MessageLockException if lock is already created.
  41. *
  42. * @param EnvelopeInterface $envelope
  43. * @param string $consumerName
  44. * @return LockInterface
  45. * @throws MessageLockException
  46. * @throws NotFoundException
  47. */
  48. public function lock(EnvelopeInterface $envelope, $consumerName)
  49. {
  50. $lock = $this->lockFactory->create();
  51. $properties = $envelope->getProperties();
  52. if (empty($properties['message_id'])) {
  53. throw new NotFoundException(new Phrase("Property 'message_id' not found in properties."));
  54. }
  55. $code = $consumerName . '-' . $properties['message_id'];
  56. $code = md5($code);
  57. $this->reader->read($lock, $code);
  58. if ($lock->getId()) {
  59. throw new MessageLockException(new Phrase('The "%1" message code was already processed.', [$code]));
  60. }
  61. $this->writer->saveLock($lock);
  62. return $lock;
  63. }
  64. }