start_io.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. <?php
  2. use Workerman\Worker;
  3. use Workerman\Timer;
  4. use PHPSocketIO\SocketIO;
  5. use Workerman\Protocols\Http\Request;
  6. use Workerman\Connection\TcpConnection;
  7. include __DIR__ . '/vendor/autoload.php';
  8. // 全局数组保存uid在线数据
  9. $uidConnectionMap = array();
  10. // 记录最后一次广播的在线用户数
  11. $last_online_count = 0;
  12. // 记录最后一次广播的在线页面数
  13. $last_online_page_count = 0;
  14. $context = array(
  15. 'ssl' => array(
  16. 'local_cert' => '/www/wwwroot/1.wepolicy.cn/msg/erp.crt',
  17. 'local_pk' => '/www/wwwroot/1.wepolicy.cn/msg/erp.key',
  18. 'verify_peer' => false,
  19. )
  20. );
  21. // PHPSocketIO服务
  22. $sender_io = new SocketIO(2120,$context);
  23. // 客户端发起连接事件时,设置连接socket的各种事件回调
  24. $sender_io->on('connection', function($socket){
  25. // 当客户端发来登录事件时触发
  26. $socket->on('login', function ($uid)use($socket){
  27. global $uidConnectionMap, $last_online_count, $last_online_page_count;
  28. // 已经登录过了
  29. if(isset($socket->uid)){
  30. return;
  31. }
  32. // 更新对应uid的在线数据
  33. $uid = (string)$uid;
  34. if(!isset($uidConnectionMap[$uid]))
  35. {
  36. $uidConnectionMap[$uid] = 0;
  37. }
  38. // 这个uid有++$uidConnectionMap[$uid]个socket连接
  39. ++$uidConnectionMap[$uid];
  40. // 将这个连接加入到uid分组,方便针对uid推送数据
  41. $socket->join($uid);
  42. $socket->uid = $uid;
  43. // 更新这个socket对应页面的在线数据
  44. $username = '';
  45. foreach ($uidConnectionMap as $k=>$v)
  46. {
  47. $username .= $k.'、';
  48. }
  49. $socket->emit('update_online_count', json_encode(array('num'=>$last_online_count,'username'=>trim($username,'、'))));//{$last_online_page_count}页面
  50. });
  51. // 当客户端断开连接是触发(一般是关闭网页或者跳转刷新导致)
  52. $socket->on('disconnect', function () use($socket) {
  53. if(!isset($socket->uid))
  54. {
  55. return;
  56. }
  57. global $uidConnectionMap, $sender_io;
  58. // 将uid的在线socket数减一
  59. if(--$uidConnectionMap[$socket->uid] <= 0)
  60. {
  61. unset($uidConnectionMap[$socket->uid]);
  62. }
  63. });
  64. });
  65. // 当$sender_io启动后监听一个http端口,通过这个端口可以给任意uid或者所有uid推送数据
  66. $sender_io->on('workerStart', function(){
  67. // 监听一个http端口
  68. $inner_http_worker = new Worker('http://0.0.0.0:2121');
  69. // 当http客户端发来数据时触发
  70. $inner_http_worker->onMessage = function(TcpConnection $http_connection, Request $request){
  71. global $uidConnectionMap;
  72. $post = $request->post();
  73. $post = $post ? $post : $request->get();
  74. // 推送数据的url格式 type=publish&to=uid&content=xxxx
  75. switch(@$post['type']){
  76. case 'publish':
  77. global $sender_io;
  78. $to = @$post['to'];
  79. $post['content'] = htmlspecialchars(@$post['content']);
  80. // 有指定uid则向uid所在socket组发送数据
  81. if($to){
  82. if($post['content'] == $to)
  83. {
  84. $sender_io->to($to)->emit($to, $post['content']);
  85. }
  86. else
  87. {
  88. $sender_io->to($to)->emit('new_msg', $post['content']);
  89. }
  90. // 否则向所有uid推送数据
  91. }else{
  92. $sender_io->emit('new_msg', @$post['content']);
  93. }
  94. // http接口返回,如果用户离线socket返回fail
  95. if($to && !isset($uidConnectionMap[$to])){
  96. return $http_connection->send('offline');
  97. }else{
  98. return $http_connection->send('ok');
  99. }
  100. }
  101. return $http_connection->send('fail');
  102. };
  103. // 执行监听
  104. $inner_http_worker->listen();
  105. // 一个定时器,定时向所有uid推送当前uid在线数及在线页面数
  106. Timer::add(1, function(){
  107. global $uidConnectionMap, $sender_io, $last_online_count, $last_online_page_count;
  108. $online_count_now = count($uidConnectionMap);
  109. $online_page_count_now = array_sum($uidConnectionMap);
  110. // 只有在客户端在线数变化了才广播,减少不必要的客户端通讯
  111. if($last_online_count != $online_count_now || $last_online_page_count != $online_page_count_now)
  112. {
  113. $username = '';
  114. foreach ($uidConnectionMap as $k=>$v)
  115. {
  116. $username .= $k.'、';
  117. }
  118. $sender_io->emit('update_online_count', json_encode(array('num'=>$online_count_now,'username'=>trim($username,'、'))));//{$online_page_count_now} 页面
  119. $last_online_count = $online_count_now;
  120. $last_online_page_count = $online_page_count_now;
  121. }
  122. });
  123. });
  124. if(!defined('GLOBAL_START'))
  125. {
  126. Worker::runAll();
  127. }