diff --git a/docker-compose.yml b/docker-compose.yml index 79b626b..fe8d540 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,7 @@ services: - ./:/usr/local/src depends_on: - redis + - redis-cluster-0 - swoole - workerman @@ -20,6 +21,15 @@ services: depends_on: - redis + swoole-redis-cluster: + container_name: swoole-redis-cluster + build: ./tests/Queue/servers/SwooleRedisCluster/. + command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php + volumes: + - ./:/usr/src/code + depends_on: + - redis-cluster-0 + workerman: container_name: workerman build: ./tests/Queue/servers/Workerman/. @@ -33,4 +43,38 @@ services: container_name: redis image: "redis:alpine" ports: - - "6379:6379" \ No newline at end of file + - "6379:6379" + + redis-cluster-0: + image: docker.io/bitnami/redis-cluster:7.4 + environment: + - ALLOW_EMPTY_PASSWORD=yes + - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL + - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + - REDIS_CLUSTER_CREATOR=yes + - REDIS_CLUSTER_REPLICAS=0 + depends_on: + - redis-cluster-1 + - redis-cluster-2 + - redis-cluster-3 + + redis-cluster-1: + image: docker.io/bitnami/redis-cluster:7.4 + environment: + - ALLOW_EMPTY_PASSWORD=yes + - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL + - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + + redis-cluster-2: + image: docker.io/bitnami/redis-cluster:7.4 + environment: + - ALLOW_EMPTY_PASSWORD=yes + - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL + - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 + + redis-cluster-3: + image: docker.io/bitnami/redis-cluster:7.4 + environment: + - ALLOW_EMPTY_PASSWORD=yes + - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL + - REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3 \ No newline at end of file diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php new file mode 100644 index 0000000..e1c901a --- /dev/null +++ b/src/Queue/Connection/RedisCluster.php @@ -0,0 +1,178 @@ +seeds = $seeds; + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + $response = $this->rightPopLeftPush($queue, $destination, $timeout); + + if (!$response) { + return false; + } + + return json_decode($response, true); + } + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + $response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); + + if (!$response) { + return false; + } + + return $response; + } + public function rightPushArray(string $queue, array $value): bool + { + return !!$this->getRedis()->rPush($queue, json_encode($value)); + } + + public function rightPush(string $queue, string $value): bool + { + return !!$this->getRedis()->rPush($queue, $value); + } + + public function leftPushArray(string $queue, array $value): bool + { + return !!$this->getRedis()->lPush($queue, json_encode($value)); + } + + public function leftPush(string $queue, string $value): bool + { + return !!$this->getRedis()->lPush($queue, $value); + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $response = $this->rightPop($queue, $timeout); + + if ($response === false) { + return false; + } + + return json_decode($response, true) ?? false; + } + + public function rightPop(string $queue, int $timeout): string|false + { + $response = $this->getRedis()->brPop([$queue], $timeout); + + if (empty($response)) { + return false; + } + + return $response[1]; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + $response = $this->getRedis()->blPop($queue, $timeout); + + if (empty($response)) { + return false; + } + + return json_decode($response[1], true) ?? false; + } + + public function leftPop(string $queue, int $timeout): string|false + { + $response = $this->getRedis()->blPop($queue, $timeout); + + if (empty($response)) { + return false; + } + + return $response[1]; + } + + public function listRemove(string $queue, string $key): bool + { + return !!$this->getRedis()->lRem($queue, $key, 1); + } + + public function remove(string $key): bool + { + return !!$this->getRedis()->del($key); + } + + public function move(string $queue, string $destination): bool + { + return $this->getRedis()->move($queue, $destination); + } + + public function setArray(string $key, array $value): bool + { + return $this->set($key, json_encode($value)); + } + + public function set(string $key, string $value): bool + { + return $this->getRedis()->set($key, $value); + } + + public function get(string $key): array|string|null + { + return $this->getRedis()->get($key); + } + + public function listSize(string $key): int + { + return $this->getRedis()->lLen($key); + } + + public function increment(string $key): int + { + return $this->getRedis()->incr($key); + } + + public function decrement(string $key): int + { + return $this->getRedis()->decr($key); + } + + public function listRange(string $key, int $total, int $offset): array + { + $start = $offset; + $end = $start + $total - 1; + $results = $this->getRedis()->lRange($key, $start, $end); + + return $results; + } + + public function ping(): bool + { + try { + foreach ($this->getRedis()->_masters() as $master) { + $this->getRedis()->ping($master); + } + + return true; + } catch (Exception $e) { + return false; + } + } + + protected function getRedis(): \RedisCluster + { + if ($this->redis) { + return $this->redis; + } + + $this->redis = new \RedisCluster(null, $this->seeds); + return $this->redis; + } +} diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php new file mode 100644 index 0000000..ea2d420 --- /dev/null +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -0,0 +1,19 @@ +job() + ->inject('message') + ->action(function (Message $message) { + handleRequest($message); + }); + +$server + ->error() + ->inject('error') + ->action(function ($th) { + echo $th->getMessage() . PHP_EOL; + }); + +$server + ->workerStart() + ->action(function () { + echo "Worker Started" . PHP_EOL; + }); + +$server->start(); diff --git a/tests/Queue/servers/tests.php b/tests/Queue/servers/tests.php index 8380cdb..f663d86 100644 --- a/tests/Queue/servers/tests.php +++ b/tests/Queue/servers/tests.php @@ -22,10 +22,6 @@ function handleRequest(Queue\Message $job): void case 'test_bool': assert(is_bool($value)); - break; - case 'test_bool': - assert(is_null($value)); - break; case 'test_array': assert(is_array($value));