Skip to content

Commit

Permalink
Merge pull request #2 from utopia-php/fix-adapter-interface
Browse files Browse the repository at this point in the history
fix(adapters): consistent interface
  • Loading branch information
eldadfux authored Jul 7, 2021
2 parents d08b0b1 + 23339d8 commit fdb82f0
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 74 deletions.
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ $adapter = new WebSocket\Adapter\Swoole();
$adapter->setPackageMaxLength(64000);

$server = new WebSocket\Server($adapter);
$server->onStart(/* callback */);
$server->onWorkerStart(/* callback */);
$server->onMessage(/* callback */);
$server->onOpen(/* callback */);
$server->onClose(/* callback */);
$server->onStart(function () {
echo "Server started!";
});
$server->onWorkerStart(function (int $workerId) {
echo "Worker {$workerId} started!";
});
$server->onOpen(function (int $connection, $request) {
echo "Connection {$connection} established!";
});
$server->onMessage(function (int $connection, string $message) {
echo "Message from {$connection}: {$message}";
});
$server->onClose(function (int $connection) {
echo "Connection {$workerId} closed!";
});

$server->start();
```
Expand Down
2 changes: 2 additions & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<file name="vendor/swoole/ide-helper/output/swoole/constants.php"/>
<file name="vendor/swoole/ide-helper/output/swoole/namespace/Server.php"/>
<file name="vendor/swoole/ide-helper/output/swoole/namespace/Http/Server.php"/>
<file name="vendor/swoole/ide-helper/output/swoole/namespace/Http/Request.php"/>
<file name="vendor/swoole/ide-helper/output/swoole/namespace/WebSocket/Frame.php"/>
<file name="vendor/swoole/ide-helper/output/swoole/namespace/WebSocket/Server.php"/>
</stubs>
</psalm>
33 changes: 31 additions & 2 deletions src/WebSocket/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public abstract function send(array $connections, string $message): void;

/**
* Closes a connection.
* @param string $connection Connection ID.
* @param int $connection Connection ID.
* @param int $code Close Code.
* @return void
*/
public abstract function close(string $connection, int $code): void;
public abstract function close(int $connection, int $code): void;

/**
* Is called when the Server starts.
Expand Down Expand Up @@ -85,7 +85,36 @@ public abstract function onMessage(callable $callback): self;
*/
public abstract function onClose(callable $callback): self;

/**
* Sets maximum package length in bytes.
* @param int $bytes
* @return Adapter
*/
public abstract function setPackageMaxLength(int $bytes): self;

/**
* Enables/Disables compression.
* @param bool $enabled
* @return Adapter
*/
public abstract function setCompressionEnabled(bool $enabled): self;

/**
* Sets the number of workers.
* @param int $num
* @return Adapter
*/
public abstract function setWorkerNumber(int $num): self;

/**
* Returns the native server object from the Adapter.
* @return mixed
*/
public abstract function getNative(): mixed;

/**
* Returns all connections.
* @return array
*/
public abstract function getConnections(): array;
}
50 changes: 42 additions & 8 deletions src/WebSocket/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

namespace Utopia\WebSocket\Adapter;

use Swoole\Http\Request;
use Swoole\Process;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Utopia\WebSocket\Adapter;

/**
Expand All @@ -10,16 +14,18 @@
*/
class Swoole extends Adapter
{
protected \Swoole\WebSocket\Server $server;
protected Server $server;

protected string $host;
protected int $port;

private static array $connections = [];

public function __construct(string $host = '0.0.0.0', int $port = 80)
{
parent::__construct($host, $port);

$this->server = new \Swoole\WebSocket\Server($this->host, $this->port);
$this->server = new Server($this->host, $this->port);
}

public function start(): void
Expand Down Expand Up @@ -49,38 +55,56 @@ public function send(array $connections, string $message): void
}
}

public function close(string $connection, int $code): void
public function close(int $connection, int $code): void
{
$this->server->close($connection);
}

public function onStart(callable $callback): self
{
$this->server->on('start', $callback);
$this->server->on('start', function () use ($callback) {
call_user_func($callback);

Process::signal(2, function () {
$this->shutdown();
});
});
return $this;
}

public function onWorkerStart(callable $callback): self
{
$this->server->on('workerStart', $callback);
$this->server->on('workerStart', function(Server $server, int $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
return $this;
}

public function onOpen(callable $callback): self
{
$this->server->on('open', $callback);
$this->server->on('open', function (Server $server, Request $request) use ($callback) {
self::$connections[$request->fd] = true;

call_user_func($callback, $request->fd, $request);
});
return $this;
}

public function onMessage(callable $callback): self
{
$this->server->on('message', $callback);
$this->server->on('message', function (Server $server, Frame $frame) use ($callback) {
call_user_func($callback, $frame->fd, $frame->data);
});
return $this;
}

public function onClose(callable $callback): self
{
$this->server->on('close', $callback);
$this->server->on('close', function (Server $server, int $fd) use ($callback) {
unset(self::$connections[$fd]);

call_user_func($callback, $fd);
});
return $this;
}

Expand All @@ -101,4 +125,14 @@ public function setWorkerNumber(int $num): self
$this->config['worker_num'] = $num;
return $this;
}

public function getNative(): \Swoole\WebSocket\Server
{
return $this->server;
}

public function getConnections(): array
{
return array_keys(self::$connections);
}
}
55 changes: 38 additions & 17 deletions src/WebSocket/Adapter/Workerman.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,79 +4,90 @@

use Utopia\WebSocket\Adapter;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;

/**
*
* @package Utopia\WebSocket\Adapter
*/
class Workerman extends Adapter
{
protected \Workerman\Worker $server;
protected Worker $server;

protected string $host;
protected int $port;

private mixed $callbackOnStart;

public function __construct(string $host = '0.0.0.0', int $port = 80)
{
parent::__construct($host, $port);

$this->server = new \Workerman\Worker("websocket://{$this->host}:{$this->port}");
$this->server = new Worker("websocket://{$this->host}:{$this->port}");
}

public function start(): void
{
\Workerman\Worker::runAll();
Worker::runAll();
call_user_func($this->callbackOnStart);
}

public function shutdown(): void
{
\Workerman\Worker::stopAll();
Worker::stopAll();
}

public function send(array $connections, string $message): void
{
foreach ($connections as $connection) {
$connection->send($message);
TcpConnection::$connections[$connection]->send($message);
}
}

/**
*
* @param TcpConnection $connection
* @param int $code
* @return void
*/
public function close($connection, int $code): void
public function close(int $connection, int $code): void
{
$connection->close();
TcpConnection::$connections[$connection]->close();
}

public function onStart(callable $callback): self
{
$this->callbackOnStart = $callback;
return $this;
}

public function onWorkerStart(callable $callback): self
{
$this->server->onWorkerStart = $callback;
$this->server->onWorkerStart = function(Worker $worker) use ($callback): void {
call_user_func($callback, $worker->id);
};
return $this;
}

public function onOpen(callable $callback): self
{
$this->server->onConnect = $callback;
$this->server->onConnect = function (mixed $connection) use ($callback): void {
$connection->onWebSocketConnect = function(TcpConnection $connection) use ($callback): void
{
/** @var array $_SERVER */
call_user_func($callback, $connection->id, $_SERVER);
};
};
return $this;
}

public function onMessage(callable $callback): self
{
$this->server->onMessage = $callback;
$this->server->onMessage = function (TcpConnection $connection, string $data) use ($callback): void {
call_user_func($callback, $connection->id, $data);
};
return $this;
}

public function onClose(callable $callback): self
{
$this->server->onClose = $callback;
$this->server->onClose = function (TcpConnection $connection) use ($callback): void {
call_user_func($callback, $connection->id);
};
return $this;
}

Expand All @@ -95,4 +106,14 @@ public function setWorkerNumber(int $num): self
$this->server->count = $num;
return $this;
}

public function getNative(): Worker
{
return $this->server;
}

public function getConnections(): array
{
return array_keys(TcpConnection::$connections);
}
}
14 changes: 12 additions & 2 deletions src/WebSocket/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public function send(array $connections, string $message): void

/**
* Closes a connection.
* @param string $connection Connection ID.
* @param int $connection Connection ID.
* @param int $code Close Code.
* @return void
*/
public function close($connection, int $code): void
public function close(int $connection, int $code): void
{
$this->adapter->close($connection, $code);
}
Expand Down Expand Up @@ -120,4 +120,14 @@ public function onClose(callable $callback): self
$this->adapter->onClose($callback);
return $this;
}

/**
* Returns all connections.
* @param callable $callback
* @return array
*/
public function getConnections(): array
{
return $this->adapter->getConnections();
}
}
4 changes: 1 addition & 3 deletions tests/e2e/AdapterTest.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
<?php
use PHPUnit\Framework\TestCase;
use WebSocket\Client as WebSocketClient;
use WebSocket\ConnectionException;
use WebSocket\TimeoutException;

class SwooleTest extends TestCase
{
private function getWebsocket(string $server, int $port): WebSocketClient
{
return new WebSocketClient('ws://'.$server.':'.$port.'/v1/realtime', [
return new WebSocketClient('ws://'.$server.':'.$port, [
'timeout' => 10,
]);
}
Expand Down
Loading

0 comments on commit fdb82f0

Please sign in to comment.