Skip to content

Commit

Permalink
[BOODMO-33976]prepare() on null problem resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
fon-MaXX authored and mrVrAlex committed Jul 26, 2022
1 parent 73b8d00 commit 01f36aa
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 499 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"doctrine/dbal": "^3.2"
},
"require-dev": {
"openswoole/ide-helper": "^4.9",
"openswoole/ide-helper": "^v4.10.0.x-dev",
"opsway/psr12-strict-coding-standard": "^0.7.0",
"vimeo/psalm": "^4.22",
"weirdan/doctrine-psalm-plugin": "^2.0"
Expand Down
23 changes: 14 additions & 9 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion example/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
'delay' => 1, // after this time
]
];
$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPullFactory())($connectionParams);
$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPoolFactory())($connectionParams);
$configuration = new \Doctrine\DBAL\Configuration();
$configuration->setMiddlewares(
[new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\DriverMiddleware($pool)]
Expand Down
2 changes: 1 addition & 1 deletion example/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
'delay' => 1, // after this time
]
];
$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPullFactory())($connectionParams);
$pool = (new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\ConnectionPoolFactory())($connectionParams);
$configuration = new \Doctrine\DBAL\Configuration();
$configuration->setMiddlewares(
[new \OpsWay\Doctrine\DBAL\Swoole\PgSQL\DriverMiddleware($pool)]
Expand Down
4 changes: 3 additions & 1 deletion phpcs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@
<file>src</file>

<!-- Include all rules from the Zend Coding Standard -->
<rule ref="OpsWayStrictPSR12CodingStandard"/>
<rule ref="OpsWayStrictPSR12CodingStandard">
<exclude name="WebimpressCodingStandard.Namespaces.UnusedUseStatement.UnusedUse"/>
</rule>
</ruleset>
124 changes: 108 additions & 16 deletions src/Swoole/PgSQL/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,39 @@
namespace OpsWay\Doctrine\DBAL\Swoole\PgSQL;

use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\ParameterType;
use OpsWay\Doctrine\DBAL\SQLParserUtils;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\ConnectionException;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException as SwooleDriverException;
use OpsWay\Doctrine\DBAL\Swoole\PgSQL\Exception\DriverException;
use Swoole\Coroutine as Co;
use Swoole\Coroutine\PostgreSQL;
use Throwable;
use WeakMap;

use function defer;
use function is_resource;
use function strlen;
use function substr;
use function time;
use function trim;

final class Connection implements ConnectionInterface
{
public function __construct(private ConnectionWrapperInterface $connection)
/** @psalm-var array<int, PostgreSQL> */
private array $internalStorage = [];
private WeakMap $statsStorage;

public function __construct(private ConnectionPool $pool, private int $retryDelay, private int $maxAttempts)
{
$this->statsStorage = new WeakMap();
defer(fn () => $this->onDefer());
}

/**
* {@inheritdoc}
*
* @throws SwooleDriverException
* @throws DriverException
*/
public function prepare(string $sql) : Statement
{
Expand All @@ -39,22 +53,28 @@ public function prepare(string $sql) : Statement
$posShift += strlen($placeholder) - 1;
$i++;
}
$connection = $this->getNativeConnection();

return new Statement($this->connection, $sql);
return new Statement($connection, $sql, $this->connectionStats($connection));
}

/**
* {@inheritdoc}
*/
public function query(string $sql) : Result
{
$resource = $this->connection->query($sql);
$connection = $this->getNativeConnection();
$resource = $connection->query($sql);
$stats = $this->connectionStats($connection);
if ($stats instanceof ConnectionStats) {
$stats->counter++;
}

if (! is_resource($resource)) {
throw ConnectionException::fromConnection($this->connection);
throw ConnectionException::fromConnection($connection);
}

return new Result($this->connection, $resource);
return new Result($this->getNativeConnection(), $resource);
}

/**
Expand All @@ -65,21 +85,26 @@ public function query(string $sql) : Result
*/
public function quote($value, $type = ParameterType::STRING) : string
{
return "'" . (string) $this->connection->escape($value) . "'";
return "'" . (string) $this->getNativeConnection()->escape($value) . "'";
}

/**
* {@inheritdoc}
*/
public function exec(string $sql) : int
{
$query = $this->connection->query($sql);
$connection = $this->getNativeConnection();
$query = $connection->query($sql);
$stats = $this->connectionStats($connection);
if ($stats instanceof ConnectionStats) {
$stats->counter++;
}

if (! is_resource($query)) {
throw ConnectionException::fromConnection($this->connection);
throw ConnectionException::fromConnection($this->getNativeConnection());
}

return $this->connection->affectedRows($query);
return (int) $this->getNativeConnection()->affectedRows($query);
}

/**
Expand All @@ -101,7 +126,7 @@ public function lastInsertId($name = null) : string
*/
public function beginTransaction() : bool
{
$this->connection->query('START TRANSACTION');
$this->getNativeConnection()->query('START TRANSACTION');

return true;
}
Expand All @@ -111,7 +136,7 @@ public function beginTransaction() : bool
*/
public function commit() : bool
{
$this->connection->query('COMMIT');
$this->getNativeConnection()->query('COMMIT');

return true;
}
Expand All @@ -121,18 +146,85 @@ public function commit() : bool
*/
public function rollBack() : bool
{
$this->connection->query('ROLLBACK');
$this->getNativeConnection()->query('ROLLBACK');

return true;
}

public function errorCode() : int
{
return $this->connection->errorCode();
return (int) $this->getNativeConnection()->errCode;
}

public function errorInfo() : string
{
return $this->connection->error();
return (string) $this->getNativeConnection()->error;
}

public function getNativeConnection() : PostgreSQL
{
$connection = $this->internalStorage[Co::getCid()] ?? null;
if (! $connection instanceof PostgreSQL) {
$lastException = null;
for ($i = 0; $i < $this->maxAttempts; $i++) {
try {
/** @psalm-suppress MissingDependency */
[$connection, $stats] = $this->pool->get(2);
if (! $connection instanceof PostgreSQL) {
throw new DriverException('No connect available in pull');
}
/** @var resource|bool $query */
$query = $connection->query('SELECT 1');
$affectedRows = is_resource($query) ? (int) $connection->affectedRows($query) : 0;
if ($affectedRows !== 1) {
$errCode = trim($connection->errCode);
throw new ConnectionException(
"Connection ping failed. Trying reconnect (attempt $i). Reason: $errCode"
);
}
$this->internalStorage[Co::getCid()] = $connection;
$this->statsStorage[$connection] = $stats;

break;
} catch (Throwable $e) {
$errCode = '';
if ($connection instanceof PostgreSQL) {
$errCode = $connection->errCode;
$connection = null;
}
$lastException = $e instanceof DBALException
? $e
: new ConnectionException($e->getMessage(), (string) $errCode, '', (int) $e->getCode(), $e);
Co::sleep($this->retryDelay); // Sleep s after failure
}
}
if (! $connection instanceof PostgreSQL) {
$lastException instanceof Throwable
? throw $lastException
: throw new ConnectionException('Connection could not be initiated');
}
}

return $this->internalStorage[Co::getCid()];
}

public function connectionStats(PostgreSQL $connection) : ?ConnectionStats
{
return $this->statsStorage[$connection] ?? null;
}

private function onDefer() : void
{
$connection = $this->internalStorage[Co::getCid()] ?: null;
if (! $connection instanceof PostgreSQL) {
return;
}
$stats = $this->connectionStats($connection);
if ($stats instanceof ConnectionStats) {
$stats->lastInteraction = time();
}
$this->pool->put($connection);
unset($this->internalStorage[Co::getCid()]);
$this->statsStorage->offsetUnset($connection);
}
}
Loading

0 comments on commit 01f36aa

Please sign in to comment.