| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 | 
							- <?php
 
- namespace Channel;
 
- use Workerman\Connection\TcpConnection;
 
/**
 * A named queue that hands each queued item to exactly one waiting consumer.
 *
 * Connections first register as watchers (addWatch). A watcher that calls
 * addConsumer becomes eligible to receive a single item; once an item has
 * been sent to it, it is removed from the consumer list and has to call
 * addConsumer again to receive another one.
 */
class Queue
{
    /**
     * Name of this queue; sent as the 'channel' field of every dispatched message.
     *
     * @var string
     */
    public $name = 'default';

    /**
     * Connections watching this queue, keyed by connection id.
     *
     * @var array
     */
    public $watcher = array();

    /**
     * Watching connections currently waiting for one item, keyed by connection id.
     *
     * @var array
     */
    public $consumer = array();

    /**
     * Items that have been enqueued but not yet sent to a consumer.
     *
     * @var \SplQueue
     */
    protected $queue = null;

    /**
     * @param string $name Name of the queue.
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->queue = new \SplQueue();
    }

    /**
     * Registers a connection as a watcher of this queue.
     *
     * The queue name is also appended to the connection's `watchs` list.
     * Calling this again for an already registered connection does nothing.
     *
     * @param TcpConnection $connection
     */
    public function addWatch($connection)
    {
        if (isset($this->watcher[$connection->id])) {
            return;
        }

        $this->watcher[$connection->id] = $connection;
        $connection->watchs[] = $this->name;
    }

    /**
     * Removes a connection from this queue's watchers and waiting consumers,
     * and removes this queue's name from the connection's `watchs` list.
     *
     * @param TcpConnection $connection
     */
    public function removeWatch($connection)
    {
        if (isset($connection->watchs) && is_array($connection->watchs)) {
            // Strict search: with loose comparison numeric-looking names such as
            // '1e3' and '1000' are equal, which could drop another queue's entry.
            $idx = array_search($this->name, $connection->watchs, true);
            if ($idx !== false) {
                unset($connection->watchs[$idx]);
            }
        }

        // unset() on a missing key is a no-op, so no isset() guard is needed.
        unset($this->watcher[$connection->id], $this->consumer[$connection->id]);
    }

    /**
     * Marks a watching connection as ready to receive one item, then tries to
     * dispatch pending items.
     *
     * Connections that are not watchers of this queue are not registered, but
     * a dispatch attempt is still made.
     *
     * @param TcpConnection $connection
     */
    public function addConsumer($connection)
    {
        $id = $connection->id;
        if (isset($this->watcher[$id]) && !isset($this->consumer[$id])) {
            $this->consumer[$id] = $connection;
        }

        $this->dispatch();
    }

    /**
     * Appends an item to the queue and tries to dispatch pending items.
     *
     * @param mixed $data Payload to queue; it is passed through serialize() when sent.
     */
    public function enqueue($data)
    {
        $this->queue->enqueue($data);
        $this->dispatch();
    }

    /**
     * Sends pending items to waiting consumers, one item per consumer, until
     * either the queue or the consumer list is empty.
     *
     * Each consumer is removed from the consumer list before its item is sent.
     * The return value of send() is not checked.
     */
    private function dispatch()
    {
        while (!$this->queue->isEmpty() && count($this->consumer) > 0) {
            // Always take the first waiting consumer. reset() is required because
            // key() reads the array's internal pointer, which may have been moved
            // (even past the end) by code iterating the public $consumer array.
            reset($this->consumer);
            $idx = key($this->consumer);
            $connection = $this->consumer[$idx];
            unset($this->consumer[$idx]);

            $data = $this->queue->dequeue();
            $connection->send(serialize(array('type' => 'queue', 'channel' => $this->name, 'data' => $data)));
        }
    }

    /**
     * Tells whether the queue has neither watchers nor pending items.
     *
     * @return bool
     */
    public function isEmpty()
    {
        return empty($this->watcher) && $this->queue->isEmpty();
    }
}
 
 
  |