123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- <?php
- namespace Channel;
- use Workerman\Connection\AsyncTcpConnection;
- use Workerman\Lib\Timer;
- use Workerman\Protocols\Frame;
- /**
- * Channel/Client
- * @version 1.0.7
- */
- class Client
- {
- /**
- * onMessage.
- * @var callback
- */
- public static $onMessage = null;
- /**
- * onConnect
- * @var callback
- */
- public static $onConnect = null;
- /**
- * onClose
- * @var callback
- */
- public static $onClose = null;
- /**
- * Connction to channel server.
- * @var \Workerman\Connection\TcpConnection
- */
- protected static $_remoteConnection = null;
- /**
- * Channel server ip.
- * @var string
- */
- protected static $_remoteIp = null;
- /**
- * Channel server port.
- * @var int
- */
- protected static $_remotePort = null;
- /**
- * Reconnect timer.
- * @var Timer
- */
- protected static $_reconnectTimer = null;
- /**
- * Ping timer.
- * @var Timer
- */
- protected static $_pingTimer = null;
- /**
- * All event callback.
- * @var array
- */
- protected static $_events = array();
- /**
- * All queue callback.
- * @var callable
- */
- protected static $_queues = array();
- /**
- * @var bool
- */
- protected static $_isWorkermanEnv = true;
- /**
- * Ping interval.
- * @var int
- */
- public static $pingInterval = 25;
- /**
- * Connect to channel server
- * @param string $ip Channel server ip address or unix domain socket address
- * Ip like (TCP): 192.168.1.100
- * Unix domain socket like: unix:///tmp/workerman-channel.sock
- * @param int $port Port to connect when use tcp
- */
- public static function connect($ip = '127.0.0.1', $port = 2206)
- {
- if (self::$_remoteConnection) {
- return;
- }
- self::$_remoteIp = $ip;
- self::$_remotePort = $port;
- if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
- self::$_isWorkermanEnv = false;
- }
- // For workerman environment.
- if (self::$_isWorkermanEnv) {
- if (strpos($ip, 'unix://') === false) {
- $conn = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
- } else {
- $conn = new AsyncTcpConnection($ip);
- $conn->protocol = Frame::class;
- }
- $conn->onClose = [self::class, 'onRemoteClose'];
- $conn->onConnect = [self::class, 'onRemoteConnect'];
- $conn->onMessage = [self::class , 'onRemoteMessage'];
- $conn->connect();
- if (empty(self::$_pingTimer)) {
- self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
- }
- // Not workerman environment.
- } else {
- $remote = strpos($ip, 'unix://') === false ? 'tcp://'.self::$_remoteIp.':'.self::$_remotePort : $ip;
- $conn = stream_socket_client($remote, $code, $message, 5);
- if (!$conn) {
- throw new \Exception($message);
- }
- }
- self::$_remoteConnection = $conn;
- }
- /**
- * onRemoteMessage.
- * @param \Workerman\Connection\TcpConnection $connection
- * @param string $data
- * @throws \Exception
- */
- public static function onRemoteMessage($connection, $data)
- {
- $data = unserialize($data);
- $type = $data['type'];
- $event = $data['channel'];
- $event_data = $data['data'];
- $callback = null;
- if ($type == 'event') {
- if (!empty(self::$_events[$event])) {
- call_user_func(self::$_events[$event], $event_data);
- } elseif (!empty(Client::$onMessage)) {
- call_user_func(Client::$onMessage, $event, $event_data);
- } else {
- throw new \Exception("event:$event have not callback");
- }
- } else {
- if (isset(self::$_queues[$event])) {
- call_user_func(self::$_queues[$event], $event_data);
- } else {
- throw new \Exception("queue:$event have not callback");
- }
- }
- }
- /**
- * Ping.
- * @return void
- */
- public static function ping()
- {
- if(self::$_remoteConnection)
- {
- self::$_remoteConnection->send('');
- }
- }
- /**
- * onRemoteClose.
- * @return void
- */
- public static function onRemoteClose()
- {
- echo "Waring channel connection closed and try to reconnect\n";
- self::$_remoteConnection = null;
- self::clearTimer();
- self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
- if (self::$onClose) {
- call_user_func(Client::$onClose);
- }
- }
- /**
- * onRemoteConnect.
- * @return void
- */
- public static function onRemoteConnect()
- {
- $all_event_names = array_keys(self::$_events);
- if($all_event_names)
- {
- self::subscribe($all_event_names);
- }
- self::clearTimer();
- if (self::$onConnect) {
- call_user_func(Client::$onConnect);
- }
- }
- /**
- * clearTimer.
- * @return void
- */
- public static function clearTimer()
- {
- if (!self::$_isWorkermanEnv) {
- throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
- }
- if(self::$_reconnectTimer)
- {
- Timer::del(self::$_reconnectTimer);
- self::$_reconnectTimer = null;
- }
- }
- /**
- * On.
- * @param string $event
- * @param callback $callback
- * @throws \Exception
- */
- public static function on($event, $callback)
- {
- if (!is_callable($callback)) {
- throw new \Exception('callback is not callable for event.');
- }
- self::$_events[$event] = $callback;
- self::subscribe($event);
- }
- /**
- * Subscribe.
- * @param string $events
- * @return void
- */
- public static function subscribe($events)
- {
- $events = (array)$events;
- self::send(array('type' => 'subscribe', 'channels'=>$events));
- foreach ($events as $event) {
- if(!isset(self::$_events[$event])) {
- self::$_events[$event] = null;
- }
- }
- }
- /**
- * Unsubscribe.
- * @param string $events
- * @return void
- */
- public static function unsubscribe($events)
- {
- $events = (array)$events;
- self::send(array('type' => 'unsubscribe', 'channels'=>$events));
- foreach($events as $event) {
- unset(self::$_events[$event]);
- }
- }
- /**
- * Publish.
- * @param string $events
- * @param mixed $data
- */
- public static function publish($events, $data)
- {
- self::sendAnyway(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data));
- }
- /**
- * Watch a channel of queue
- * @param string|array $channels
- * @param callable $callback
- * @param boolean $autoReserve Auto reserve after callback finished.
- * But sometime you may don't want reserve immediately, or in some asynchronous job,
- * you want reserve in finished callback, so you should set $autoReserve to false
- * and call Client::reserve() after watch() and in finish callback manually.
- * @throws \Exception
- */
- public static function watch($channels, $callback, $autoReserve=true)
- {
- if (!is_callable($callback)) {
- throw new \Exception('callback is not callable for watch.');
- }
- if ($autoReserve) {
- $callback = static function($data) use ($callback) {
- try {
- call_user_func($callback, $data);
- } catch (\Exception $e) {
- throw $e;
- } catch (\Error $e) {
- throw $e;
- } finally {
- self::reserve();
- }
- };
- }
- $channels = (array)$channels;
- self::send(array('type' => 'watch', 'channels'=>$channels));
- foreach ($channels as $channel) {
- self::$_queues[$channel] = $callback;
- }
- if ($autoReserve) {
- self::reserve();
- }
- }
- /**
- * Unwatch a channel of queue
- * @param string $channel
- * @throws \Exception
- */
- public static function unwatch($channels)
- {
- $channels = (array)$channels;
- self::send(array('type' => 'unwatch', 'channels'=>$channels));
- foreach ($channels as $channel) {
- if (isset(self::$_queues[$channel])) {
- unset(self::$_queues[$channel]);
- }
- }
- }
- /**
- * Put data to queue
- * @param string|array $channels
- * @param mixed $data
- * @throws \Exception
- */
- public static function enqueue($channels, $data)
- {
- self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data));
- }
- /**
- * Start reserve queue manual
- * @throws \Exception
- */
- public static function reserve()
- {
- self::send(array('type' => 'reserve'));
- }
- /**
- * Send through workerman environment
- * @param $data
- * @throws \Exception
- */
- protected static function send($data)
- {
- if (!self::$_isWorkermanEnv) {
- throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
- }
- self::connect(self::$_remoteIp, self::$_remotePort);
- self::$_remoteConnection->send(serialize($data));
- }
- /**
- * Send from any environment
- * @param $data
- * @throws \Exception
- */
- protected static function sendAnyway($data)
- {
- self::connect(self::$_remoteIp, self::$_remotePort);
- $body = serialize($data);
- if (self::$_isWorkermanEnv) {
- self::$_remoteConnection->send($body);
- } else {
- $buffer = pack('N', 4+strlen($body)) . $body;
- fwrite(self::$_remoteConnection, $buffer);
- }
- }
- }
|