diff --git a/composer.json b/composer.json index e7e756b..538588b 100644 --- a/composer.json +++ b/composer.json @@ -23,8 +23,8 @@ "ext-sockets": "*", "ext-pdo": "*", "cloudtay/ripple": "^1.0.0", - "cloudtay/ripple-http": "dev-main", - "cloudtay/ripple-websocket": "dev-main" + "cloudtay/ripple-http": "^1.0.0", + "cloudtay/ripple-websocket": "^1.0.0" }, "require-dev": { "amphp/mysql": "^3.0", diff --git a/example/workerman4.php b/example/workerman4.php index d981207..d777c09 100644 --- a/example/workerman4.php +++ b/example/workerman4.php @@ -34,54 +34,44 @@ include_once __DIR__ . '/../vendor/autoload.php'; +use Ripple\Driver\Workerman\AsyncTcpConnection; use Ripple\Driver\Workerman\Driver4; -use Ripple\Utils\Output; -use Workerman\Timer; +use Ripple\Http\Guzzle; +use Workerman\Connection\TcpConnection; use Workerman\Worker; use function Co\async; -use function Co\delay; $worker = new Worker('tcp://127.0.0.1:28008'); $worker->onWorkerStart = function () { - $timerId = Timer::add(0.1, function () { - Output::info("memory usage: " . \memory_get_usage()); - }); - - $timerId2 = Timer::add(1, function () { - Output::info("memory usage: " . \memory_get_usage()); - \gc_collect_cycles(); - }); + $connect = new AsyncTcpConnection('ssl://www.google.com:443'); + $connect->onConnect = function (TcpConnection $connection) { + \var_dump('Connected'); + $connection->send("GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n", true); + }; - delay(function () use ($timerId) { - Timer::del($timerId); - }, 3); + $connect->onMessage = function (TcpConnection $connection, $data) { + \var_dump($data); + }; + $connect->connectViaProxy('socks5://127.0.0.1:1080'); }; $worker->onMessage = function ($connection, $data) { - // //方式1 - async(function ($r) use ($connection) { + async(function () use ($connection) { \Co\sleep(3); - $fileContent = \Co\IO::File()->getContents(__FILE__); - $hash = \hash('sha256', $fileContent); $connection->send("[await] File content hash: {$hash}" . \PHP_EOL); - - $r(); }); //使用原生guzzle实现异步请求 try { - $response = \Ripple\Http\Guzzle::newClient()->get('https://www.baidu.com/'); + $response = Guzzle::newClient()->get('https://www.baidu.com/'); \var_dump($response->getStatusCode()); $connection->send("[async] Response status code: {$response->getStatusCode()}" . \PHP_EOL); } catch (Throwable $exception) { $connection->send("[async] Exception: {$exception->getMessage()}" . \PHP_EOL); - } - - $connection->send("say {$data}"); }; diff --git a/example/workerman5.php b/example/workerman5.php index ed6f3e3..746666e 100644 --- a/example/workerman5.php +++ b/example/workerman5.php @@ -35,6 +35,7 @@ include_once __DIR__ . '/../vendor/autoload.php'; use Ripple\Driver\Workerman\Driver5; +use Ripple\Http\Guzzle; use Ripple\Utils\Output; use Workerman\Timer; use Workerman\Worker; @@ -60,25 +61,20 @@ $worker->onMessage = function ($connection, $data) { // //方式1 - async(function ($r) use ($connection) { + async(function () use ($connection) { \Co\sleep(3); - $fileContent = \Co\IO::File()->getContents(__FILE__); - $hash = \hash('sha256', $fileContent); $connection->send("[await] File content hash: {$hash}" . \PHP_EOL); - - $r(); }); //使用原生guzzle实现异步请求 try { - $response = \Ripple\Http\Guzzle::newClient()->get('https://www.baidu.com/'); + $response = Guzzle::newClient()->get('https://www.baidu.com/'); \var_dump($response->getStatusCode()); $connection->send("[async] Response status code: {$response->getStatusCode()}" . \PHP_EOL); } catch (Throwable $exception) { $connection->send("[async] Exception: {$exception->getMessage()}" . \PHP_EOL); - } $connection->send("say {$data}"); diff --git a/src/Laravel/.env.example b/src/Laravel/.env.example index fe15ae1..a2217d3 100644 --- a/src/Laravel/.env.example +++ b/src/Laravel/.env.example @@ -1,4 +1,3 @@ PRP_HTTP_LISTEN=http://127.0.0.1:8008 PRP_HTTP_WORKERS=4 PRP_HTTP_RELOAD=0 -PRP_HTTP_SANDBOX=1 diff --git a/src/Laravel/Driver.php b/src/Laravel/Driver.php index fc86609..932910c 100644 --- a/src/Laravel/Driver.php +++ b/src/Laravel/Driver.php @@ -81,7 +81,6 @@ class Driver extends Command 'HTTP_LISTEN' => 'string', 'HTTP_WORKERS' => 'int', 'HTTP_RELOAD' => 'bool', - 'HTTP_SANDBOX' => 'bool', 'HTTP_ISOLATION' => 'bool', ]; @@ -236,9 +235,8 @@ private function start(): void $listen = Config::get('ripple.HTTP_LISTEN', 'http://127.0.0.1:8008'); $count = intval(Config::get('ripple.HTTP_WORKERS', 4)); - $sandbox = \Ripple\Driver\Utils\Config::value2bool(Config::get('ripple.HTTP_SANDBOX', 1)); - $this->manager->addWorker(new Worker($listen, $count, $sandbox)); + $this->manager->addWorker(new Worker($listen, $count)); $this->manager->run(); } diff --git a/src/Laravel/Worker.php b/src/Laravel/Worker.php index 12f6f7c..1ad9a77 100644 --- a/src/Laravel/Worker.php +++ b/src/Laravel/Worker.php @@ -78,12 +78,10 @@ class Worker extends \Ripple\Worker /** * @param string $address * @param int $count - * @param bool $sandbox */ public function __construct( private readonly string $address = 'http://127.0.0.1:8008', int $count = 4, - private readonly bool $sandbox = true, ) { $this->name = 'http-server'; $this->count = $count; @@ -156,7 +154,7 @@ public function boot(): void $this->server->onRequest(function ( Request $request ) { - $application = $this->sandbox ? clone $this->application : $this->application; + $application = clone $this->application; /*** @var Kernel $kernel */ $kernel = $application->make(Kernel::class); @@ -201,10 +199,8 @@ public function boot(): void } catch (Throwable $e) { $this->dispatchEvent($application, new WorkerErrorOccurred($this->application, $application, $e)); } finally { - if ($this->sandbox) { - unset($application); - } unset($laravelRequest, $response, $laravelResponse, $kernel); + unset($application); } }); $this->server->listen(); diff --git a/src/Laravel/config/ripple.php b/src/Laravel/config/ripple.php index 679dbd7..0c8da80 100644 --- a/src/Laravel/config/ripple.php +++ b/src/Laravel/config/ripple.php @@ -38,6 +38,5 @@ return [ 'HTTP_LISTEN' => Env::get('PRP_HTTP_LISTEN', 'http://127.0.0.1:8008'), 'HTTP_WORKERS' => Env::get('PRP_HTTP_WORKERS', 4), - 'HTTP_RELOAD' => Env::get('PRP_HTTP_RELOAD', 0), - 'HTTP_SANDBOX' => Env::get('PRP_HTTP_SANDBOX', 1), + 'HTTP_RELOAD' => Env::get('PRP_HTTP_RELOAD', 0) ]; diff --git a/src/Workerman/Driver4.php b/src/Workerman/Driver4.php index 36687cb..cc7e8ee 100644 --- a/src/Workerman/Driver4.php +++ b/src/Workerman/Driver4.php @@ -35,9 +35,9 @@ namespace Ripple\Driver\Workerman; use Closure; -use Co\System; -use Ripple\Stream; use Ripple\Kernel; +use Ripple\Process; +use Ripple\Stream; use Throwable; use Workerman\Events\EventInterface; use Workerman\Worker; @@ -73,7 +73,10 @@ class Driver4 implements EventInterface protected array $_timer = []; /*** @var array */ - protected array $_fd2ids = []; + protected array $_fd2RIDs = []; + + /*** @var array */ + protected array $_fd2WIDs = []; /*** @var array */ protected array $_signal2ids = []; @@ -113,12 +116,10 @@ public function add($fd, $flag, $func, $args = []): bool|int } } - // 未找到回调 if (!isset($closure)) { return false; } - // 注册信号处理器 $id = onSignal($fd, $closure); $this->_signal2ids[$fd] = string2int($id); return string2int($id); @@ -127,7 +128,6 @@ public function add($fd, $flag, $func, $args = []): bool|int } case EventInterface::EV_TIMER: - // 定时器 $this->_timer[] = $timerId = repeat(function () use ($func, $args) { try { call_user_func_array($func, $args); @@ -135,11 +135,9 @@ public function add($fd, $flag, $func, $args = []): bool|int Worker::stopAll(250, $e); } }, $fd); - return string2int($timerId); case EventInterface::EV_TIMER_ONCE: - // 一次性定时器 $this->_timer[] = $timerId = delay(function () use ($func, $args) { try { call_user_func_array($func, $args); @@ -147,27 +145,24 @@ public function add($fd, $flag, $func, $args = []): bool|int Worker::stopAll(250, $e); } }, $fd); - return string2int($timerId); case EventInterface::EV_READ: - // 读事件 $stream = new Stream($fd); $eventId = $stream->onReadable(function (Stream $stream) use ($func) { $func($stream->stream); }); - $this->_fd2ids[$stream->id][] = string2int($eventId); + $this->_fd2RIDs[$stream->id][] = string2int($eventId); return string2int($eventId); case EventInterface::EV_WRITE: - // 写事件 $stream = new Stream($fd); - $eventId = $stream->onWritable(function (Stream $stream) use ($func) { + $eventId = $stream->onWriteable(function (Stream $stream) use ($func) { $func($stream->stream); }); - $this->_fd2ids[$stream->id][] = string2int($eventId); + $this->_fd2WIDs[$stream->id][] = string2int($eventId); return string2int($eventId); } return false; @@ -192,19 +187,26 @@ public function del($fd, $flag): void } if ($flag === EventInterface::EV_READ || $flag === EventInterface::EV_WRITE) { - // 取消读写事件监听 + if (!$fd) { + return; + } + $streamId = get_resource_id($fd); - if (isset($this->_fd2ids[$streamId])) { - foreach ($this->_fd2ids[$streamId] as $id) { - $this->cancel($id); + if ($flag === EventInterface::EV_READ) { + foreach ($this->_fd2RIDs[$streamId] ?? [] as $eventId) { + cancel(int2string($eventId)); + } + unset($this->_fd2RIDs[$streamId]); + } else { + foreach ($this->_fd2WIDs[$streamId] ?? [] as $eventId) { + cancel(int2string($eventId)); } - unset($this->_fd2ids[$streamId]); + unset($this->_fd2WIDs[$streamId]); } return; } if ($flag === EventInterface::EV_SIGNAL) { - // 取消信号监听 $signalId = $this->_signal2ids[$fd] ?? null; if ($signalId) { $this->cancel($signalId); @@ -247,8 +249,9 @@ public function loop(): void Driver4::$baseProcessId = (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid()); } elseif (Driver4::$baseProcessId !== (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid())) { Driver4::$baseProcessId = (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid()); - cancelAll(); - System::Process()->forkedTick(); + Process::getInstance()->processedInMain(static function () { + Process::getInstance()->forgetEvents(); + }); } wait(); diff --git a/src/Workerman/Driver5.php b/src/Workerman/Driver5.php index 3ebaa96..805d11d 100644 --- a/src/Workerman/Driver5.php +++ b/src/Workerman/Driver5.php @@ -34,14 +34,13 @@ namespace Ripple\Driver\Workerman; -use Co\System; use Revolt\EventLoop; use Ripple\Kernel; +use Ripple\Process; use Workerman\Events\EventInterface; use function array_shift; use function Co\cancel; -use function Co\cancelAll; use function Co\delay; use function Co\repeat; use function Co\stop; @@ -95,9 +94,9 @@ public function run(): void Driver5::$baseProcessId = (getmypid()); } elseif (Driver5::$baseProcessId !== (getmypid())) { Driver5::$baseProcessId = (getmypid()); - - cancelAll(); - System::Process()->forkedTick(); + Process::getInstance()->processedInMain(static function () { + Process::getInstance()->forgetEvents(); + }); } wait(); diff --git a/src/Workerman/Extensions/IteratorResponse.php b/src/Workerman/Extensions/IteratorResponse.php deleted file mode 100644 index 088f3f8..0000000 --- a/src/Workerman/Extensions/IteratorResponse.php +++ /dev/null @@ -1,138 +0,0 @@ -iterator = $iterator; - - if ($autopilot) { - switch (Worker::$eventLoopClass) { - case Driver5::class: - case Driver4::class: - \Co\defer(fn () => $this->processIterator()); - break; - default: - if (version_compare(Worker::VERSION, '5.0.0', '>=')) { - Worker::$globalEvent->delay(0, function () use ($iterator) { - $this->processIterator(); - }); - } else { - Timer::add(0.1, function () use ($iterator) { - $this->processIterator(); - }, [], false); - } - break; - } - } - - parent::__construct(200, array_merge([ - ], [])); - } - - /** - * @return $this - */ - public function processIterator(): static - { - foreach ($this->iterator as $frame) { - $this->tcpConnection->send($frame, true); - } - - if ($this->closeWhenFinish) { - $this->close(); - } - - return $this; - } - - /** - * @return void - */ - public function close(): void - { - $this->tcpConnection->close(); - } - - /** - * @param Iterator|Closure $iterator - * @param TcpConnection $tcpConnection - * @param bool $closeWhenFinish - * @param bool $autopilot - * - * @return IteratorResponse - */ - public static function create( - Iterator|Closure $iterator, - TcpConnection $tcpConnection, - bool $closeWhenFinish = false, - bool $autopilot = true, - ): IteratorResponse { - return new static($iterator, $tcpConnection, $closeWhenFinish, $autopilot); - } -} diff --git a/src/Workerman/RPL.php b/src/Workerman/RPL.php deleted file mode 100644 index c1cacc9..0000000 --- a/src/Workerman/RPL.php +++ /dev/null @@ -1,60 +0,0 @@ -