QueueController.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. <?php
  2. namespace fec\component\redisqueue;
  3. use Yii;
  4. use yii\console\Controller;
  5. /**
  6. * Queue Process Command
  7. *
  8. * Class QueueController
  9. * @package wh\queue\console\controllers
  10. */
  11. class QueueController extends Controller
  12. {
  13. private $timeout;
  14. private $sleep=5;
  15. /**
  16. * Process a job
  17. *
  18. * @param string $queueName
  19. * @param string $queueObjectName
  20. * @throws \Exception
  21. */
  22. public function actionWork($queueName = null, $queueObjectName = 'queue')
  23. {
  24. $this->process($queueName, $queueObjectName);
  25. }
  26. /**
  27. * Continuously process jobs
  28. *
  29. * @param string $queueName
  30. * @param string $queueObjectName
  31. * @throws \Exception
  32. */
  33. public function actionListen($queueName = null, $queueObjectName = 'queue')
  34. {
  35. while (true) {
  36. if ($this->timeout !==null) {
  37. if ($this->timeout<time()) {
  38. return true;
  39. }
  40. }
  41. if (!$this->process($queueName, $queueObjectName)) {
  42. sleep($this->sleep);
  43. }
  44. }
  45. }
  46. protected function process($queueName, $queueObjectName)
  47. {
  48. $queue = Yii::$app->{$queueObjectName};
  49. $job = $queue->pop($queueName);
  50. if ($job) {
  51. try {
  52. $job->run();
  53. return true;
  54. } catch (\Exception $e) {
  55. if ($queue->debug) {
  56. var_dump($e);
  57. }
  58. Yii::error($e->getMessage(), __METHOD__);
  59. }
  60. }
  61. return false;
  62. }
  63. public function beforeAction($action)
  64. {
  65. if (!parent::beforeAction($action)) {
  66. return false;
  67. }
  68. if (getenv('QUEUE_TIMEOUT')) {
  69. $this->timeout=(int)getenv('QUEUE_TIMEOUT')+time();
  70. }
  71. if (getenv('QUEUE_SLEEP')) {
  72. $this->sleep=(int)getenv('QUEUE_SLEEP');
  73. }
  74. return true;
  75. }
  76. }