1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- <?php
- namespace Channel;
- use Workerman\Connection\TcpConnection;
- class Queue
- {
- public $name = 'default';
- public $watcher = array();
- public $consumer = array();
- protected $queue = null;
- public function __construct($name)
- {
- $this->name = $name;
- $this->queue = new \SplQueue();
- }
- /**
- * @param TcpConnection $connection
- */
- public function addWatch($connection)
- {
- if (!isset($this->watcher[$connection->id])) {
- $this->watcher[$connection->id] = $connection;
- $connection->watchs[] = $this->name;
- }
- }
- /**
- * @param TcpConnection $connection
- */
- public function removeWatch($connection)
- {
- if (isset($connection->watchs) && in_array($this->name, $connection->watchs)) {
- $idx = array_search($this->name, $connection->watchs);
- unset($connection->watchs[$idx]);
- }
- if (isset($this->watcher[$connection->id])) {
- unset($this->watcher[$connection->id]);
- }
- if (isset($this->consumer[$connection->id])) {
- unset($this->consumer[$connection->id]);
- }
- }
- /**
- * @param TcpConnection $connection
- */
- public function addConsumer($connection)
- {
- if (isset($this->watcher[$connection->id]) && !isset($this->consumer[$connection->id])) {
- $this->consumer[$connection->id] = $connection;
- }
- $this->dispatch();
- }
- public function enqueue($data)
- {
- $this->queue->enqueue($data);
- $this->dispatch();
- }
- private function dispatch()
- {
- if ($this->queue->isEmpty() || count($this->consumer) == 0) {
- return;
- }
- while (!$this->queue->isEmpty()) {
- $data = $this->queue->dequeue();
- $idx = key($this->consumer);
- $connection = $this->consumer[$idx];
- unset($this->consumer[$idx]);
- $connection->send(serialize(array('type'=>'queue', 'channel'=>$this->name, 'data' => $data)));
- if (count($this->consumer) == 0) {
- break;
- }
- }
- }
- public function isEmpty()
- {
- return empty($this->watcher) && $this->queue->isEmpty();
- }
- }
|