-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #23 from utopia-php/redis-cluster
feat: add redis cluster adapter
- Loading branch information
Showing
6 changed files
with
279 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
<?php | ||
|
||
namespace Utopia\Queue\Connection; | ||
|
||
use Utopia\Queue\Connection; | ||
|
||
class RedisCluster implements Connection | ||
{ | ||
|
||
protected array $seeds; | ||
protected ?\RedisCluster $redis = null; | ||
|
||
public function __construct(array $seeds) | ||
{ | ||
$this->seeds = $seeds; | ||
} | ||
|
||
public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false | ||
{ | ||
$response = $this->rightPopLeftPush($queue, $destination, $timeout); | ||
|
||
if (!$response) { | ||
return false; | ||
} | ||
|
||
return json_decode($response, true); | ||
} | ||
public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false | ||
{ | ||
$response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); | ||
|
||
if (!$response) { | ||
return false; | ||
} | ||
|
||
return $response; | ||
} | ||
public function rightPushArray(string $queue, array $value): bool | ||
{ | ||
return !!$this->getRedis()->rPush($queue, json_encode($value)); | ||
} | ||
|
||
public function rightPush(string $queue, string $value): bool | ||
{ | ||
return !!$this->getRedis()->rPush($queue, $value); | ||
} | ||
|
||
public function leftPushArray(string $queue, array $value): bool | ||
{ | ||
return !!$this->getRedis()->lPush($queue, json_encode($value)); | ||
} | ||
|
||
public function leftPush(string $queue, string $value): bool | ||
{ | ||
return !!$this->getRedis()->lPush($queue, $value); | ||
} | ||
|
||
public function rightPopArray(string $queue, int $timeout): array|false | ||
{ | ||
$response = $this->rightPop($queue, $timeout); | ||
|
||
if ($response === false) { | ||
return false; | ||
} | ||
|
||
return json_decode($response, true) ?? false; | ||
} | ||
|
||
public function rightPop(string $queue, int $timeout): string|false | ||
{ | ||
$response = $this->getRedis()->brPop([$queue], $timeout); | ||
|
||
if (empty($response)) { | ||
return false; | ||
} | ||
|
||
return $response[1]; | ||
} | ||
|
||
public function leftPopArray(string $queue, int $timeout): array|false | ||
{ | ||
$response = $this->getRedis()->blPop($queue, $timeout); | ||
|
||
if (empty($response)) { | ||
return false; | ||
} | ||
|
||
return json_decode($response[1], true) ?? false; | ||
} | ||
|
||
public function leftPop(string $queue, int $timeout): string|false | ||
{ | ||
$response = $this->getRedis()->blPop($queue, $timeout); | ||
|
||
if (empty($response)) { | ||
return false; | ||
} | ||
|
||
return $response[1]; | ||
} | ||
|
||
public function listRemove(string $queue, string $key): bool | ||
{ | ||
return !!$this->getRedis()->lRem($queue, $key, 1); | ||
} | ||
|
||
public function remove(string $key): bool | ||
{ | ||
return !!$this->getRedis()->del($key); | ||
} | ||
|
||
public function move(string $queue, string $destination): bool | ||
{ | ||
return $this->getRedis()->move($queue, $destination); | ||
} | ||
|
||
public function setArray(string $key, array $value): bool | ||
{ | ||
return $this->set($key, json_encode($value)); | ||
} | ||
|
||
public function set(string $key, string $value): bool | ||
{ | ||
return $this->getRedis()->set($key, $value); | ||
} | ||
|
||
public function get(string $key): array|string|null | ||
{ | ||
return $this->getRedis()->get($key); | ||
} | ||
|
||
public function listSize(string $key): int | ||
{ | ||
return $this->getRedis()->lLen($key); | ||
} | ||
|
||
public function increment(string $key): int | ||
{ | ||
return $this->getRedis()->incr($key); | ||
} | ||
|
||
public function decrement(string $key): int | ||
{ | ||
return $this->getRedis()->decr($key); | ||
} | ||
|
||
public function listRange(string $key, int $total, int $offset): array | ||
{ | ||
$start = $offset; | ||
$end = $start + $total - 1; | ||
$results = $this->getRedis()->lRange($key, $start, $end); | ||
|
||
return $results; | ||
} | ||
|
||
public function ping(): bool | ||
{ | ||
try { | ||
foreach ($this->getRedis()->_masters() as $master) { | ||
$this->getRedis()->ping($master); | ||
} | ||
|
||
return true; | ||
} catch (Exception $e) { | ||
return false; | ||
} | ||
} | ||
|
||
protected function getRedis(): \RedisCluster | ||
{ | ||
if ($this->redis) { | ||
return $this->redis; | ||
} | ||
|
||
$this->redis = new \RedisCluster(null, $this->seeds); | ||
return $this->redis; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<?php | ||
|
||
namespace Queue\E2E\Adapter; | ||
|
||
use Tests\E2E\Adapter\Base; | ||
use Utopia\Queue\Client; | ||
use Utopia\Queue\Connection\Redis; | ||
use Utopia\Queue\Connection\RedisCluster; | ||
|
||
class SwooleRedisClusterTest extends Base | ||
{ | ||
protected function getClient(): Client | ||
{ | ||
$connection = new RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); | ||
$client = new Client('swoole-redis-cluster', $connection); | ||
|
||
return $client; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
FROM phpswoole/swoole:php8.1-alpine | ||
|
||
RUN apk add autoconf build-base | ||
|
||
RUN docker-php-ext-enable redis |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<?php | ||
|
||
require_once __DIR__ . '/../../../../vendor/autoload.php'; | ||
require_once __DIR__ . '/../tests.php'; | ||
|
||
use Utopia\Queue; | ||
use Utopia\Queue\Message; | ||
|
||
$connection = new Queue\Connection\RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); | ||
$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole-redis-cluster'); | ||
$server = new Queue\Server($adapter); | ||
|
||
$server->job() | ||
->inject('message') | ||
->action(function (Message $message) { | ||
handleRequest($message); | ||
}); | ||
|
||
$server | ||
->error() | ||
->inject('error') | ||
->action(function ($th) { | ||
echo $th->getMessage() . PHP_EOL; | ||
}); | ||
|
||
$server | ||
->workerStart() | ||
->action(function () { | ||
echo "Worker Started" . PHP_EOL; | ||
}); | ||
|
||
$server->start(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters