RedisQueue.php 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. <?php
  2. /**
  3. * FecShop file.
  4. *
  5. * @link http://www.fecshop.com/
  6. * @copyright Copyright (c) 2016 FecShop Software LLC
  7. * @license http://www.fecshop.com/license/
  8. */
  9. namespace fec\component;
  10. use Yii;
  11. use yii\base\InvalidConfigException;
  12. use yii\redis\Connection;
  13. use fec\component\redisqueue\Queue;
  14. use fec\component\redisqueue\Job;
  /**
   * Queue implementation that stores job payloads in a Redis list: payloads are
   * appended with RPUSH and taken from the head with LPOP.
   *
   * Example application component configuration:
   *
   *     'queue' => [
   *         'class' => 'fec\component\RedisQueue',
   *     ],
   *
   * @author Terry Zhao <2358269014@qq.com>
   * @since 1.0
   */
  class RedisQueue extends Queue
  {
      /**
       * @var Connection|string|array The Redis connection to use. May be given as
       * an application component ID (default "redis"), a configuration array
       * passed to Yii::createObject(), or a ready Connection instance. init()
       * replaces string and array values with the resolved object.
       */
      public $redis = 'redis';

      /**
       * Runs the parent initialization, then resolves $redis to a connection object.
       *
       * @throws InvalidConfigException if $redis does not end up being a
       * yii\redis\Connection instance.
       */
      public function init()
      {
          parent::init();

          if (is_string($this->redis)) {
              // A string is treated as an application component ID.
              $this->redis = Yii::$app->get($this->redis);
          } elseif (is_array($this->redis)) {
              // An array is treated as an object configuration.
              $this->redis = Yii::createObject($this->redis);
          }

          if (!$this->redis instanceof Connection) {
              throw new InvalidConfigException("Queue::redis must be either a Redis connection instance or the application component ID of a Redis connection.");
          }
      }

      /**
       * Appends a payload to the tail of the Redis list for the given queue.
       *
       * @param string $payload JSON-encoded job payload.
       * @param mixed $queue Queue identifier, resolved to a Redis key through
       * getQueue(). NOTE(review): null presumably selects the default queue --
       * confirm against Queue::getQueue().
       * @param array $options Not used by this implementation.
       * @return mixed The "id" entry of the decoded payload, or null when the
       * payload does not decode to an array carrying an "id".
       */
      protected function pushInternal($payload, $queue = null, $options = [])
      {
          $this->redis->rpush($this->getQueue($queue), $payload);

          // The payload is already stored at this point, so a payload without an
          // "id" must not raise an undefined-offset notice here; isset() is safe
          // on null and on non-array decode results alike.
          $decoded = json_decode($payload, true);

          return isset($decoded['id']) ? $decoded['id'] : null;
      }

      /**
       * Removes the payload at the head of the Redis list for the given queue
       * and wraps it in a Job.
       *
       * No reserved-set bookkeeping is done: once popped, the payload is no
       * longer held in the list.
       *
       * @param mixed $queue Queue identifier, resolved to a Redis key through
       * getQueue().
       * @return Job|null The popped job, or null when LPOP yields an empty value.
       */
      public function popInternal($queue = null)
      {
          $payload = $this->redis->lpop($this->getQueue($queue));
          if (!$payload) {
              return null;
          }

          // The Job receives the caller-supplied $queue value, not the resolved key.
          return new Job($this, $payload, $queue);
      }
  }