From 1f524ee6e2be29e11f7f4ce85fd873c646b2b371 Mon Sep 17 00:00:00 2001 From: cclilshy Date: Fri, 15 Nov 2024 14:29:25 +0800 Subject: [PATCH] Refactoring laravel worker architecture --- .php-cs-fixer.php | 36 +- composer.json | 1 - example/workerman4.php | 79 ---- example/workerman5.php | 84 ----- phpunit.xml | 36 +- src/Laravel/Coroutine/ContainerMap.php | 36 +- src/Laravel/Driver.php | 207 ++++------- src/Laravel/Events/RequestHandled.php | 50 +-- src/Laravel/Events/RequestReceived.php | 67 ++-- src/Laravel/Events/RequestTerminated.php | 50 +-- src/Laravel/Events/WorkerErrorOccurred.php | 48 +-- .../Listeners/FlushAuthenticationState.php | 37 ++ src/Laravel/Listeners/FlushQueuedCookies.php | 30 ++ src/Laravel/Listeners/FlushSessionState.php | 33 ++ src/Laravel/Provider.php | 36 +- src/Laravel/Response/IteratorResponse.php | 36 +- src/Laravel/Server.php | 343 ++++++++++++++++++ src/Laravel/Tests/routes-test.php | 36 +- src/Laravel/Traits/DispatchesEvents.php | 36 +- src/Laravel/Worker.php | 208 ----------- src/Laravel/config/ripple.php | 36 +- src/ThinkPHP/Coroutine/AppMap.php | 36 +- src/ThinkPHP/Driver.php | 42 +-- src/ThinkPHP/Service.php | 36 +- src/ThinkPHP/Tests/routes-test.php | 36 +- src/ThinkPHP/Worker.php | 36 +- src/Utils/Config.php | 36 +- src/Utils/Console.php | 36 +- src/Utils/Guard.php | 40 +- src/Workerman/Driver4.php | 36 +- src/Workerman/Driver5.php | 40 +- src/Yii2/Application.php | 36 +- src/Yii2/Bootstrap.php | 36 +- src/Yii2/Driver.php | 36 +- src/Yii2/Request.php | 36 +- src/Yii2/RippleController.php | 36 +- src/Yii2/Worker.php | 42 +-- src/helpers.php | 36 +- tests/HttpTest.php | 36 +- 39 files changed, 782 insertions(+), 1411 deletions(-) delete mode 100644 example/workerman4.php delete mode 100644 example/workerman5.php create mode 100644 src/Laravel/Listeners/FlushAuthenticationState.php create mode 100644 src/Laravel/Listeners/FlushQueuedCookies.php create mode 100644 src/Laravel/Listeners/FlushSessionState.php create mode 100644 src/Laravel/Server.php delete mode 100644 src/Laravel/Worker.php diff --git a/.php-cs-fixer.php b/.php-cs-fixer.php index e817c31..93e4394 100644 --- a/.php-cs-fixer.php +++ b/.php-cs-fixer.php @@ -1,35 +1,13 @@ onWorkerStart = function () { - $connect = new AsyncTcpConnection('ssl://www.google.com:443'); - $connect->onConnect = function (TcpConnection $connection) { - \var_dump('Connected'); - $connection->send("GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n", true); - }; - - $connect->onMessage = function (TcpConnection $connection, $data) { - \var_dump($data); - }; - $connect->connectViaProxy('socks5://127.0.0.1:1080'); -}; - -$worker->onMessage = function ($connection, $data) { - async(function () use ($connection) { - \Co\sleep(3); - $fileContent = \Co\IO::File()->getContents(__FILE__); - $hash = \hash('sha256', $fileContent); - $connection->send("[await] File content hash: {$hash}" . \PHP_EOL); - }); - - //使用原生guzzle实现异步请求 - try { - $response = Guzzle::newClient()->get('https://www.baidu.com/'); - \var_dump($response->getStatusCode()); - $connection->send("[async] Response status code: {$response->getStatusCode()}" . \PHP_EOL); - } catch (Throwable $exception) { - $connection->send("[async] Exception: {$exception->getMessage()}" . \PHP_EOL); - } - $connection->send("say {$data}"); -}; - -Worker::$eventLoopClass = Driver4::class; -Worker::runAll(); diff --git a/example/workerman5.php b/example/workerman5.php deleted file mode 100644 index 746666e..0000000 --- a/example/workerman5.php +++ /dev/null @@ -1,84 +0,0 @@ -onWorkerStart = function () { - $timerId = Timer::add(0.1, function () { - Output::info("memory usage: " . \memory_get_usage()); - }); - - $timerId2 = Timer::add(1, function () { - Output::info("memory usage: " . \memory_get_usage()); - \gc_collect_cycles(); - }); - - delay(function () use ($timerId) { - Timer::del($timerId); - }, 3); -}; - -$worker->onMessage = function ($connection, $data) { - // //方式1 - async(function () use ($connection) { - \Co\sleep(3); - $fileContent = \Co\IO::File()->getContents(__FILE__); - $hash = \hash('sha256', $fileContent); - $connection->send("[await] File content hash: {$hash}" . \PHP_EOL); - }); - - //使用原生guzzle实现异步请求 - try { - $response = Guzzle::newClient()->get('https://www.baidu.com/'); - \var_dump($response->getStatusCode()); - $connection->send("[async] Response status code: {$response->getStatusCode()}" . \PHP_EOL); - } catch (Throwable $exception) { - $connection->send("[async] Exception: {$exception->getMessage()}" . \PHP_EOL); - } - - $connection->send("say {$data}"); -}; - -Worker::$eventLoopClass = Driver5::class; -Worker::runAll(); diff --git a/phpunit.xml b/phpunit.xml index 57c50b6..e2c6c46 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,34 +1,12 @@ - manager = app(Manager::class); - $this->controlPipePath = storage_path('ripple.pipe'); - $this->controlLockPath = storage_path('ripple.lock'); } /** * 运行服务 * * @return void - * @throws ConnectionException|UnsupportedFeatureException + * @throws \Revolt\EventLoop\UnsupportedFeatureException */ public function handle(): void { - $zx7e = new Zx7e(); + if (!file_exists(config_path('ripple.php'))) { + Output::warning('Please execute the following command to publish the configuration files first.'); + Output::writeln('php artisan vendor:publish --tag=ripple-config'); + return; + } + + $projectPath = base_path(); + $lock = lock($projectPath); + switch ($this->argument('action')) { case 'start': + if (!$lock->exclusion(false)) { + Output::warning('the server is already running'); + return; + } + $lock->unlock(); + if (!$this->option('daemon')) { $this->start(); wait(); @@ -150,30 +122,32 @@ public function handle(): void } exit(0); case 'stop': - if (!file_exists($this->controlPipePath)) { - Output::warning('The server is not running'); + if ($lock->exclusion(false)) { + Output::warning('the server is not running'); return; } - $controlStream = new Stream(fopen($this->controlPipePath, 'r+')); - $controlStream->write($zx7e->encodeFrame('{"action":"stop"}')); - Output::writeln('The server is stopping'); + $channel = channel($projectPath); + $channel->send('stop'); break; + case 'reload': - if (!file_exists($this->controlPipePath)) { - Output::warning('The server is not running'); + if ($lock->exclusion(false)) { + Output::warning('the server is not running'); return; } - $controlStream = new Stream(fopen($this->controlPipePath, 'r+')); - $controlStream->write($zx7e->encodeFrame('{"action":"reload"}')); - Output::writeln('The server is reloading'); + $channel = channel($projectPath); + $channel->send('reload'); + Output::info('the server is reloading'); break; + case 'status': - if (!file_exists($this->controlPipePath)) { - Output::writeln('The server is not running'); + if ($lock->exclusion(false)) { + Output::writeln('the server is not running'); } else { - Output::writeln('The server is running'); + Output::info('the server is running'); } break; + default: Output::warning('Unsupported operation'); return; @@ -188,69 +162,28 @@ private function start(): void { /*** @compatible:Windows */ if (PHP_OS_FAMILY !== 'Windows') { - onSignal(SIGINT, fn () => $this->stop()); - onSignal(SIGTERM, fn () => $this->stop()); - onSignal(SIGQUIT, fn () => $this->stop()); + onSignal(SIGINT, static fn () => exit(0)); + onSignal(SIGTERM, static fn () => exit(0)); + onSignal(SIGQUIT, static fn () => exit(0)); } - if (!file_exists($this->controlPipePath)) { - /*** @compatible:Windows */ - if (!Kernel::getInstance()->supportProcessControl()) { - touch($this->controlPipePath); - } else { - posix_mkfifo($this->controlPipePath, 0600); - } - } - - if (!file_exists($this->controlLockPath)) { - touch($this->controlLockPath); - } - - $zx7e = new Zx7e(); - $controlStream = new Stream(fopen($this->controlPipePath, 'r+')); - $controlStream->setBlocking(false); - - if (!flock(fopen($this->controlLockPath, 'r+'), LOCK_EX | LOCK_NB)) { - Output::warning('The server is already running'); - exit(0); - } - - $controlStream->onReadable(function (Stream $controlStream) use ($zx7e) { - $content = $controlStream->read(1024); - foreach ($zx7e->decodeStream($content) as $command) { - $command = json_decode($command, true); - $action = $command['action']; - switch ($action) { - case 'stop': - $this->stop(); - break; - case 'reload': - $this->manager->reload(); - break; - case 'status': - break; - } - } - }); - - $listen = Config::get('ripple.HTTP_LISTEN', 'http://127.0.0.1:8008'); - $count = intval(Config::get('ripple.HTTP_WORKERS', 4)); - - $this->manager->addWorker(new Worker($listen, $count)); - $this->manager->run(); - } - - /** - * @Author cclilshy - * @Date 2024/8/19 10:33 - * @return void - */ - private function stop(): void - { - cancelAll(); - $this->manager->stop(); - if (file_exists($this->controlPipePath)) { - unlink($this->controlPipePath); + foreach (['RIP_HTTP_LISTEN', 'RIP_HTTP_WORKERS', 'RIP_HTTP_RELOAD',] as $key) { + $configKey = str_replace('RIP_', 'ripple.', $key); + $configValue = Config::get($configKey); + putenv("{$key}={$configValue}"); } + putenv('RIP_PROJECT_PATH=' . base_path()); + + $session = proc(PHP_BINARY); + $session->onMessage = static function (string $data) { + fwrite(STDOUT, $data); + }; + $session->onErrorMessage = static function (string $data) { + Output::warning($data); + }; + $session->write(file_get_contents(__DIR__ . '/Server.php')); + $session->inputEot(); + $session->onClose = static fn () => exit(0); + wait(); } } diff --git a/src/Laravel/Events/RequestHandled.php b/src/Laravel/Events/RequestHandled.php index bf7ebd4..e61a71e 100644 --- a/src/Laravel/Events/RequestHandled.php +++ b/src/Laravel/Events/RequestHandled.php @@ -1,35 +1,13 @@ make($listener)->handle($this); + } catch (BindingResolutionException $e) { + Output::warning($e->getMessage()); + } + } + ContainerMap::bind($sandbox); } } diff --git a/src/Laravel/Events/RequestTerminated.php b/src/Laravel/Events/RequestTerminated.php index e14e706..6b21160 100644 --- a/src/Laravel/Events/RequestTerminated.php +++ b/src/Laravel/Events/RequestTerminated.php @@ -1,35 +1,13 @@ sandbox->resolved('auth.driver')) { + $event->sandbox->forgetInstance('auth.driver'); + } + + if ($event->sandbox->resolved('auth')) { + with($event->sandbox->make('auth'), function ($auth) use ($event) { + $auth->setApplication($event->sandbox); + $auth->forgetGuards(); + }); + } + } +} diff --git a/src/Laravel/Listeners/FlushQueuedCookies.php b/src/Laravel/Listeners/FlushQueuedCookies.php new file mode 100644 index 0000000..8c1b4aa --- /dev/null +++ b/src/Laravel/Listeners/FlushQueuedCookies.php @@ -0,0 +1,30 @@ +sandbox->resolved('cookie')) { + return; + } + + $event->sandbox->make('cookie')->flushQueuedCookies(); + } +} diff --git a/src/Laravel/Listeners/FlushSessionState.php b/src/Laravel/Listeners/FlushSessionState.php new file mode 100644 index 0000000..461a4ef --- /dev/null +++ b/src/Laravel/Listeners/FlushSessionState.php @@ -0,0 +1,33 @@ +sandbox->resolved('session')) { + return; + } + + $driver = $event->sandbox->make('session')->driver(); + + $driver->flush(); + $driver->regenerate(); + } +} diff --git a/src/Laravel/Provider.php b/src/Laravel/Provider.php index 9ef5aca..b0cd626 100644 --- a/src/Laravel/Provider.php +++ b/src/Laravel/Provider.php @@ -1,35 +1,13 @@ name = 'http-server'; + } + + /** + * @Author cclilshy + * @Date 2024/8/16 23:34 + * + * @param Manager $manager + * + * @return void + * @throws Throwable + */ + public function register(Manager $manager): void + { + /*** output worker*/ + fwrite(STDOUT, $this->formatRow(['Worker', $this->getName()])); + + /*** output env*/ + fwrite(STDOUT, $this->formatRow(["Conf"])); + fwrite(STDOUT, $this->formatRow(["- Listen", $this->address])); + fwrite(STDOUT, $this->formatRow(["- Workers", $this->count])); + fwrite(STDOUT, $this->formatRow(["- Reload", Config::value2string($this->reload, 'bool')])); + + /*** output logs*/ + fwrite(STDOUT, $this->formatRow(["Logs"])); + + /*** initialize*/ + $this->server = new HttpServer($this->address, ['socket' => ['so_reuseport' => 1, 'so_reuseaddr' => 1]]); + if ($this->reload) { + $monitor = File::getInstance()->monitor(); + $monitor->add(RIP_PROJECT_PATH . ('/app')); + $monitor->add(RIP_PROJECT_PATH . ('/bootstrap')); + $monitor->add(RIP_PROJECT_PATH . ('/config')); + $monitor->add(RIP_PROJECT_PATH . ('/routes')); + $monitor->add(RIP_PROJECT_PATH . ('/resources')); + if (file_exists(RIP_PROJECT_PATH . ('/.env'))) { + $monitor->add(RIP_PROJECT_PATH . ('/.env')); + } + Guard::relevance($manager, $this, $monitor); + } + } + + /** + * @Author cclilshy + * @Date 2024/8/17 11:08 + * @return void + */ + public function boot(): void + { + try { + onSignal(SIGINT, function () { + exit(0); + }); + + onSignal(SIGTERM, function () { + exit(0); + }); + + onSignal(SIGQUIT, function () { + exit(0); + }); + } catch (UnsupportedFeatureException $e) { + Output::warning("signal registration failure may cause the program to fail to exit normally: {$e->getMessage()}"); + } + + cli_set_process_title('laravel-worker'); + + $this->application = Server::createApplication(); + + try { + $kernel = $this->application->make(Kernel::class); + } catch (BindingResolutionException $e) { + Output::warning("kernel resolution failed: {$e->getMessage()}"); + exit(1); + } + $kernel->bootstrap(); + + $this->application->loadDeferredProviders(); + foreach (Server::defaultServicesToWarm() as $service) { + try { + $this->application->make($service); + } catch (Throwable $e) { + Output::warning("service warm-up failed: {$service}"); + } + } + + $this->server->onRequest(function (Request $request) { + $laravelRequest = new \Illuminate\Http\Request( + $request->GET, + $request->POST, + [], + $request->COOKIE, + $request->FILES, + $request->SERVER, + $request->CONTENT, + ); + + $application = clone $this->application; + $this->dispatchEvent($application, new RequestReceived($this->application, $application, $laravelRequest)); + + try { + /*** @var Kernel $kernel */ + $kernel = $application->make(Kernel::class); + $laravelResponse = $kernel->handle($laravelRequest); + + /*** handle response*/ + $response = $request->getResponse(); + $response->setStatusCode($laravelResponse->getStatusCode()); + + foreach ($laravelResponse->headers->allPreserveCaseWithoutCookies() as $key => $value) { + $response->withHeader($key, $value); + } + + foreach ($laravelResponse->headers->getCookies() as $cookie) { + $response->withCookie($cookie->getName(), $cookie->__toString()); + } + + if ($laravelResponse instanceof BinaryFileResponse) { + $response->setContent(fopen($laravelResponse->getFile()->getPathname(), 'r+')); + } elseif ($laravelResponse instanceof IteratorResponse) { + $response->setContent($laravelResponse->getIterator()); + } else { + $response->setContent($laravelResponse->getContent()); + } + + $response->respond(); + $this->dispatchEvent($application, new RequestHandled($this->application, $application, $laravelRequest, $laravelResponse)); + + /*** terminate*/ + $kernel->terminate($laravelRequest, $laravelResponse); + $this->dispatchEvent($application, new RequestTerminated($this->application, $application, $laravelRequest, $laravelResponse)); + } catch (Throwable $e) { + $request->respond($e->getMessage(), [], $e->getCode()); + $this->dispatchEvent($application, new WorkerErrorOccurred($this->application, $application, $e)); + } finally { + $application->flush(); + unset($laravelRequest, $laravelResponse, $request, $response, $kernel, $application); + } + }); + + $this->server->listen(); + } + + /*** @return \Illuminate\Foundation\Application */ + public static function createApplication(): Application + { + return Application::configure(basePath: RIP_PROJECT_PATH) + ->withRouting( + web: RIP_PROJECT_PATH . '/routes/web.php', + commands: RIP_PROJECT_PATH . '/routes/console.php', + health: '/up', + ) + ->withMiddleware(function (Middleware $middleware) { + }) + ->withExceptions(function (Exceptions $exceptions) { + // + })->create(); + } + + /** + * @return string[] + */ + public static function defaultServicesToWarm(): array + { + return [ + 'auth', + 'cache', + 'cache.store', + 'config', + 'cookie', + 'db', + 'db.factory', + 'db.transactions', + 'encrypter', + 'files', + 'hash', + 'log', + 'router', + 'routes', + 'session', + 'session.store', + 'translator', + 'url', + 'view', + ]; + } +} + +$lock = lock(RIP_PROJECT_PATH); +if (!$lock->exclusion(false)) { + echo "the server is already running\n"; + exit(0); +} + +cli_set_process_title('laravel-guard'); + +$channel = channel(RIP_PROJECT_PATH, true); +$listen = Config::value2string($env['RIP_HTTP_LISTEN'] ?? 'http://127.0.0.1:8008', 'string'); +$workers = Config::value2string($env['RIP_HTTP_WORKERS'] ?? 4, 'string'); +$reload = Config::value2bool($env['RIP_HTTP_RELOAD'] ?? false); + +$worker = new Server($listen, intval($workers), $reload); +$manager = new Manager(); +$manager->addWorker($worker); +$manager->run(); + +/*** Guardian part*/ +try { + onSignal(SIGINT, function () use ($manager) { + $manager->stop(); + exit(0); + }); + onSignal(SIGTERM, function () use ($manager) { + $manager->stop(); + exit(0); + }); + onSignal(SIGQUIT, function () use ($manager) { + $manager->stop(); + exit(0); + }); + + async(static function () use ($manager, $channel) { + while (1) { + $control = $channel->receive(); + if ($control === 'stop') { + $manager->stop(); + exit(0); + } + + if ($control === 'reload') { + $manager->reload(); + } + } + }); +} catch (UnsupportedFeatureException $e) { + Output::warning("Signal registration failure may cause the program to fail to exit normally: {$e->getMessage()}"); +} + +wait(); diff --git a/src/Laravel/Tests/routes-test.php b/src/Laravel/Tests/routes-test.php index 620878f..b6c8878 100644 --- a/src/Laravel/Tests/routes-test.php +++ b/src/Laravel/Tests/routes-test.php @@ -1,36 +1,14 @@ name = 'http-server'; - $this->count = $count; - } - - /** - * @Author cclilshy - * @Date 2024/8/16 23:34 - * - * @param Manager $manager - * - * @return void - * @throws Throwable - */ - public function register(Manager $manager): void - { - cli_set_process_title('laravel-guard'); - - /*** output worker*/ - fwrite(STDOUT, $this->formatRow(['Worker', $this->getName()])); - - /*** output env*/ - fwrite(STDOUT, $this->formatRow(["- Conf"])); - foreach (Driver::DECLARE_OPTIONS as $key => $type) { - fwrite(STDOUT, $this->formatRow([ - $key, - \Ripple\Driver\Utils\Config::value2string(Config::get("ripple.{$key}"), $type), - ])); - } - - /*** output logs*/ - fwrite(STDOUT, $this->formatRow(["- Logs"])); - - $server = new Server($this->address, [ - 'socket' => [ - 'so_reuseport' => 1, - 'so_reuseaddr' => 1 - ] - ]); - - $this->server = $server; - $this->application = Application::getInstance(); - - if (\Ripple\Driver\Utils\Config::value2bool(Config::get('ripple.HTTP_RELOAD'))) { - $monitor = File::getInstance()->monitor(); - $monitor->add(base_path('app')); - $monitor->add(base_path('bootstrap')); - $monitor->add(base_path('config')); - $monitor->add(base_path('routes')); - $monitor->add(base_path('resources')); - if (file_exists(base_path('.env'))) { - $monitor->add(base_path('.env')); - } - - Guard::relevance($manager, $this, $monitor); - } - } - - /** - * @Author cclilshy - * @Date 2024/8/17 11:08 - * @return void - */ - public function boot(): void - { - cli_set_process_title('laravel-worker'); - - $this->application->bind(Worker::class, fn () => $this); - - $this->server->onRequest(function ( - Request $request - ) { - $application = clone $this->application; - /*** @var Kernel $kernel */ - $kernel = $application->make(Kernel::class); - - $laravelRequest = new \Illuminate\Http\Request( - $request->GET, - $request->POST, - [], - $request->COOKIE, - $request->FILES, - $request->SERVER, - $request->CONTENT, - ); - $laravelRequest->attributes->set('ripple.request', $request); - $this->dispatchEvent($application, new RequestReceived($this->application, $application, $laravelRequest)); - - try { - $laravelResponse = $kernel->handle($laravelRequest); - $response = $request->getResponse(); - $response->setStatusCode($laravelResponse->getStatusCode()); - - foreach ($laravelResponse->headers->allPreserveCaseWithoutCookies() as $key => $value) { - $response->withHeader($key, $value); - } - - foreach ($laravelResponse->headers->getCookies() as $cookie) { - $response->withCookie($cookie->getName(), $cookie->__toString()); - } - - if ($laravelResponse instanceof BinaryFileResponse) { - $response->setContent(fopen($laravelResponse->getFile()->getPathname(), 'r+')); - } elseif ($laravelResponse instanceof IteratorResponse) { - $response->setContent($laravelResponse->getIterator()); - } else { - $response->setContent($laravelResponse->getContent()); - } - - $response->respond(); - $this->dispatchEvent($application, new RequestHandled($this->application, $application, $laravelRequest, $laravelResponse)); - - $kernel->terminate($laravelRequest, $laravelResponse); - $this->dispatchEvent($application, new RequestTerminated($this->application, $application, $laravelRequest, $laravelResponse)); - } catch (Throwable $e) { - $this->dispatchEvent($application, new WorkerErrorOccurred($this->application, $application, $e)); - } finally { - unset($laravelRequest, $response, $laravelResponse, $kernel); - unset($application); - } - }); - $this->server->listen(); - } -} diff --git a/src/Laravel/config/ripple.php b/src/Laravel/config/ripple.php index fe434d0..d7608fd 100644 --- a/src/Laravel/config/ripple.php +++ b/src/Laravel/config/ripple.php @@ -1,36 +1,14 @@ eventTimer[$timerId]); $func(...$args); }; - $cbId = delay($closure, $delay); + $cbId = delay($closure, $delay); $this->eventTimer[$timerId] = $cbId; return $timerId; } @@ -146,7 +124,7 @@ public function delay(float $delay, callable $func, array $args = []): int public function repeat(float $interval, callable $func, array $args = []): int { $timerId = $this->timerId++; - $cbId = repeat(static fn () => $func(...$args), $interval); + $cbId = repeat(static fn () => $func(...$args), $interval); $this->eventTimer[$timerId] = $cbId; return $timerId; } diff --git a/src/Yii2/Application.php b/src/Yii2/Application.php index 804d392..292ba42 100644 --- a/src/Yii2/Application.php +++ b/src/Yii2/Application.php @@ -1,35 +1,13 @@