Queue.php 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. <?php
  2. namespace Channel;
  3. use Workerman\Connection\TcpConnection;
  4. class Queue
  5. {
  6. public $name = 'default';
  7. public $watcher = array();
  8. public $consumer = array();
  9. protected $queue = null;
  10. public function __construct($name)
  11. {
  12. $this->name = $name;
  13. $this->queue = new \SplQueue();
  14. }
  15. /**
  16. * @param TcpConnection $connection
  17. */
  18. public function addWatch($connection)
  19. {
  20. if (!isset($this->watcher[$connection->id])) {
  21. $this->watcher[$connection->id] = $connection;
  22. $connection->watchs[] = $this->name;
  23. }
  24. }
  25. /**
  26. * @param TcpConnection $connection
  27. */
  28. public function removeWatch($connection)
  29. {
  30. if (isset($connection->watchs) && in_array($this->name, $connection->watchs)) {
  31. $idx = array_search($this->name, $connection->watchs);
  32. unset($connection->watchs[$idx]);
  33. }
  34. if (isset($this->watcher[$connection->id])) {
  35. unset($this->watcher[$connection->id]);
  36. }
  37. if (isset($this->consumer[$connection->id])) {
  38. unset($this->consumer[$connection->id]);
  39. }
  40. }
  41. /**
  42. * @param TcpConnection $connection
  43. */
  44. public function addConsumer($connection)
  45. {
  46. if (isset($this->watcher[$connection->id]) && !isset($this->consumer[$connection->id])) {
  47. $this->consumer[$connection->id] = $connection;
  48. }
  49. $this->dispatch();
  50. }
  51. public function enqueue($data)
  52. {
  53. $this->queue->enqueue($data);
  54. $this->dispatch();
  55. }
  56. private function dispatch()
  57. {
  58. if ($this->queue->isEmpty() || count($this->consumer) == 0) {
  59. return;
  60. }
  61. while (!$this->queue->isEmpty()) {
  62. $data = $this->queue->dequeue();
  63. $idx = key($this->consumer);
  64. $connection = $this->consumer[$idx];
  65. unset($this->consumer[$idx]);
  66. $connection->send(serialize(array('type'=>'queue', 'channel'=>$this->name, 'data' => $data)));
  67. if (count($this->consumer) == 0) {
  68. break;
  69. }
  70. }
  71. }
  72. public function isEmpty()
  73. {
  74. return empty($this->watcher) && $this->queue->isEmpty();
  75. }
  76. }