Client.php 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. <?php
  2. namespace Channel;
  3. use Workerman\Connection\AsyncTcpConnection;
  4. use Workerman\Lib\Timer;
  5. use Workerman\Protocols\Frame;
  6. /**
  7. * Channel/Client
  8. * @version 1.0.7
  9. */
  10. class Client
  11. {
  12. /**
  13. * onMessage.
  14. * @var callback
  15. */
  16. public static $onMessage = null;
  17. /**
  18. * onConnect
  19. * @var callback
  20. */
  21. public static $onConnect = null;
  22. /**
  23. * onClose
  24. * @var callback
  25. */
  26. public static $onClose = null;
  27. /**
  28. * Connction to channel server.
  29. * @var \Workerman\Connection\TcpConnection
  30. */
  31. protected static $_remoteConnection = null;
  32. /**
  33. * Channel server ip.
  34. * @var string
  35. */
  36. protected static $_remoteIp = null;
  37. /**
  38. * Channel server port.
  39. * @var int
  40. */
  41. protected static $_remotePort = null;
  42. /**
  43. * Reconnect timer.
  44. * @var Timer
  45. */
  46. protected static $_reconnectTimer = null;
  47. /**
  48. * Ping timer.
  49. * @var Timer
  50. */
  51. protected static $_pingTimer = null;
  52. /**
  53. * All event callback.
  54. * @var array
  55. */
  56. protected static $_events = array();
  57. /**
  58. * All queue callback.
  59. * @var callable
  60. */
  61. protected static $_queues = array();
  62. /**
  63. * @var bool
  64. */
  65. protected static $_isWorkermanEnv = true;
  66. /**
  67. * Ping interval.
  68. * @var int
  69. */
  70. public static $pingInterval = 25;
  71. /**
  72. * Connect to channel server
  73. * @param string $ip Channel server ip address or unix domain socket address
  74. * Ip like (TCP): 192.168.1.100
  75. * Unix domain socket like: unix:///tmp/workerman-channel.sock
  76. * @param int $port Port to connect when use tcp
  77. */
  78. public static function connect($ip = '127.0.0.1', $port = 2206)
  79. {
  80. if (self::$_remoteConnection) {
  81. return;
  82. }
  83. self::$_remoteIp = $ip;
  84. self::$_remotePort = $port;
  85. if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
  86. self::$_isWorkermanEnv = false;
  87. }
  88. // For workerman environment.
  89. if (self::$_isWorkermanEnv) {
  90. if (strpos($ip, 'unix://') === false) {
  91. $conn = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
  92. } else {
  93. $conn = new AsyncTcpConnection($ip);
  94. $conn->protocol = Frame::class;
  95. }
  96. $conn->onClose = [self::class, 'onRemoteClose'];
  97. $conn->onConnect = [self::class, 'onRemoteConnect'];
  98. $conn->onMessage = [self::class , 'onRemoteMessage'];
  99. $conn->connect();
  100. if (empty(self::$_pingTimer)) {
  101. self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
  102. }
  103. // Not workerman environment.
  104. } else {
  105. $remote = strpos($ip, 'unix://') === false ? 'tcp://'.self::$_remoteIp.':'.self::$_remotePort : $ip;
  106. $conn = stream_socket_client($remote, $code, $message, 5);
  107. if (!$conn) {
  108. throw new \Exception($message);
  109. }
  110. }
  111. self::$_remoteConnection = $conn;
  112. }
  113. /**
  114. * onRemoteMessage.
  115. * @param \Workerman\Connection\TcpConnection $connection
  116. * @param string $data
  117. * @throws \Exception
  118. */
  119. public static function onRemoteMessage($connection, $data)
  120. {
  121. $data = unserialize($data);
  122. $type = $data['type'];
  123. $event = $data['channel'];
  124. $event_data = $data['data'];
  125. $callback = null;
  126. if ($type == 'event') {
  127. if (!empty(self::$_events[$event])) {
  128. call_user_func(self::$_events[$event], $event_data);
  129. } elseif (!empty(Client::$onMessage)) {
  130. call_user_func(Client::$onMessage, $event, $event_data);
  131. } else {
  132. throw new \Exception("event:$event have not callback");
  133. }
  134. } else {
  135. if (isset(self::$_queues[$event])) {
  136. call_user_func(self::$_queues[$event], $event_data);
  137. } else {
  138. throw new \Exception("queue:$event have not callback");
  139. }
  140. }
  141. }
  142. /**
  143. * Ping.
  144. * @return void
  145. */
  146. public static function ping()
  147. {
  148. if(self::$_remoteConnection)
  149. {
  150. self::$_remoteConnection->send('');
  151. }
  152. }
  153. /**
  154. * onRemoteClose.
  155. * @return void
  156. */
  157. public static function onRemoteClose()
  158. {
  159. echo "Waring channel connection closed and try to reconnect\n";
  160. self::$_remoteConnection = null;
  161. self::clearTimer();
  162. self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
  163. if (self::$onClose) {
  164. call_user_func(Client::$onClose);
  165. }
  166. }
  167. /**
  168. * onRemoteConnect.
  169. * @return void
  170. */
  171. public static function onRemoteConnect()
  172. {
  173. $all_event_names = array_keys(self::$_events);
  174. if($all_event_names)
  175. {
  176. self::subscribe($all_event_names);
  177. }
  178. self::clearTimer();
  179. if (self::$onConnect) {
  180. call_user_func(Client::$onConnect);
  181. }
  182. }
  183. /**
  184. * clearTimer.
  185. * @return void
  186. */
  187. public static function clearTimer()
  188. {
  189. if (!self::$_isWorkermanEnv) {
  190. throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
  191. }
  192. if(self::$_reconnectTimer)
  193. {
  194. Timer::del(self::$_reconnectTimer);
  195. self::$_reconnectTimer = null;
  196. }
  197. }
  198. /**
  199. * On.
  200. * @param string $event
  201. * @param callback $callback
  202. * @throws \Exception
  203. */
  204. public static function on($event, $callback)
  205. {
  206. if (!is_callable($callback)) {
  207. throw new \Exception('callback is not callable for event.');
  208. }
  209. self::$_events[$event] = $callback;
  210. self::subscribe($event);
  211. }
  212. /**
  213. * Subscribe.
  214. * @param string $events
  215. * @return void
  216. */
  217. public static function subscribe($events)
  218. {
  219. $events = (array)$events;
  220. self::send(array('type' => 'subscribe', 'channels'=>$events));
  221. foreach ($events as $event) {
  222. if(!isset(self::$_events[$event])) {
  223. self::$_events[$event] = null;
  224. }
  225. }
  226. }
  227. /**
  228. * Unsubscribe.
  229. * @param string $events
  230. * @return void
  231. */
  232. public static function unsubscribe($events)
  233. {
  234. $events = (array)$events;
  235. self::send(array('type' => 'unsubscribe', 'channels'=>$events));
  236. foreach($events as $event) {
  237. unset(self::$_events[$event]);
  238. }
  239. }
  240. /**
  241. * Publish.
  242. * @param string $events
  243. * @param mixed $data
  244. */
  245. public static function publish($events, $data)
  246. {
  247. self::sendAnyway(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data));
  248. }
  249. /**
  250. * Watch a channel of queue
  251. * @param string|array $channels
  252. * @param callable $callback
  253. * @param boolean $autoReserve Auto reserve after callback finished.
  254. * But sometime you may don't want reserve immediately, or in some asynchronous job,
  255. * you want reserve in finished callback, so you should set $autoReserve to false
  256. * and call Client::reserve() after watch() and in finish callback manually.
  257. * @throws \Exception
  258. */
  259. public static function watch($channels, $callback, $autoReserve=true)
  260. {
  261. if (!is_callable($callback)) {
  262. throw new \Exception('callback is not callable for watch.');
  263. }
  264. if ($autoReserve) {
  265. $callback = static function($data) use ($callback) {
  266. try {
  267. call_user_func($callback, $data);
  268. } catch (\Exception $e) {
  269. throw $e;
  270. } catch (\Error $e) {
  271. throw $e;
  272. } finally {
  273. self::reserve();
  274. }
  275. };
  276. }
  277. $channels = (array)$channels;
  278. self::send(array('type' => 'watch', 'channels'=>$channels));
  279. foreach ($channels as $channel) {
  280. self::$_queues[$channel] = $callback;
  281. }
  282. if ($autoReserve) {
  283. self::reserve();
  284. }
  285. }
  286. /**
  287. * Unwatch a channel of queue
  288. * @param string $channel
  289. * @throws \Exception
  290. */
  291. public static function unwatch($channels)
  292. {
  293. $channels = (array)$channels;
  294. self::send(array('type' => 'unwatch', 'channels'=>$channels));
  295. foreach ($channels as $channel) {
  296. if (isset(self::$_queues[$channel])) {
  297. unset(self::$_queues[$channel]);
  298. }
  299. }
  300. }
  301. /**
  302. * Put data to queue
  303. * @param string|array $channels
  304. * @param mixed $data
  305. * @throws \Exception
  306. */
  307. public static function enqueue($channels, $data)
  308. {
  309. self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data));
  310. }
  311. /**
  312. * Start reserve queue manual
  313. * @throws \Exception
  314. */
  315. public static function reserve()
  316. {
  317. self::send(array('type' => 'reserve'));
  318. }
  319. /**
  320. * Send through workerman environment
  321. * @param $data
  322. * @throws \Exception
  323. */
  324. protected static function send($data)
  325. {
  326. if (!self::$_isWorkermanEnv) {
  327. throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
  328. }
  329. self::connect(self::$_remoteIp, self::$_remotePort);
  330. self::$_remoteConnection->send(serialize($data));
  331. }
  332. /**
  333. * Send from any environment
  334. * @param $data
  335. * @throws \Exception
  336. */
  337. protected static function sendAnyway($data)
  338. {
  339. self::connect(self::$_remoteIp, self::$_remotePort);
  340. $body = serialize($data);
  341. if (self::$_isWorkermanEnv) {
  342. self::$_remoteConnection->send($body);
  343. } else {
  344. $buffer = pack('N', 4+strlen($body)) . $body;
  345. fwrite(self::$_remoteConnection, $buffer);
  346. }
  347. }
  348. }