Socket.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. <?php
  2. namespace PHPSocketIO\Engine;
  3. use \PHPSocketIO\Event\Emitter;
  4. use \Workerman\Lib\Timer;
  5. use \PHPSocketIO\Debug;
  6. class Socket extends Emitter
  7. {
  8. public $id = 0;
  9. public $server = null;
  10. public $upgrading = false;
  11. public $upgraded = false;
  12. public $readyState = 'opening';
  13. public $writeBuffer = array();
  14. public $packetsFn = array();
  15. public $sentCallbackFn = array();
  16. public $request = null;
  17. public $remoteAddress = '';
  18. public $checkIntervalTimer = null;
  19. public $upgradeTimeoutTimer = null;
  20. public $pingTimeoutTimer = null;
  21. public function __construct($id, $server, $transport, $req)
  22. {
  23. $this->id = $id;
  24. $this->server = $server;
  25. $this->request = $req;
  26. $this->remoteAddress = $req->connection->getRemoteIp().':'.$req->connection->getRemotePort();
  27. $this->setTransport($transport);
  28. $this->onOpen();
  29. Debug::debug('Engine/Socket __construct');
  30. }
  31. public function __destruct()
  32. {
  33. Debug::debug('Engine/Socket __destruct');
  34. }
  35. public function maybeUpgrade($transport)
  36. {
  37. $this->upgrading = true;
  38. $this->upgradeTimeoutTimer = Timer::add(
  39. $this->server->upgradeTimeout,
  40. array($this, 'upgradeTimeoutCallback'),
  41. array($transport), false
  42. );
  43. $this->upgradeTransport = $transport;
  44. $transport->on('packet', array($this, 'onUpgradePacket'));
  45. $transport->once('close', array($this, 'onUpgradeTransportClose'));
  46. $transport->once('error', array($this, 'onUpgradeTransportError'));
  47. $this->once('close', array($this, 'onUpgradeTransportClose'));
  48. }
  49. public function onUpgradePacket($packet)
  50. {
  51. if(empty($this->upgradeTransport))
  52. {
  53. $this->onError('upgradeTransport empty');
  54. return;
  55. }
  56. if('ping' === $packet['type'] && (isset($packet['data']) && 'probe' === $packet['data']))
  57. {
  58. $this->upgradeTransport->send(array(array('type'=> 'pong', 'data'=> 'probe')));
  59. //$this->transport->shouldClose = function(){};
  60. if ($this->checkIntervalTimer) {
  61. Timer::del($this->checkIntervalTimer);
  62. }
  63. $this->checkIntervalTimer = Timer::add(0.5, array($this, 'check'));
  64. }
  65. else if('upgrade' === $packet['type'] && $this->readyState !== 'closed')
  66. {
  67. $this->upgradeCleanup();
  68. $this->upgraded = true;
  69. $this->clearTransport();
  70. $this->transport->destroy();
  71. $this->setTransport($this->upgradeTransport);
  72. $this->emit('upgrade', $this->upgradeTransport);
  73. $this->upgradeTransport = null;
  74. $this->setPingTimeout();
  75. $this->flush();
  76. if($this->readyState === 'closing')
  77. {
  78. $this->transport->close(array($this, 'onClose'));
  79. }
  80. }
  81. else
  82. {
  83. if(!empty($this->upgradeTransport))
  84. {
  85. $this->upgradeCleanup();
  86. $this->upgradeTransport->close();
  87. $this->upgradeTransport = null;
  88. }
  89. }
  90. }
  91. public function upgradeCleanup()
  92. {
  93. $this->upgrading = false;
  94. Timer::del($this->checkIntervalTimer);
  95. Timer::del($this->upgradeTimeoutTimer);
  96. if(!empty($this->upgradeTransport))
  97. {
  98. $this->upgradeTransport->removeListener('packet', array($this, 'onUpgradePacket'));
  99. $this->upgradeTransport->removeListener('close', array($this, 'onUpgradeTransportClose'));
  100. $this->upgradeTransport->removeListener('error', array($this, 'onUpgradeTransportError'));
  101. }
  102. $this->removeListener('close', array($this, 'onUpgradeTransportClose'));
  103. }
  104. public function onUpgradeTransportClose()
  105. {
  106. $this->onUpgradeTransportError('transport closed');
  107. }
  108. public function onUpgradeTransportError($err)
  109. {
  110. //echo $err;
  111. $this->upgradeCleanup();
  112. if($this->upgradeTransport)
  113. {
  114. $this->upgradeTransport->close();
  115. $this->upgradeTransport = null;
  116. }
  117. }
  118. public function upgradeTimeoutCallback($transport)
  119. {
  120. //echo("client did not complete upgrade - closing transport\n");
  121. $this->upgradeCleanup();
  122. if('open' === $transport->readyState)
  123. {
  124. $transport->close();
  125. }
  126. }
  127. public function setTransport($transport)
  128. {
  129. $this->transport = $transport;
  130. $this->transport->once('error', array($this, 'onError'));
  131. $this->transport->on('packet', array($this, 'onPacket'));
  132. $this->transport->on('drain', array($this, 'flush'));
  133. $this->transport->once('close', array($this, 'onClose'));
  134. //this function will manage packet events (also message callbacks)
  135. $this->setupSendCallback();
  136. }
  137. public function onOpen()
  138. {
  139. $this->readyState = 'open';
  140. // sends an `open` packet
  141. $this->transport->sid = $this->id;
  142. $this->sendPacket('open', json_encode(array(
  143. 'sid'=> $this->id
  144. , 'upgrades' => $this->getAvailableUpgrades()
  145. , 'pingInterval'=> $this->server->pingInterval*1000
  146. , 'pingTimeout'=> $this->server->pingTimeout*1000
  147. )));
  148. $this->emit('open');
  149. $this->setPingTimeout();
  150. }
  151. public function onPacket($packet)
  152. {
  153. if ('open' === $this->readyState) {
  154. // export packet event
  155. $this->emit('packet', $packet);
  156. // Reset ping timeout on any packet, incoming data is a good sign of
  157. // other side's liveness
  158. $this->setPingTimeout();
  159. switch ($packet['type']) {
  160. case 'ping':
  161. $this->sendPacket('pong');
  162. $this->emit('heartbeat');
  163. break;
  164. case 'error':
  165. $this->onClose('parse error');
  166. break;
  167. case 'message':
  168. $this->emit('data', $packet['data']);
  169. $this->emit('message', $packet['data']);
  170. break;
  171. }
  172. }
  173. else
  174. {
  175. echo('packet received with closed socket');
  176. }
  177. }
  178. public function check()
  179. {
  180. if('polling' == $this->transport->name && $this->transport->writable)
  181. {
  182. $this->transport->send(array(array('type' => 'noop')));
  183. }
  184. }
  185. public function onError($err)
  186. {
  187. $this->onClose('transport error', $err);
  188. }
  189. public function setPingTimeout()
  190. {
  191. if ($this->pingTimeoutTimer) {
  192. Timer::del($this->pingTimeoutTimer);
  193. }
  194. $this->pingTimeoutTimer = Timer::add(
  195. $this->server->pingInterval + $this->server->pingTimeout ,
  196. array($this, 'pingTimeoutCallback'), null, false);
  197. }
  198. public function pingTimeoutCallback()
  199. {
  200. $this->transport->close();
  201. $this->onClose('ping timeout');
  202. }
  203. public function clearTransport()
  204. {
  205. $this->transport->close();
  206. Timer::del($this->pingTimeoutTimer);
  207. }
  208. public function onClose($reason = '', $description = null)
  209. {
  210. if ('closed' !== $this->readyState)
  211. {
  212. Timer::del($this->pingTimeoutTimer);
  213. Timer::del($this->checkIntervalTimer);
  214. $this->checkIntervalTimer = null;
  215. Timer::del($this->upgradeTimeoutTimer);
  216. // clean writeBuffer in next tick, so developers can still
  217. // grab the writeBuffer on 'close' event
  218. $this->writeBuffer = array();
  219. $this->packetsFn = array();
  220. $this->sentCallbackFn = array();
  221. $this->clearTransport();
  222. $this->readyState = 'closed';
  223. $this->emit('close', $this->id, $reason, $description);
  224. $this->server = null;
  225. $this->request = null;
  226. $this->upgradeTransport = null;
  227. $this->removeAllListeners();
  228. if(!empty($this->transport))
  229. {
  230. $this->transport->removeAllListeners();
  231. $this->transport = null;
  232. }
  233. }
  234. }
  235. public function send($data, $options, $callback)
  236. {
  237. $this->sendPacket('message', $data, $callback);
  238. return $this;
  239. }
  240. public function write($data, $options = array(), $callback = null)
  241. {
  242. return $this->send($data, $options, $callback);
  243. }
  244. public function sendPacket($type, $data = null, $callback = null)
  245. {
  246. if('closing' !== $this->readyState)
  247. {
  248. $packet = array(
  249. 'type'=> $type
  250. );
  251. if($data !== null)
  252. {
  253. $packet['data'] = $data;
  254. }
  255. // exports packetCreate event
  256. $this->emit('packetCreate', $packet);
  257. $this->writeBuffer[] = $packet;
  258. //add send callback to object
  259. if($callback)
  260. {
  261. $this->packetsFn[] = $callback;
  262. }
  263. $this->flush();
  264. }
  265. }
  266. public function flush()
  267. {
  268. if ('closed' !== $this->readyState && $this->transport->writable
  269. && $this->writeBuffer)
  270. {
  271. $this->emit('flush', $this->writeBuffer);
  272. $this->server->emit('flush', $this, $this->writeBuffer);
  273. $wbuf = $this->writeBuffer;
  274. $this->writeBuffer = array();
  275. if($this->packetsFn)
  276. {
  277. if(!empty($this->transport->supportsFraming))
  278. {
  279. $this->sentCallbackFn[] = $this->packetsFn;
  280. }
  281. else
  282. {
  283. // @todo check
  284. $this->sentCallbackFn[]=$this->packetsFn;
  285. }
  286. }
  287. $this->packetsFn = array();
  288. $this->transport->send($wbuf);
  289. $this->emit('drain');
  290. if($this->server)
  291. {
  292. $this->server->emit('drain', $this);
  293. }
  294. }
  295. }
  296. public function getAvailableUpgrades()
  297. {
  298. return array('websocket');
  299. }
  300. public function close()
  301. {
  302. if ('open' !== $this->readyState)
  303. {
  304. return;
  305. }
  306. $this->readyState = 'closing';
  307. if ($this->writeBuffer) {
  308. $this->once('drain', array($this, 'closeTransport'));
  309. return;
  310. }
  311. $this->closeTransport();
  312. }
  313. public function closeTransport()
  314. {
  315. //todo onClose.bind(this, 'forced close'));
  316. $this->transport->close(array($this, 'onClose'));
  317. }
  318. public function setupSendCallback()
  319. {
  320. $self = $this;
  321. //the message was sent successfully, execute the callback
  322. $this->transport->on('drain', array($this, 'onDrainCallback'));
  323. }
  324. public function onDrainCallback()
  325. {
  326. if ($this->sentCallbackFn)
  327. {
  328. $seqFn = array_shift($this->sentCallbackFn);
  329. if(is_callable($seqFn))
  330. {
  331. echo('executing send callback');
  332. call_user_func($seqFn, $this->transport);
  333. }else if (is_array($seqFn)) {
  334. echo('executing batch send callback');
  335. foreach($seqFn as $fn)
  336. {
  337. call_user_func($fn, $this->transport);
  338. }
  339. }
  340. }
  341. }
  342. }