<?php
namespace Channel;

use Workerman\Protocols\Frame;
use Workerman\Worker;

/**
 * Channel server.
 *
 * Receives serialized messages from clients and dispatches on their 'type'
 * field: 'subscribe', 'unsubscribe' and 'publish' maintain and use the
 * per-channel subscriber lists kept on the worker, while 'watch', 'unwatch',
 * 'enqueue' and 'reserve' are forwarded to per-channel Queue objects.
 */
class Server
{
    /**
     * Worker instance.
     * @var Worker
     */
    protected $_worker = null;

    /**
     * Queues, keyed by channel name.
     * @var Queue[]
     */
    protected $_queues = array();

    /**
     * Bind address that was passed to the constructor.
     * @var string
     */
    private $ip;

    /**
     * Construct.
     * @param string $ip Bind ip address or unix domain socket.
     * Bind unix domain socket use 'unix:///tmp/channel.sock'
     * @param int $port Tcp port to bind, only used when listen on tcp.
     */
    public function __construct($ip = '0.0.0.0', $port = 2206)
    {
        if (strpos($ip, 'unix:') === false) {
            $worker = new Worker("frame://$ip:$port");
        } else {
            // A unix socket address carries no "frame://" scheme, so the
            // protocol has to be assigned explicitly.
            $worker = new Worker($ip);
            $worker->protocol = Frame::class;
        }
        $this->ip = $ip;
        $worker->count = 1;
        $worker->name = 'ChannelServer';
        // channel name => array(connection id => connection)
        $worker->channels = array();
        $worker->onMessage = array($this, 'onMessage');
        $worker->onClose = array($this, 'onClose');
        $this->_worker = $worker;
    }

    /**
     * onClose.
     *
     * Removes the closed connection from every channel it subscribed to and
     * from every queue it was watching.
     *
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    public function onClose($connection)
    {
        if (!empty($connection->channels)) {
            foreach ($connection->channels as $channel) {
                $this->dropSubscriber($channel, $connection);
            }
        }
        // NOTE(review): 'watchs' is not assigned anywhere in this class;
        // presumably Queue::addWatch() maintains it - confirm against Queue.
        if (!empty($connection->watchs)) {
            foreach ($connection->watchs as $channel) {
                $this->dropWatcher($channel, $connection);
            }
        }
    }

    /**
     * onMessage.
     *
     * Decodes one message and performs the action named by its 'type'.
     * Messages that do not decode to an array with a string 'type' are
     * ignored, as are unknown types. Channel lists that are missing or not an
     * array are treated as empty, and non-scalar channel names are skipped.
     *
     * @param \Workerman\Connection\TcpConnection $connection
     * @param string $data
     * @return void
     */
    public function onMessage($connection, $data)
    {
        if (!$data) {
            return;
        }

        // NOTE(security): unserialize() runs on bytes received from the
        // socket and may instantiate arbitrary classes. Only expose this
        // server to trusted peers. Restricting 'allowed_classes' would change
        // which payloads subscribers receive, so it is deliberately left
        // untouched here.
        $data = unserialize($data);

        // Reject anything that is not array('type' => string, ...). Without
        // this check a failed decode (false) or a scalar payload would be
        // indexed like an array, and a non-string type would be compared
        // loosely against the case labels below.
        if (!is_array($data) || !isset($data['type']) || !is_string($data['type'])) {
            return;
        }

        $worker = $this->_worker;
        $channels = $this->extractChannels($data);
        $payload = isset($data['data']) ? $data['data'] : null;

        switch ($data['type']) {
            case 'subscribe':
                foreach ($channels as $channel) {
                    $connection->channels[$channel] = $channel;
                    $worker->channels[$channel][$connection->id] = $connection;
                }
                break;

            case 'unsubscribe':
                foreach ($channels as $channel) {
                    unset($connection->channels[$channel]);
                    $this->dropSubscriber($channel, $connection);
                }
                break;

            case 'publish':
                foreach ($channels as $channel) {
                    if (empty($worker->channels[$channel])) {
                        continue;
                    }
                    // Encode once per channel, then fan out to every subscriber.
                    $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $payload)) . "\n";
                    foreach ($worker->channels[$channel] as $subscriber) {
                        $subscriber->send($buffer);
                    }
                }
                break;

            case 'watch':
                foreach ($channels as $channel) {
                    $this->getQueue($channel)->addWatch($connection);
                }
                break;

            case 'unwatch':
                foreach ($channels as $channel) {
                    $this->dropWatcher($channel, $connection);
                }
                break;

            case 'enqueue':
                foreach ($channels as $channel) {
                    $this->getQueue($channel)->enqueue($payload);
                }
                break;

            case 'reserve':
                // Registers the connection as a consumer on every queue it
                // watches; the message's own channel list is not consulted.
                if (isset($connection->watchs)) {
                    foreach ($connection->watchs as $channel) {
                        if (isset($this->_queues[$channel])) {
                            $this->_queues[$channel]->addConsumer($connection);
                        }
                    }
                }
                break;
        }
    }

    /**
     * Returns the usable channel names of a decoded message.
     *
     * @param array $data Decoded message.
     * @return array Scalar entries of $data['channels'], or an empty array
     *               when that key is missing or not an array.
     */
    private function extractChannels(array $data)
    {
        if (!isset($data['channels']) || !is_array($data['channels'])) {
            return array();
        }
        // Channel names are used as array keys, so arrays/objects are dropped.
        return array_filter($data['channels'], 'is_scalar');
    }

    /**
     * Removes a connection from a channel's subscriber list and deletes the
     * list once it is empty.
     *
     * @param string $channel
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    private function dropSubscriber($channel, $connection)
    {
        $worker = $this->_worker;
        unset($worker->channels[$channel][$connection->id]);
        if (empty($worker->channels[$channel])) {
            unset($worker->channels[$channel]);
        }
    }

    /**
     * Removes a connection from a channel's queue, if that queue exists, and
     * discards the queue once it reports itself empty.
     *
     * @param string $channel
     * @param \Workerman\Connection\TcpConnection $connection
     * @return void
     */
    private function dropWatcher($channel, $connection)
    {
        if (!isset($this->_queues[$channel])) {
            return;
        }
        $queue = $this->_queues[$channel];
        $queue->removeWatch($connection);
        if ($queue->isEmpty()) {
            unset($this->_queues[$channel]);
        }
    }

    /**
     * Returns the queue for a channel, creating it on first use.
     *
     * @param string $channel
     * @return Queue
     */
    private function getQueue($channel)
    {
        if (isset($this->_queues[$channel])) {
            return $this->_queues[$channel];
        }
        return ($this->_queues[$channel] = new Queue($channel));
    }
}