Skip to content

Commit

Permalink
feat: add redis cluster adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
basert committed Oct 31, 2024
1 parent 9175652 commit 575491c
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 5 deletions.
46 changes: 45 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- ./:/usr/local/src
depends_on:
- redis
- redis-cluster-0
- swoole
- workerman

Expand All @@ -20,6 +21,15 @@ services:
depends_on:
- redis

swoole-redis-cluster:
container_name: swoole-redis-cluster
build: ./tests/Queue/servers/SwooleRedisCluster/.
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
volumes:
- ./:/usr/src/code
depends_on:
- redis-cluster-0

workerman:
container_name: workerman
build: ./tests/Queue/servers/Workerman/.
Expand All @@ -33,4 +43,38 @@ services:
container_name: redis
image: "redis:alpine"
ports:
- "6379:6379"
- "6379:6379"

redis-cluster-0:
image: docker.io/bitnami/redis-cluster:7.4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
- REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3
- REDIS_CLUSTER_CREATOR=yes
- REDIS_CLUSTER_REPLICAS=0
depends_on:
- redis-cluster-1
- redis-cluster-2
- redis-cluster-3

redis-cluster-1:
image: docker.io/bitnami/redis-cluster:7.4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
- REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3

redis-cluster-2:
image: docker.io/bitnami/redis-cluster:7.4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
- REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3

redis-cluster-3:
image: docker.io/bitnami/redis-cluster:7.4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
- REDIS_NODES=redis-cluster-0 redis-cluster-1 redis-cluster-2 redis-cluster-3
178 changes: 178 additions & 0 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<?php

namespace Utopia\Queue\Connection;

use Utopia\Queue\Connection;

class RedisCluster implements Connection
{

protected array $seeds;
protected ?\RedisCluster $redis = null;

public function __construct(array $seeds)
{
$this->seeds = $seeds;
}

public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false
{
$response = $this->rightPopLeftPush($queue, $destination, $timeout);

if (!$response) {
return false;
}

return json_decode($response, true);
}
public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
{
$response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout);

if (!$response) {
return false;
}

return $response;
}
public function rightPushArray(string $queue, array $value): bool
{
return !!$this->getRedis()->rPush($queue, json_encode($value));
}

public function rightPush(string $queue, string $value): bool
{
return !!$this->getRedis()->rPush($queue, $value);
}

public function leftPushArray(string $queue, array $value): bool
{
return !!$this->getRedis()->lPush($queue, json_encode($value));
}

public function leftPush(string $queue, string $value): bool
{
return !!$this->getRedis()->lPush($queue, $value);
}

public function rightPopArray(string $queue, int $timeout): array|false
{
$response = $this->rightPop($queue, $timeout);

if ($response === false) {
return false;
}

return json_decode($response, true) ?? false;
}

public function rightPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->brPop([$queue], $timeout);

if (empty($response)) {
return false;
}

return $response[1];
}

public function leftPopArray(string $queue, int $timeout): array|false
{
$response = $this->getRedis()->blPop($queue, $timeout);

if (empty($response)) {
return false;
}

return json_decode($response[1], true) ?? false;
}

public function leftPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->blPop($queue, $timeout);

if (empty($response)) {
return false;
}

return $response[1];
}

public function listRemove(string $queue, string $key): bool
{
return !!$this->getRedis()->lRem($queue, $key, 1);
}

public function remove(string $key): bool
{
return !!$this->getRedis()->del($key);
}

public function move(string $queue, string $destination): bool
{
return $this->getRedis()->move($queue, $destination);
}

public function setArray(string $key, array $value): bool
{
return $this->set($key, json_encode($value));
}

public function set(string $key, string $value): bool
{
return $this->getRedis()->set($key, $value);
}

public function get(string $key): array|string|null
{
return $this->getRedis()->get($key);
}

public function listSize(string $key): int
{
return $this->getRedis()->lLen($key);
}

public function increment(string $key): int
{
return $this->getRedis()->incr($key);
}

public function decrement(string $key): int
{
return $this->getRedis()->decr($key);
}

public function listRange(string $key, int $total, int $offset): array
{
$start = $offset;
$end = $start + $total - 1;
$results = $this->getRedis()->lRange($key, $start, $end);

return $results;
}

public function ping(): bool
{
try {
foreach ($this->getRedis()->_masters() as $master) {
$this->getRedis()->ping($master);
}

return true;
} catch (Exception $e) {
return false;
}
}

protected function getRedis(): \RedisCluster
{
if ($this->redis) {
return $this->redis;
}

$this->redis = new \RedisCluster(null, $this->seeds);
return $this->redis;
}
}
19 changes: 19 additions & 0 deletions tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Queue\E2E\Adapter;

use Tests\E2E\Adapter\Base;
use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;
use Utopia\Queue\Connection\RedisCluster;

class SwooleRedisClusterTest extends Base
{
protected function getClient(): Client
{
$connection = new RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']);
$client = new Client('swoole-redis-cluster', $connection);

return $client;
}
}
5 changes: 5 additions & 0 deletions tests/Queue/servers/SwooleRedisCluster/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM phpswoole/swoole:php8.1-alpine

RUN apk add autoconf build-base

RUN docker-php-ext-enable redis
32 changes: 32 additions & 0 deletions tests/Queue/servers/SwooleRedisCluster/worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

require_once __DIR__ . '/../../../../vendor/autoload.php';
require_once __DIR__ . '/../tests.php';

use Utopia\Queue;
use Utopia\Queue\Message;

$connection = new Queue\Connection\RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']);
$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole-redis-cluster');
$server = new Queue\Server($adapter);

$server->job()
->inject('message')
->action(function (Message $message) {
handleRequest($message);
});

$server
->error()
->inject('error')
->action(function ($th) {
echo $th->getMessage() . PHP_EOL;
});

$server
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
});

$server->start();
4 changes: 0 additions & 4 deletions tests/Queue/servers/tests.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ function handleRequest(Queue\Message $job): void
case 'test_bool':
assert(is_bool($value));

break;
case 'test_bool':
assert(is_null($value));

break;
case 'test_array':
assert(is_array($value));
Expand Down

0 comments on commit 575491c

Please sign in to comment.