Server.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. <?php
  2. namespace Channel;
  3. use Workerman\Protocols\Frame;
  4. use Workerman\Worker;
  5. /**
  6. * Channel server.
  7. */
  8. class Server
  9. {
  10. /**
  11. * Worker instance.
  12. * @var Worker
  13. */
  14. protected $_worker = null;
  15. /**
  16. * Queues
  17. * @var Queue[]
  18. */
  19. protected $_queues = array();
  20. private $ip;
  21. /**
  22. * Construct.
  23. * @param string $ip Bind ip address or unix domain socket.
  24. * Bind unix domain socket use 'unix:///tmp/channel.sock'
  25. * @param int $port Tcp port to bind, only used when listen on tcp.
  26. */
  27. public function __construct($ip = '0.0.0.0', $port = 2206)
  28. {
  29. if (strpos($ip, 'unix:') === false) {
  30. $worker = new Worker("frame://$ip:$port");
  31. } else {
  32. $worker = new Worker($ip);
  33. $worker->protocol = Frame::class;
  34. }
  35. $this->ip = $ip;
  36. $worker->count = 1;
  37. $worker->name = 'ChannelServer';
  38. $worker->channels = array();
  39. $worker->onMessage = array($this, 'onMessage') ;
  40. $worker->onClose = array($this, 'onClose');
  41. $this->_worker = $worker;
  42. }
  43. /**
  44. * onClose
  45. * @return void
  46. */
  47. public function onClose($connection)
  48. {
  49. if (!empty($connection->channels)) {
  50. foreach ($connection->channels as $channel) {
  51. unset($this->_worker->channels[$channel][$connection->id]);
  52. if (empty($this->_worker->channels[$channel])) {
  53. unset($this->_worker->channels[$channel]);
  54. }
  55. }
  56. }
  57. if (!empty($connection->watchs)) {
  58. foreach ($connection->watchs as $channel) {
  59. if (isset($this->_queues[$channel])) {
  60. $this->_queues[$channel]->removeWatch($connection);
  61. if ($this->_queues[$channel]->isEmpty()) {
  62. unset($this->_queues[$channel]);
  63. }
  64. }
  65. }
  66. }
  67. }
  68. /**
  69. * onMessage.
  70. * @param \Workerman\Connection\TcpConnection $connection
  71. * @param string $data
  72. */
  73. public function onMessage($connection, $data)
  74. {
  75. if(!$data)
  76. {
  77. return;
  78. }
  79. $worker = $this->_worker;
  80. $data = unserialize($data);
  81. $type = $data['type'];
  82. switch($type)
  83. {
  84. case 'subscribe':
  85. foreach($data['channels'] as $channel)
  86. {
  87. $connection->channels[$channel] = $channel;
  88. $worker->channels[$channel][$connection->id] = $connection;
  89. }
  90. break;
  91. case 'unsubscribe':
  92. foreach($data['channels'] as $channel) {
  93. if (isset($connection->channels[$channel])) {
  94. unset($connection->channels[$channel]);
  95. }
  96. if (isset($worker->channels[$channel][$connection->id])) {
  97. unset($worker->channels[$channel][$connection->id]);
  98. if (empty($worker->channels[$channel])) {
  99. unset($worker->channels[$channel]);
  100. }
  101. }
  102. }
  103. break;
  104. case 'publish':
  105. foreach ($data['channels'] as $channel) {
  106. if (empty($worker->channels[$channel])) {
  107. continue;
  108. }
  109. $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']))."\n";
  110. foreach ($worker->channels[$channel] as $connection) {
  111. $connection->send($buffer);
  112. }
  113. }
  114. break;
  115. case 'watch':
  116. foreach ($data['channels'] as $channel) {
  117. $this->getQueue($channel)->addWatch($connection);
  118. }
  119. break;
  120. case 'unwatch':
  121. foreach ($data['channels'] as $channel) {
  122. if (isset($this->_queues[$channel])) {
  123. $this->_queues[$channel]->removeWatch($connection);
  124. if ($this->_queues[$channel]->isEmpty()) {
  125. unset($this->_queues[$channel]);
  126. }
  127. }
  128. }
  129. break;
  130. case 'enqueue':
  131. foreach ($data['channels'] as $channel) {
  132. $this->getQueue($channel)->enqueue($data['data']);
  133. }
  134. break;
  135. case 'reserve':
  136. if (isset($connection->watchs)) {
  137. foreach ($connection->watchs as $channel) {
  138. if (isset($this->_queues[$channel])) {
  139. $this->_queues[$channel]->addConsumer($connection);
  140. }
  141. }
  142. }
  143. break;
  144. }
  145. }
  146. private function getQueue($channel)
  147. {
  148. if (isset($this->_queues[$channel])) {
  149. return $this->_queues[$channel];
  150. }
  151. return ($this->_queues[$channel] = new Queue($channel));
  152. }
  153. }