diff --git a/composer.json b/composer.json
index 9e3a4a9..08fcf85 100644
--- a/composer.json
+++ b/composer.json
@@ -20,7 +20,7 @@
"doctrine/dbal": "^3.2"
},
"require-dev": {
- "openswoole/ide-helper": "^4.9",
+ "openswoole/ide-helper": "^v4.10.0.x-dev",
"opsway/psr12-strict-coding-standard": "^0.7.0",
"vimeo/psalm": "^4.22",
"weirdan/doctrine-psalm-plugin": "^2.0"
diff --git a/composer.lock b/composer.lock
index 3c64a3c..c1f94e1 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
- "content-hash": "c39204fa88ae5e3f7648c52328679550",
+ "content-hash": "1aed5eb8431bcc9531557025833a8808",
"packages": [
{
"name": "beberlei/assert",
@@ -1353,18 +1353,21 @@
},
{
"name": "openswoole/ide-helper",
- "version": "4.10.2",
+ "version": "4.11.4",
"source": {
"type": "git",
"url": "https://github.com/openswoole/ide-helper.git",
- "reference": "26667488bf23c9359b6b5ee8bd96db425d8358f9"
+ "reference": "80ec936d4341f505edef6054211379616e6a2529"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/openswoole/ide-helper/zipball/26667488bf23c9359b6b5ee8bd96db425d8358f9",
- "reference": "26667488bf23c9359b6b5ee8bd96db425d8358f9",
+ "url": "https://api.github.com/repos/openswoole/ide-helper/zipball/80ec936d4341f505edef6054211379616e6a2529",
+ "reference": "80ec936d4341f505edef6054211379616e6a2529",
"shasum": ""
},
+ "require": {
+ "php": ">=7.4"
+ },
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.3"
},
@@ -1376,15 +1379,15 @@
"authors": [
{
"name": "Open Swoole Group",
- "email": "hello@swoole.co.uk"
+ "email": "hello@openswoole.com"
}
],
"description": "IDE help files for Open Swoole.",
"support": {
"issues": "https://github.com/openswoole/ide-helper/issues",
- "source": "https://github.com/openswoole/ide-helper/tree/4.10.2"
+ "source": "https://github.com/openswoole/ide-helper/tree/4.11.4"
},
- "time": "2022-02-25T15:08:21+00:00"
+ "time": "2022-05-24T11:03:39+00:00"
},
{
"name": "opsway/psr12-strict-coding-standard",
@@ -2825,7 +2828,9 @@
],
"aliases": [],
"minimum-stability": "stable",
- "stability-flags": [],
+ "stability-flags": {
+ "openswoole/ide-helper": 20
+ },
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
diff --git a/example/cli.php b/example/cli.php
index 1216c81..a084324 100644
--- a/example/cli.php
+++ b/example/cli.php
@@ -20,7 +20,7 @@
'delay' => 1, // after this time
]
];
-$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPullFactory())($connectionParams);
+$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPoolFactory())($connectionParams);
$configuration = new \Doctrine\DBAL\Configuration();
$configuration->setMiddlewares(
[new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\DriverMiddleware($pool)]
diff --git a/example/server.php b/example/server.php
index e7a8348..0a7f4a8 100644
--- a/example/server.php
+++ b/example/server.php
@@ -23,7 +23,7 @@
'delay' => 1, // after this time
]
];
-$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPullFactory())($connectionParams);
+$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPoolFactory())($connectionParams);
$configuration = new \Doctrine\DBAL\Configuration();
$configuration->setMiddlewares(
[new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\DriverMiddleware($pool)]
diff --git a/phpcs.xml b/phpcs.xml
index 7c1dcd6..6b97f0e 100644
--- a/phpcs.xml
+++ b/phpcs.xml
@@ -15,5 +15,7 @@
src
-
+
+
+
diff --git a/src/Swoole/PgSQL/Connection.php b/src/Swoole/PgSQL/Connection.php
index 1f875a8..9652f86 100644
--- a/src/Swoole/PgSQL/Connection.php
+++ b/src/Swoole/PgSQL/Connection.php
@@ -5,25 +5,39 @@
namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;
use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
+use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\ParameterType;
use OpsWay\Doctrine\DBAL\SQLParserUtils;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\ConnectionException;
-use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException as SwooleDriverException;
+use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException;
+use Swoole\Coroutine as Co;
+use Swoole\Coroutine\PostgreSQL;
+use Throwable;
+use WeakMap;
+use function defer;
use function is_resource;
use function strlen;
use function substr;
+use function time;
+use function trim;
final class Connection implements ConnectionInterface
{
- public function __construct(private ConnectionWrapperInterface $connection)
+ /** @psalm-var array */
+ private array $internalStorage = [];
+ private WeakMap $statsStorage;
+
+ public function __construct(private ConnectionPool $pool, private int $retryDelay, private int $maxAttempts)
{
+ $this->statsStorage = new WeakMap();
+ defer(fn () => $this->onDefer());
}
/**
* {@inheritdoc}
*
- * @throws SwooleDriverException
+ * @throws DriverException
*/
public function prepare(string $sql) : Statement
{
@@ -39,8 +53,9 @@ public function prepare(string $sql) : Statement
$posShift += strlen($placeholder) - 1;
$i++;
}
+ $connection = $this->getNativeConnection();
- return new Statement($this->connection, $sql);
+ return new Statement($connection, $sql, $this->connectionStats($connection));
}
/**
@@ -48,13 +63,18 @@ public function prepare(string $sql) : Statement
*/
public function query(string $sql) : Result
{
- $resource = $this->connection->query($sql);
+ $connection = $this->getNativeConnection();
+ $resource = $connection->query($sql);
+ $stats = $this->connectionStats($connection);
+ if ($stats instanceof ConnectionStats) {
+ $stats->counter++;
+ }
if (! is_resource($resource)) {
- throw ConnectionException::fromConnection($this->connection);
+ throw ConnectionException::fromConnection($connection);
}
- return new Result($this->connection, $resource);
+ return new Result($this->getNativeConnection(), $resource);
}
/**
@@ -65,7 +85,7 @@ public function query(string $sql) : Result
*/
public function quote($value, $type = ParameterType::STRING) : string
{
- return "'" . (string) $this->connection->escape($value) . "'";
+ return "'" . (string) $this->getNativeConnection()->escape($value) . "'";
}
/**
@@ -73,13 +93,18 @@ public function quote($value, $type = ParameterType::STRING) : string
*/
public function exec(string $sql) : int
{
- $query = $this->connection->query($sql);
+ $connection = $this->getNativeConnection();
+ $query = $connection->query($sql);
+ $stats = $this->connectionStats($connection);
+ if ($stats instanceof ConnectionStats) {
+ $stats->counter++;
+ }
if (! is_resource($query)) {
- throw ConnectionException::fromConnection($this->connection);
+ throw ConnectionException::fromConnection($this->getNativeConnection());
}
- return $this->connection->affectedRows($query);
+ return (int) $this->getNativeConnection()->affectedRows($query);
}
/**
@@ -101,7 +126,7 @@ public function lastInsertId($name = null) : string
*/
public function beginTransaction() : bool
{
- $this->connection->query('START TRANSACTION');
+ $this->getNativeConnection()->query('START TRANSACTION');
return true;
}
@@ -111,7 +136,7 @@ public function beginTransaction() : bool
*/
public function commit() : bool
{
- $this->connection->query('COMMIT');
+ $this->getNativeConnection()->query('COMMIT');
return true;
}
@@ -121,18 +146,85 @@ public function commit() : bool
*/
public function rollBack() : bool
{
- $this->connection->query('ROLLBACK');
+ $this->getNativeConnection()->query('ROLLBACK');
return true;
}
public function errorCode() : int
{
- return $this->connection->errorCode();
+ return (int) $this->getNativeConnection()->errCode;
}
public function errorInfo() : string
{
- return $this->connection->error();
+ return (string) $this->getNativeConnection()->error;
+ }
+
+ public function getNativeConnection() : PostgreSQL
+ {
+ $connection = $this->internalStorage[Co::getCid()] ?? null;
+ if (! $connection instanceof PostgreSQL) {
+ $lastException = null;
+ for ($i = 0; $i < $this->maxAttempts; $i++) {
+ try {
+ /** @psalm-suppress MissingDependency */
+ [$connection, $stats] = $this->pool->get(2);
+ if (! $connection instanceof PostgreSQL) {
+ throw new DriverException('No connect available in pull');
+ }
+ /** @var resource|bool $query */
+ $query = $connection->query('SELECT 1');
+ $affectedRows = is_resource($query) ? (int) $connection->affectedRows($query) : 0;
+ if ($affectedRows !== 1) {
+ $errCode = trim($connection->errCode);
+ throw new ConnectionException(
+ "Connection ping failed. Trying reconnect (attempt $i). Reason: $errCode"
+ );
+ }
+ $this->internalStorage[Co::getCid()] = $connection;
+ $this->statsStorage[$connection] = $stats;
+
+ break;
+ } catch (Throwable $e) {
+ $errCode = '';
+ if ($connection instanceof PostgreSQL) {
+ $errCode = $connection->errCode;
+ $connection = null;
+ }
+ $lastException = $e instanceof DBALException
+ ? $e
+ : new ConnectionException($e->getMessage(), (string) $errCode, '', (int) $e->getCode(), $e);
+ Co::sleep($this->retryDelay); // Sleep s after failure
+ }
+ }
+ if (! $connection instanceof PostgreSQL) {
+ $lastException instanceof Throwable
+ ? throw $lastException
+ : throw new ConnectionException('Connection could not be initiated');
+ }
+ }
+
+ return $this->internalStorage[Co::getCid()];
+ }
+
+ public function connectionStats(PostgreSQL $connection) : ?ConnectionStats
+ {
+ return $this->statsStorage[$connection] ?? null;
+ }
+
+ private function onDefer() : void
+ {
+ $connection = $this->internalStorage[Co::getCid()] ?: null;
+ if (! $connection instanceof PostgreSQL) {
+ return;
+ }
+ $stats = $this->connectionStats($connection);
+ if ($stats instanceof ConnectionStats) {
+ $stats->lastInteraction = time();
+ }
+ $this->pool->put($connection);
+ unset($this->internalStorage[Co::getCid()]);
+ $this->statsStorage->offsetUnset($connection);
}
}
diff --git a/src/Swoole/PgSQL/ConnectionPool.php b/src/Swoole/PgSQL/ConnectionPool.php
new file mode 100644
index 0000000..c0da33e
--- /dev/null
+++ b/src/Swoole/PgSQL/ConnectionPool.php
@@ -0,0 +1,123 @@
+size < 0) {
+ throw new DriverException('Expected, connection pull size > 0');
+ }
+ $this->pool = new Channel($this->size);
+ $this->map = new WeakMap();
+ }
+
+ /** @psalm-return array{0 : PostgreSQL|null, 1 : ConnectionStats|null } */
+ public function get(float $timeout = -1) : array
+ {
+ $connection = $this->pool->pop($timeout);
+ if (! $connection instanceof PostgreSQL) {
+ /** try to fill pull with new connect */
+ $this->make();
+ $connection = $this->pool->pop($timeout);
+ }
+ if (! $connection instanceof PostgreSQL) {
+ return [null, null];
+ }
+
+ return [
+ $connection,
+ $this->map[$connection] ?? throw new DriverException('Connection stats could not be empty'),
+ ];
+ }
+
+ public function put(PostgreSQL $connection) : void
+ {
+ if (! $this->map->offsetExists($connection)) {
+ return;
+ }
+ if ($this->pool->isFull()) {
+ $this->remove($connection);
+
+ return;
+ }
+ /** @psalm-var ConnectionStats|null $stats */
+ $stats = $this->map[$connection] ?? null;
+ if (! $stats || $stats->isOverdue()) {
+ $this->remove($connection);
+
+ return;
+ }
+ $this->pool->push($connection);
+ }
+
+ public function close() : void
+ {
+ $this->pool->close();
+ $this->pool = null;
+ $this->map = null;
+ }
+
+ public function capacity() : int
+ {
+ return $this->pool->capacity;
+ }
+
+ /**
+ * Exclude object data from doctrine cache serialization
+ *
+ * @see vendor/doctrine/dbal/src/Cache/QueryCacheProfile.php:127
+ */
+ public function __serialize() : array
+ {
+ return [];
+ }
+
+ /**
+ * @param string $data
+ */
+ public function __unserialize($data) : void
+ {
+ // Do nothing
+ }
+
+ private function remove(PostgreSQL $connection) : void
+ {
+ $this->map->offsetUnset($connection);
+ unset($connection);
+ }
+
+ private function make() : void
+ {
+ if ($this->pool->capacity === $this->map->count()) {
+ return;
+ }
+ try {
+ $connection = ($this->constructor)();
+ } catch (Throwable) {
+ throw new Exception('Could not initialize connection with constructor');
+ }
+ $this->map[$connection] = new ConnectionStats(time(), 1, $this->connectionTtl, $this->connectionUseLimit);
+ $this->put($connection);
+ }
+}
diff --git a/src/Swoole/PgSQL/ConnectionPullFactory.php b/src/Swoole/PgSQL/ConnectionPoolFactory.php
similarity index 54%
rename from src/Swoole/PgSQL/ConnectionPullFactory.php
rename to src/Swoole/PgSQL/ConnectionPoolFactory.php
index c7d1cfd..f6fe323 100644
--- a/src/Swoole/PgSQL/ConnectionPullFactory.php
+++ b/src/Swoole/PgSQL/ConnectionPoolFactory.php
@@ -4,8 +4,10 @@
namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;
+use Swoole\Coroutine\PostgreSQL;
+
/**
- * @psalm-type ConnectionPullFactoryConfig = array{
+ * @psalm-type ConnectionPoolFactoryConfig = array{
* dbname: 'mydb',
* user: 'user',
* password: 'secret',
@@ -22,28 +24,27 @@
* }
* @psalm-suppress MissingDependency, UndefinedClass
*/
-class ConnectionPullFactory
+class ConnectionPoolFactory
{
- // TTL in milliseconds
- public const DEFAULT_CONNECTION_TTL = 60000;
+ // Allowed IDLE time in seconds
+ public const DEFAULT_CONNECTION_TTL = 60;
// Allowed queries per connection
- public const DEFAULT_USED_TIMES = 0;
+ public const DEFAULT_USAGE_LIMIT = 0;
/**
- * @psalm-param ConnectionPullFactoryConfig $params
+ * @psalm-param ConnectionPoolFactoryConfig $params
*/
- public function __invoke(array $params) : DownscaleableConnectionPool
+ public function __invoke(array $params) : ConnectionPoolInterface
{
/**
* @var int|null $pullSize
*/
$pullSize = $params['poolSize'] ?? null;
- /**
- * @var int|string|null $tickFrequency
- */
- $tickFrequency = $params['tickFrequency'] ?? null;
+ /** @var int|string $usageLimit */
+ $usageLimit = $params['usedTimes'] ?? self::DEFAULT_USAGE_LIMIT;
+
/** @psalm-suppress RedundantCastGivenDocblockType */
- $connectionTtl = (int) ($params['connectionTtl'] ?? self::DEFAULT_CONNECTION_TTL) / 1000;
+ $connectionTtl = (int) ($params['connectionTtl'] ?? self::DEFAULT_CONNECTION_TTL);
/**
* @psalm-suppress MissingDependency
@@ -51,14 +52,11 @@ public function __invoke(array $params) : DownscaleableConnectionPool
* @psalm-suppress RedundantCastGivenDocblockType
* @psalm-suppress RedundantCast
*/
- return new DownscaleableConnectionPool(
- static fn() : PsqlConnectionWrapper => Driver::createConnection(
- Driver::generateDSN($params),
- (int) $connectionTtl, // TTL in seconds
- (int) ($params['usedTimes'] ?? self::DEFAULT_USED_TIMES)
- ),
+ return new ConnectionPool(
+ static fn() : PostgreSQL => Driver::createConnection(Driver::generateDSN($params)),
$pullSize,
- tickFrequency: $tickFrequency ? (int) $tickFrequency : null
+ $connectionTtl,
+ (int) $usageLimit
);
}
}
diff --git a/src/Swoole/PgSQL/ConnectionPoolInterface.php b/src/Swoole/PgSQL/ConnectionPoolInterface.php
new file mode 100644
index 0000000..8deb9fa
--- /dev/null
+++ b/src/Swoole/PgSQL/ConnectionPoolInterface.php
@@ -0,0 +1,19 @@
+counterLimit && ! $this->ttl,
+ $this->counterLimit && $this->counterLimit > $this->counter,
+ $this->ttl && time() - $this->lastInteraction > $this->ttl => false,
+ default => true
+ };
+ }
+}
diff --git a/src/Swoole/PgSQL/ConnectionWrapperInterface.php b/src/Swoole/PgSQL/ConnectionWrapperInterface.php
deleted file mode 100644
index 82473cb..0000000
--- a/src/Swoole/PgSQL/ConnectionWrapperInterface.php
+++ /dev/null
@@ -1,51 +0,0 @@
-connectsMap = new Channel(1);
-
- parent::__construct($constructor, $size ?? self::DEFAULT_POOL_SIZE, $proxy);
- }
-
- public function get(float $timeout = -1) : ConnectionWrapperInterface|bool|null
- {
- $this->init();
- /** @psalm-var ConnectionWrapperInterface|null $connection */
- $connection = parent::get($timeout);
- if ($connection instanceof ConnectionWrapperInterface) {
- /** @psalm-var string[] $map */
- $map = $this->connectsMap->pop();
- $map = array_filter($map, fn(string $id) : bool => $id !== $connection->id());
- $this->connectsMap->push($map);
- }
-
- return $connection;
- }
-
- public function removeConnect(?ConnectionWrapperInterface $connection) : void
- {
- $this->init();
- if (! $connection instanceof ConnectionWrapperInterface) {
- return;
- }
- /** @psalm-var string[] $map */
- $map = $this->connectsMap->pop();
- /** if connect presents in map, it`s attempt to remove previously returned connect */
- if (! in_array($connection->id(), $map) && $this->num > 0) {
- $this->num--;
- }
- $connection->__destruct();
- $this->connectsMap->push($map);
- }
-
- /** @param ConnectionWrapperInterface|null $connection */
- public function put($connection, bool $updateLastInteraction = true) : void
- {
- $this->init();
- if (! $connection instanceof ConnectionWrapperInterface) {
- return;
- }
- if ($updateLastInteraction) {
- $connection->updateLastInteraction();
- }
- if (! $connection->isReusable()) {
- $this->removeConnect($connection);
-
- return;
- }
- /** @psalm-var string[] $map */
- $map = $this->connectsMap->pop();
- if (! in_array($connection->id(), $map)) {
- $map[] = $connection->id();
- parent::put($connection);
- }
- $this->connectsMap->push($map);
- }
-
- public function close() : void
- {
- if ($this->downscaleTimerId > 0) {
- Timer::clear($this->downscaleTimerId);
- }
-
- parent::close();
- }
-
- private function init() : void
- {
- if ($this->inited) {
- return;
- }
- $this->connectsMap->push([]);
- $this->inited = true;
-
- // Prevent running timer for CLI commands
- // But run for HTTP server and Message workers (coroutine starts with 2)
- if (Co::getCid() > 1) {
- $this->downscaleTimerId = (int) Timer::tick(
- $this->tickFrequency ?? self::DOWNSCALE_TICK_FREQUENCY,
- fn() => $this->downscale()
- );
- }
- }
-
- public function downscale() : void
- {
- /** @var array $map */
- $map = $this->connectsMap->pop();
- $this->connectsMap->push($map);
- if ($this->num === 0) {
- return;
- }
- $step = $this->num;
- $upToDateConnections = [];
- /** downscaling overdue ttl connections */
- while ($step > 0) {
- $step--;
- $connection = $this->get(1);
- if (! $connection instanceof ConnectionWrapperInterface) {
- continue;
- }
- if (! $connection->isUptoDate() || ! $connection->isReusable()) {
- $this->removeConnect($connection);
-
- continue;
- }
- $upToDateConnections[] = $connection;
- }
- foreach ($upToDateConnections as $connection) {
- $this->put($connection, false);
- }
- unset($upToDateConnections);
- }
-
- /**
- * Exclude object data from doctrine cache serialization
- *
- * @see vendor/doctrine/dbal/src/Cache/QueryCacheProfile.php:127
- */
- public function __serialize() : array
- {
- return [];
- }
-
- /**
- * @param string $data
- */
- public function __unserialize($data) : void
- {
- // Do nothing
- }
-}
diff --git a/src/Swoole/PgSQL/Driver.php b/src/Swoole/PgSQL/Driver.php
index 3f80e52..74dbd5b 100644
--- a/src/Swoole/PgSQL/Driver.php
+++ b/src/Swoole/PgSQL/Driver.php
@@ -5,25 +5,18 @@
namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;
use Doctrine\DBAL\Driver\AbstractPostgreSQLDriver;
-use Doctrine\DBAL\Exception as DBALException;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\ConnectionException;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException;
use Swoole\Coroutine\PostgreSQL;
-use Throwable;
-use function abs;
use function array_key_exists;
-use function defer;
use function implode;
-use function is_resource;
use function sprintf;
-use function trim;
-use function usleep;
/** @psalm-suppress UndefinedClass, DeprecatedInterface, MissingDependency */
final class Driver extends AbstractPostgreSQLDriver
{
- public function __construct(private ?ConnectionPullInterface $pool = null)
+ public function __construct(private ?ConnectionPoolInterface $pool = null)
{
}
@@ -35,58 +28,13 @@ public function __construct(private ?ConnectionPullInterface $pool = null)
*/
public function connect(array $params, $username = null, $password = null, array $driverOptions = []) : Connection
{
- $pool = $this->pool;
- if (! $pool instanceof ConnectionPullInterface) {
+ if (! $this->pool instanceof ConnectionPoolInterface) {
throw new DriverException('Connection pull should be initialized');
}
$retryMaxAttempts = (int) ($params['retry']['max_attempts'] ?? 1);
$retryDelay = (int) ($params['retry']['delay'] ?? 0);
- $lastException = null;
- $connect = null;
- for ($i = 0; $i < $retryMaxAttempts; $i++) {
- try {
- /** @psalm-suppress MissingDependency */
- $connect = $pool->get(2);
- if (! $connect instanceof ConnectionWrapperInterface) {
- throw new DriverException('No connect available in pull');
- }
- /** @var resource|bool $query */
- $query = $connect->query('SELECT 1');
- $affectedRows = is_resource($query) ? $connect->affectedRows($query) : 0;
- if ($affectedRows !== 1) {
- throw new ConnectionException(
- 'Connection ping failed. Trying reconnect (attempt ' . $i . '). Reason: '
- . trim($connect->error())
- );
- }
- break;
- } catch (Throwable $e) {
- $errCode = '';
- if ($connect instanceof ConnectionWrapperInterface) {
- $errCode = $connect->errorCode();
- /** @psalm-suppress MissingDependency */
- $pool->removeConnect($connect);
- $connect = null;
- }
- $lastException = $e instanceof DBALException
- ? $e
- : new ConnectionException($e->getMessage(), (string) $errCode, '', (int) $e->getCode(), $e);
- /** @psalm-suppress ArgumentTypeCoercion */
- usleep(abs($retryDelay) * 1000); // Sleep ms after failure
- }
- }
- if (! $connect instanceof ConnectionWrapperInterface) {
- $lastException instanceof Throwable
- ? throw $lastException
- : throw new ConnectionException('Connection could not be initiated');
- }
-
- /** @psalm-suppress MissingClosureReturnType,PossiblyNullReference,UnusedFunctionCall,UnusedVariable */
- defer(static fn() => $pool->put($connect));
-
- /** @psalm-suppress PossiblyNullArgument */
- return new Connection($connect);
+ return new Connection($this->pool, $retryDelay, $retryMaxAttempts);
}
/**
@@ -94,14 +42,14 @@ public function connect(array $params, $username = null, $password = null, array
*
* @throws ConnectionException
*/
- public static function createConnection(string $dsn, int $ttl, int $maxUsageTimes) : PsqlConnectionWrapper
+ public static function createConnection(string $dsn) : PostgreSQL
{
$pgsql = new PostgreSQL();
if (! $pgsql->connect($dsn)) {
throw new ConnectionException(sprintf('Failed to connect: %s', (string) ($pgsql->error ?? 'Unknown')));
}
- return new PsqlConnectionWrapper($pgsql, $ttl, $maxUsageTimes);
+ return $pgsql;
}
/**
diff --git a/src/Swoole/PgSQL/DriverMiddleware.php b/src/Swoole/PgSQL/DriverMiddleware.php
index 1e7b256..785f6cc 100644
--- a/src/Swoole/PgSQL/DriverMiddleware.php
+++ b/src/Swoole/PgSQL/DriverMiddleware.php
@@ -9,7 +9,7 @@
final class DriverMiddleware implements MiddlewareInterface
{
- public function __construct(private ConnectionPullInterface $connectionPool)
+ public function __construct(private ConnectionPoolInterface $connectionPool)
{
}
diff --git a/src/Swoole/PgSQL/Exception/ExceptionFromConnectionTrait.php b/src/Swoole/PgSQL/Exception/ExceptionFromConnectionTrait.php
index 2bedc79..89e280a 100644
--- a/src/Swoole/PgSQL/Exception/ExceptionFromConnectionTrait.php
+++ b/src/Swoole/PgSQL/Exception/ExceptionFromConnectionTrait.php
@@ -5,22 +5,22 @@
namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception;
use ArrayAccess;
-use OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionWrapperInterface;
+use Swoole\Coroutine\PostgreSQL;
/** @psalm-immutable */
trait ExceptionFromConnectionTrait
{
- public static function fromConnection(ConnectionWrapperInterface $connection) : self
+ public static function fromConnection(PostgreSQL $connection) : self
{
/** @var ArrayAccess $resultDiag */
- $resultDiag = $connection->resultDiag() ?? [];
+ $resultDiag = $connection->resultDiag ?? [];
$sqlstate = (string) ($resultDiag['sqlstate'] ?? '');
return new self(
- $connection->error(),
- (string) $connection->errorCode(),
+ (string) $connection->error,
+ (string) $connection->errCode,
$sqlstate,
- $connection->errorCode(),
+ (int) $connection->errCode,
);
}
}
diff --git a/src/Swoole/PgSQL/PsqlConnectionWrapper.php b/src/Swoole/PgSQL/PsqlConnectionWrapper.php
deleted file mode 100644
index 6c1192a..0000000
--- a/src/Swoole/PgSQL/PsqlConnectionWrapper.php
+++ /dev/null
@@ -1,144 +0,0 @@
-lastInteraction = time();
- $this->id = uniqid((string) mt_rand(), true);
- }
-
- public function __destruct()
- {
- unset($this->connection);
- $this->connection = null;
- }
-
- public function updateLastInteraction() : void
- {
- $this->lastInteraction = time();
- }
-
- public function isUptoDate() : bool
- {
- return $this->lastInteraction + $this->ttl > time();
- }
-
- public function id() : string
- {
- return $this->id;
- }
-
- public function times() : int
- {
- return $this->usedTimes;
- }
-
- public function isReusable() : bool
- {
- return $this->usedTimes < $this->maxUsageTimes;
- }
-
- /** @return resource|false */
- public function query(string $sql)
- {
- $this->usedTimes++;
- $result = $this->connection->query($sql);
-
- if (! is_resource($result)) {
- $result = false;
- }
-
- return $result;
- }
-
- /** @param resource $queryResult */
- public function affectedRows($queryResult) : int
- {
- return (int) $this->connection->affectedRows($queryResult);
- }
-
- /** @param resource $queryResult */
- public function fetchAssoc($queryResult) : array|bool
- {
- $result = $this->connection->fetchAssoc($queryResult);
-
- if (is_array($result) && count($result) === 0) {
- $result = false;
- }
-
- return $result;
- }
-
- /** @param resource $queryResult*/
- public function fetchArray($queryResult, int|null $row = null, mixed $resultType = null) : array|bool
- {
- return match (true) {
- $row !== null && $resultType !== null => $this->connection->fetchArray($queryResult, $row, $resultType),
- $row !== null => $this->connection->fetchArray($queryResult, $row),
- $resultType !== null => $this->connection->fetchArray($queryResult, null, $resultType),
- default => $this->connection->fetchArray($queryResult)
- };
- }
-
- public function prepare(string $key, string $sql) : Result|bool
- {
- return $this->connection->prepare($key, $sql);
- }
-
- /** @return resource|false */
- public function execute(string $key, array $params)
- {
- $this->usedTimes++;
-
- return $this->connection->execute($key, $params);
- }
-
- /** @return mixed */
- public function escape(mixed $value)
- {
- return $this->connection->escape($value);
- }
-
- /** @param resource|null $result*/
- public function fieldCount($result) : int
- {
- return (int) $this->connection->fieldCount($result);
- }
-
- public function error() : string
- {
- return (string) $this->connection->error;
- }
-
- public function errorCode() : int
- {
- return (int) $this->connection->errCode;
- }
-
- /** @return ArrayAccess|array|null */
- public function resultDiag()
- {
- return $this->connection->resultDiag;
- }
-}
diff --git a/src/Swoole/PgSQL/Result.php b/src/Swoole/PgSQL/Result.php
index ac9cee0..848b598 100644
--- a/src/Swoole/PgSQL/Result.php
+++ b/src/Swoole/PgSQL/Result.php
@@ -6,7 +6,10 @@
use Doctrine\DBAL\Driver\Result as ResultInterface;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException as SwooleDriverException;
+use Swoole\Coroutine\PostgreSQL;
+use function count;
+use function is_array;
use function is_resource;
use const OPENSWOOLE_PGSQL_NUM;
@@ -14,7 +17,7 @@
class Result implements ResultInterface
{
/** @param resource|null $result */
- public function __construct(private ConnectionWrapperInterface $connection, private $result)
+ public function __construct(private PostgreSQL $connection, private $result)
{
}
@@ -28,7 +31,7 @@ public function fetchNumeric() : array|bool
* @psalm-var list|false $result
* @psalm-suppress UndefinedConstant
*/
- $result = $this->connection->fetchArray($this->result, resultType: OPENSWOOLE_PGSQL_NUM);
+ $result = $this->connection->fetchArray($this->result, null, OPENSWOOLE_PGSQL_NUM);
return $result;
}
@@ -40,7 +43,10 @@ public function fetchAssociative() : array|bool
throw SwooleDriverException::fromConnection($this->connection);
}
/** @psalm-var array|false $result */
- $result = $this->connection->fetchAssoc($this->result);
+ $result = $this->connection->fetchAssoc($this->result, null);
+ if (is_array($result) && count($result) === 0) {
+ $result = false;
+ }
return $result;
}
@@ -97,7 +103,7 @@ public function rowCount() : int
throw SwooleDriverException::fromConnection($this->connection);
}
- return $this->connection->affectedRows($this->result);
+ return (int) $this->connection->affectedRows($this->result);
}
/** {@inheritdoc} */
@@ -107,7 +113,7 @@ public function columnCount() : int
throw SwooleDriverException::fromConnection($this->connection);
}
- return $this->connection->fieldCount($this->result);
+ return (int) $this->connection->fieldCount($this->result);
}
/** {@inheritdoc} */
diff --git a/src/Swoole/PgSQL/Scaler.php b/src/Swoole/PgSQL/Scaler.php
new file mode 100644
index 0000000..eeebe87
--- /dev/null
+++ b/src/Swoole/PgSQL/Scaler.php
@@ -0,0 +1,58 @@
+timerId) {
+ return;
+ }
+ $this->timerId = Timer::tick(
+ $this->tickFrequency ?? self::DOWNSCALE_TICK_FREQUENCY,
+ fn() => $this->downscale()
+ ) ?: null;
+ }
+
+ private function downscale() : void
+ {
+ $poolCapacity = $this->pool->capacity();
+ /** @psalm-var PostgreSQL[] $connections */
+ $connections = [];
+ while ($poolCapacity > 0) {
+ /** @psalm-suppress UnusedVariable */
+ [$connection, $connectionStats] = $this->pool->get();
+ /** connection never null if poll capacity > 0 */
+ if (! $connection) {
+ return;
+ }
+ $connections[] = $connection;
+ $poolCapacity--;
+ }
+ array_map(fn(PostgreSQL $connection) => $this->pool->put($connection), $connections);
+ }
+
+ public function close() : void
+ {
+ if (! $this->timerId) {
+ return;
+ }
+ Timer::clear($this->timerId);
+ }
+}
diff --git a/src/Swoole/PgSQL/Statement.php b/src/Swoole/PgSQL/Statement.php
index be78c44..9aa4c62 100644
--- a/src/Swoole/PgSQL/Statement.php
+++ b/src/Swoole/PgSQL/Statement.php
@@ -8,6 +8,7 @@
use Doctrine\DBAL\Driver\Statement as StatementInterface;
use Doctrine\DBAL\ParameterType;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException as SwooleDriverException;
+use Swoole\Coroutine\PostgreSQL;
use function is_array;
use function is_bool;
@@ -19,7 +20,7 @@ final class Statement implements StatementInterface
private string $key;
private array $params = [];
- public function __construct(private ConnectionWrapperInterface $connection, string $sql)
+ public function __construct(private PostgreSQL $connection, string $sql, private ?ConnectionStats $stats)
{
$this->key = uniqid('stmt_', true);
if ($this->connection->prepare($this->key, $sql) === false) {
@@ -72,6 +73,9 @@ public function execute($params = []) : ResultInterface
}
$result = $this->connection->execute($this->key, $mergedParams);
+ if ($this->stats instanceof ConnectionStats) {
+ $this->stats->counter++;
+ }
if (! is_resource($result)) {
throw SwooleDriverException::fromConnection($this->connection);
}
@@ -81,12 +85,12 @@ public function execute($params = []) : ResultInterface
public function errorCode() : int
{
- return $this->connection->errorCode();
+ return (int) $this->connection->errCode;
}
public function errorInfo() : string
{
- return $this->connection->error();
+ return (string) $this->connection->error;
}
private function escapeValue(mixed $value, int $type = ParameterType::STRING) : ?string