123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- <?php
- namespace Channel;
- use Workerman\Protocols\Frame;
- use Workerman\Worker;
/**
 * Channel server.
 *
 * Accepts serialized request arrays from clients and keeps two registries:
 * a channel => subscriber-connection map (stored on the worker as
 * `$worker->channels`) and a channel => Queue map (`$_queues`).
 *
 * Request types handled by onMessage(): subscribe, unsubscribe, publish,
 * watch, unwatch, enqueue and reserve. Any other request is ignored.
 */
class Server
{
    /**
     * Worker instance.
     * @var Worker
     */
    protected $_worker = null;

    /**
     * Queues, keyed by channel name.
     * @var Queue[]
     */
    protected $_queues = array();

    /**
     * Address passed to the constructor. Stored but not read anywhere in this class.
     * @var string
     */
    private $ip;

    /**
     * Construct.
     * @param string $ip Bind ip address or unix domain socket.
     * Bind unix domain socket use 'unix:///tmp/channel.sock'
     * @param int $port Tcp port to bind, only used when listen on tcp.
     */
    public function __construct($ip = '0.0.0.0', $port = 2206)
    {
        if (strpos($ip, 'unix:') === false) {
            // TCP: the frame protocol is selected through the listen address scheme.
            $worker = new Worker("frame://$ip:$port");
        } else {
            // Unix domain socket: listen on the address as given and set the protocol explicitly.
            $worker = new Worker($ip);
            $worker->protocol = Frame::class;
        }
        $this->ip = $ip;
        // NOTE(review): presumably a single process because subscription state
        // lives in this process's memory - confirm before raising the count.
        $worker->count = 1;
        $worker->name = 'ChannelServer';
        // channel => [connection id => connection]
        $worker->channels = array();
        $worker->onMessage = array($this, 'onMessage');
        $worker->onClose = array($this, 'onClose');
        $this->_worker = $worker;
    }

    /**
     * onClose
     *
     * Removes the closed connection from every channel it subscribed to and
     * from every queue it was watching, dropping entries that become empty.
     *
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    public function onClose($connection)
    {
        if (!empty($connection->channels)) {
            foreach ($connection->channels as $channel) {
                $this->dropSubscriber($channel, $connection);
            }
        }
        if (!empty($connection->watchs)) {
            foreach ($connection->watchs as $channel) {
                $this->dropWatcher($channel, $connection);
            }
        }
    }

    /**
     * onMessage.
     *
     * Decodes one request and dispatches on its 'type'. Requests that do not
     * decode to an array with a string 'type' are ignored, as are channel
     * entries that cannot be used as array keys.
     *
     * @param \Workerman\Connection\TcpConnection $connection
     * @param string $data
     * @return void
     */
    public function onMessage($connection, $data)
    {
        if (!$data) {
            return;
        }

        // NOTE(security): $data comes straight from the socket and unserialize()
        // can instantiate any loadable class (object injection). Only expose this
        // server to trusted peers, or consider the 'allowed_classes' option after
        // confirming clients do not depend on live objects on the server side.
        $message = unserialize($data);
        if (!is_array($message) || !isset($message['type']) || !is_string($message['type'])) {
            // Malformed request: ignore it so one bad frame cannot raise inside the worker.
            return;
        }

        $worker = $this->_worker;
        $channels = $this->extractChannels($message);
        $payload = isset($message['data']) ? $message['data'] : null;

        switch ($message['type']) {
            case 'subscribe':
                foreach ($channels as $channel) {
                    $connection->channels[$channel] = $channel;
                    $worker->channels[$channel][$connection->id] = $connection;
                }
                break;

            case 'unsubscribe':
                foreach ($channels as $channel) {
                    if (isset($connection->channels[$channel])) {
                        unset($connection->channels[$channel]);
                    }
                    $this->dropSubscriber($channel, $connection);
                }
                break;

            case 'publish':
                foreach ($channels as $channel) {
                    if (empty($worker->channels[$channel])) {
                        continue;
                    }
                    // The trailing newline is part of the existing wire format; keep it as is.
                    $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $payload))."\n";
                    // Distinct loop variable so the sender's $connection is not overwritten.
                    foreach ($worker->channels[$channel] as $subscriber) {
                        $subscriber->send($buffer);
                    }
                }
                break;

            case 'watch':
                foreach ($channels as $channel) {
                    $this->getQueue($channel)->addWatch($connection);
                }
                break;

            case 'unwatch':
                foreach ($channels as $channel) {
                    $this->dropWatcher($channel, $connection);
                }
                break;

            case 'enqueue':
                foreach ($channels as $channel) {
                    $this->getQueue($channel)->enqueue($payload);
                }
                break;

            case 'reserve':
                // Registers the connection as a consumer on every queue it already watches.
                if (isset($connection->watchs)) {
                    foreach ($connection->watchs as $channel) {
                        if (isset($this->_queues[$channel])) {
                            $this->_queues[$channel]->addConsumer($connection);
                        }
                    }
                }
                break;
        }
    }

    /**
     * Returns the channel list of a decoded request.
     *
     * A missing or non-array 'channels' entry yields an empty list. Array and
     * object entries are skipped because they cannot be used as array keys.
     *
     * @param array $message Decoded request.
     * @return array
     */
    private function extractChannels(array $message)
    {
        if (!isset($message['channels']) || !is_array($message['channels'])) {
            return array();
        }
        return array_filter($message['channels'], static function ($channel) {
            return !is_array($channel) && !is_object($channel);
        });
    }

    /**
     * Removes a connection from a channel's subscriber map and deletes the
     * channel entry once no subscriber is left.
     *
     * @param mixed $channel
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    private function dropSubscriber($channel, $connection)
    {
        if (!isset($this->_worker->channels[$channel][$connection->id])) {
            return;
        }
        unset($this->_worker->channels[$channel][$connection->id]);
        if (empty($this->_worker->channels[$channel])) {
            unset($this->_worker->channels[$channel]);
        }
    }

    /**
     * Removes a connection from a channel's queue, if that queue exists, and
     * discards the queue once it reports itself empty.
     *
     * @param mixed $channel
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    private function dropWatcher($channel, $connection)
    {
        if (!isset($this->_queues[$channel])) {
            return;
        }
        $this->_queues[$channel]->removeWatch($connection);
        if ($this->_queues[$channel]->isEmpty()) {
            unset($this->_queues[$channel]);
        }
    }

    /**
     * Returns the queue for a channel, creating and registering it on first use.
     *
     * @param mixed $channel
     * @return Queue
     */
    private function getQueue($channel)
    {
        if (isset($this->_queues[$channel])) {
            return $this->_queues[$channel];
        }
        return ($this->_queues[$channel] = new Queue($channel));
    }
}
|