MessageEncoder.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. <?php
  2. /**
  3. * Copyright © Magento, Inc. All rights reserved.
  4. * See COPYING.txt for license details.
  5. */
  6. namespace Magento\Framework\MessageQueue;
  7. use Magento\Framework\MessageQueue\ConfigInterface as QueueConfig;
  8. use Magento\Framework\Exception\LocalizedException;
  9. use Magento\Framework\Phrase;
  10. use Magento\Framework\Webapi\ServicePayloadConverterInterface;
  11. use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
  12. /**
  13. * Class which provides encoding and decoding capabilities for MessageQueue messages.
  14. *
  15. * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
  16. */
  17. class MessageEncoder
  18. {
  19. const DIRECTION_ENCODE = 'encode';
  20. const DIRECTION_DECODE = 'decode';
  21. /**
  22. * @var \Magento\Framework\Webapi\ServiceOutputProcessor
  23. */
  24. private $dataObjectEncoder;
  25. /**
  26. * @var \Magento\Framework\Webapi\ServiceInputProcessor
  27. */
  28. private $dataObjectDecoder;
  29. /**
  30. * @var \Magento\Framework\Json\EncoderInterface
  31. */
  32. private $jsonEncoder;
  33. /**
  34. * @var \Magento\Framework\Json\DecoderInterface
  35. */
  36. private $jsonDecoder;
  37. /**
  38. * @var CommunicationConfig
  39. */
  40. private $communicationConfig;
  41. /**
  42. * Initialize dependencies.
  43. *
  44. * @param QueueConfig $queueConfig
  45. * @param \Magento\Framework\Json\EncoderInterface $jsonEncoder
  46. * @param \Magento\Framework\Json\DecoderInterface $jsonDecoder
  47. * @param \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder
  48. * @param \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder
  49. *
  50. * @SuppressWarnings(PHPMD.UnusedFormalParameter)
  51. */
  52. public function __construct(
  53. QueueConfig $queueConfig,
  54. \Magento\Framework\Json\EncoderInterface $jsonEncoder,
  55. \Magento\Framework\Json\DecoderInterface $jsonDecoder,
  56. \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder,
  57. \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder
  58. ) {
  59. $this->dataObjectEncoder = $dataObjectEncoder;
  60. $this->dataObjectDecoder = $dataObjectDecoder;
  61. $this->jsonEncoder = $jsonEncoder;
  62. $this->jsonDecoder = $jsonDecoder;
  63. }
  64. /**
  65. * Encode message content based on current topic.
  66. *
  67. * @param string $topic
  68. * @param mixed $message
  69. * @param bool $requestType
  70. * @return string
  71. * @throws LocalizedException
  72. */
  73. public function encode($topic, $message, $requestType = true)
  74. {
  75. $convertedMessage = $this->convertMessage($topic, $message, self::DIRECTION_ENCODE, $requestType);
  76. return $this->jsonEncoder->encode($convertedMessage);
  77. }
  78. /**
  79. * Decode message content based on current topic.
  80. *
  81. * @param string $topic
  82. * @param string $message
  83. * @param bool $requestType
  84. * @return mixed
  85. * @throws LocalizedException
  86. */
  87. public function decode($topic, $message, $requestType = true)
  88. {
  89. try {
  90. $decodedMessage = $this->jsonDecoder->decode($message);
  91. } catch (\Exception $e) {
  92. throw new LocalizedException(new Phrase("Error occurred during message decoding."));
  93. }
  94. return $this->convertMessage($topic, $decodedMessage, self::DIRECTION_DECODE, $requestType);
  95. }
  96. /**
  97. * Identify message data schema by topic.
  98. *
  99. * @param string $topic
  100. * @param bool $requestType
  101. * @return array
  102. * @throws LocalizedException
  103. */
  104. protected function getTopicSchema($topic, $requestType)
  105. {
  106. $topicConfig = $this->getCommunicationConfig()->getTopic($topic);
  107. if ($topicConfig === null) {
  108. throw new LocalizedException(new Phrase('Specified topic "%topic" is not declared.', ['topic' => $topic]));
  109. }
  110. if ($requestType) {
  111. return [
  112. 'schema_type' => $topicConfig[CommunicationConfig::TOPIC_REQUEST_TYPE],
  113. 'schema_value' => $topicConfig[CommunicationConfig::TOPIC_REQUEST]
  114. ];
  115. } else {
  116. return [
  117. 'schema_type' => isset($topicConfig[CommunicationConfig::TOPIC_RESPONSE])
  118. ? CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
  119. : null,
  120. 'schema_value' => $topicConfig[CommunicationConfig::TOPIC_RESPONSE]
  121. ];
  122. }
  123. }
  124. /**
  125. * Convert message according to the format associated with its topic using provided converter.
  126. *
  127. * @param string $topic
  128. * @param mixed $message
  129. * @param string $direction
  130. * @param bool $requestType
  131. * @return mixed
  132. * @throws LocalizedException
  133. */
  134. protected function convertMessage($topic, $message, $direction, $requestType)
  135. {
  136. $topicSchema = $this->getTopicSchema($topic, $requestType);
  137. if ($topicSchema['schema_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS) {
  138. /** Convert message according to the data interface associated with the message topic */
  139. $messageDataType = $topicSchema[QueueConfig::TOPIC_SCHEMA_VALUE];
  140. try {
  141. $convertedMessage = $this->getConverter($direction)->convertValue($message, $messageDataType);
  142. } catch (LocalizedException $e) {
  143. throw new LocalizedException(
  144. new Phrase(
  145. 'Message with topic "%topic" must be an instance of "%class".',
  146. ['topic' => $topic, 'class' => $messageDataType]
  147. )
  148. );
  149. }
  150. } else {
  151. /** Convert message according to the method signature associated with the message topic */
  152. $message = (array)$message;
  153. $isIndexedArray = array_keys($message) === range(0, count($message) - 1);
  154. $convertedMessage = [];
  155. /** Message schema type is defined by method signature */
  156. foreach ($topicSchema['schema_value'] as $methodParameterMeta) {
  157. $paramName = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_NAME];
  158. $paramType = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_TYPE];
  159. if ($isIndexedArray) {
  160. /** Encode parameters according to their positions in method signature */
  161. $paramPosition = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_POSITION];
  162. if (isset($message[$paramPosition])) {
  163. $convertedMessage[$paramName] = $this->getConverter($direction)
  164. ->convertValue($message[$paramPosition], $paramType);
  165. }
  166. } else {
  167. /** Encode parameters according to their names in method signature */
  168. if (isset($message[$paramName])) {
  169. $convertedMessage[$paramName] = $this->getConverter($direction)
  170. ->convertValue($message[$paramName], $paramType);
  171. }
  172. }
  173. /** Ensure that all required params were passed */
  174. if ($methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_IS_REQUIRED]
  175. && !isset($convertedMessage[$paramName])
  176. ) {
  177. throw new LocalizedException(
  178. new Phrase(
  179. 'Data item corresponding to "%param" must be specified '
  180. . 'in the message with topic "%topic".',
  181. [
  182. 'topic' => $topic,
  183. 'param' => $paramName
  184. ]
  185. )
  186. );
  187. }
  188. }
  189. }
  190. return $convertedMessage;
  191. }
  192. /**
  193. * Get value converter based on conversion direction.
  194. *
  195. * @param string $direction
  196. * @return ServicePayloadConverterInterface
  197. */
  198. protected function getConverter($direction)
  199. {
  200. return ($direction == self::DIRECTION_ENCODE) ? $this->dataObjectEncoder : $this->dataObjectDecoder;
  201. }
  202. /**
  203. * Get communication config.
  204. *
  205. * @return CommunicationConfig
  206. *
  207. * @deprecated 102.0.1
  208. */
  209. private function getCommunicationConfig()
  210. {
  211. if ($this->communicationConfig === null) {
  212. $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
  213. ->get(CommunicationConfig::class);
  214. }
  215. return $this->communicationConfig;
  216. }
  217. }