diff --git a/cgi.go b/cgi.go index e9bb736ad..b41638762 100644 --- a/cgi.go +++ b/cgi.go @@ -227,8 +227,6 @@ func go_frankenphp_release_known_variable_keys(threadIndex C.uintptr_t) { for _, v := range thread.knownVariableKeys { C.frankenphp_release_zend_string(v) } - // release everything that might still be pinned to the thread - thread.Unpin() thread.knownVariableKeys = nil } diff --git a/frankenphp.c b/frankenphp.c index a19a56dab..e0e5095c4 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -89,7 +89,7 @@ static void frankenphp_free_request_context() { free(ctx->cookie_data); ctx->cookie_data = NULL; - /* Is freed via thread.Unpin() at the end of each request */ + /* Is freed via thread.Unpin() */ SG(request_info).auth_password = NULL; SG(request_info).auth_user = NULL; SG(request_info).request_method = NULL; @@ -243,7 +243,7 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */ php_header(); if (ctx->has_active_request) { - go_frankenphp_finish_request(thread_index, false); + go_frankenphp_finish_php_request(thread_index); } ctx->finished = true; @@ -443,7 +443,7 @@ PHP_FUNCTION(frankenphp_handle_request) { frankenphp_worker_request_shutdown(); ctx->has_active_request = false; - go_frankenphp_finish_request(thread_index, true); + go_frankenphp_finish_worker_request(thread_index); RETURN_TRUE; } @@ -811,9 +811,9 @@ static void set_thread_name(char *thread_name) { } static void *php_thread(void *arg) { - char thread_name[16] = {0}; - snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg); thread_index = (uintptr_t)arg; + char thread_name[16] = {0}; + snprintf(thread_name, 16, "php-%" PRIxPTR, thread_index); set_thread_name(thread_name); #ifdef ZTS @@ -832,7 +832,11 @@ static void *php_thread(void *arg) { cfg_get_string("filter.default", &default_filter); should_filter_var = default_filter != NULL; - while (go_handle_request(thread_index)) { + // loop until Go signals to stop + char *scriptName = NULL; + while ((scriptName = go_frankenphp_before_script_execution(thread_index))) { + go_frankenphp_after_script_execution(thread_index, + frankenphp_execute_script(scriptName)); } go_frankenphp_release_known_variable_keys(thread_index); @@ -841,6 +845,8 @@ static void *php_thread(void *arg) { ts_free_thread(); #endif + go_frankenphp_on_thread_shutdown(thread_index); + return NULL; } @@ -858,13 +864,11 @@ static void *php_main(void *arg) { exit(EXIT_FAILURE); } - intptr_t num_threads = (intptr_t)arg; - set_thread_name("php-main"); #ifdef ZTS #if (PHP_VERSION_ID >= 80300) - php_tsrm_startup_ex(num_threads); + php_tsrm_startup_ex((intptr_t)arg); #else php_tsrm_startup(); #endif @@ -892,28 +896,7 @@ static void *php_main(void *arg) { frankenphp_sapi_module.startup(&frankenphp_sapi_module); - pthread_t *threads = malloc(num_threads * sizeof(pthread_t)); - if (threads == NULL) { - perror("malloc failed"); - exit(EXIT_FAILURE); - } - - for (uintptr_t i = 0; i < num_threads; i++) { - if (pthread_create(&(*(threads + i)), NULL, &php_thread, (void *)i) != 0) { - perror("failed to create PHP thread"); - free(threads); - exit(EXIT_FAILURE); - } - } - - for (int i = 0; i < num_threads; i++) { - if (pthread_join((*(threads + i)), NULL) != 0) { - perror("failed to join PHP thread"); - free(threads); - exit(EXIT_FAILURE); - } - } - free(threads); + go_frankenphp_main_thread_is_ready(); /* channel closed, shutdown gracefully */ frankenphp_sapi_module.shutdown(&frankenphp_sapi_module); @@ -929,25 +912,30 @@ static void *php_main(void *arg) { frankenphp_sapi_module.ini_entries = NULL; } #endif - - go_shutdown(); - + go_frankenphp_shutdown_main_thread(); return NULL; } -int frankenphp_init(int num_threads) { +int frankenphp_new_main_thread(int num_threads) { pthread_t thread; if (pthread_create(&thread, NULL, &php_main, (void *)(intptr_t)num_threads) != 0) { - go_shutdown(); - return -1; } return pthread_detach(thread); } +bool frankenphp_new_php_thread(uintptr_t thread_index) { + pthread_t thread; + if (pthread_create(&thread, NULL, &php_thread, (void *)thread_index) != 0) { + return false; + } + pthread_detach(thread); + return true; +} + int frankenphp_request_startup() { if (php_request_startup() == SUCCESS) { return SUCCESS; @@ -960,8 +948,6 @@ int frankenphp_request_startup() { int frankenphp_execute_script(char *file_name) { if (frankenphp_request_startup() == FAILURE) { - free(file_name); - file_name = NULL; return FAILURE; } @@ -970,8 +956,6 @@ int frankenphp_execute_script(char *file_name) { zend_file_handle file_handle; zend_stream_init_filename(&file_handle, file_name); - free(file_name); - file_name = NULL; file_handle.primary_script = 1; diff --git a/frankenphp.go b/frankenphp.go index 4f3544e4c..809e4af7d 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -64,8 +64,6 @@ var ( ScriptExecutionError = errors.New("error during PHP script execution") requestChan chan *http.Request - done chan struct{} - shutdownWG sync.WaitGroup loggerMu sync.RWMutex logger *zap.Logger @@ -123,7 +121,7 @@ type FrankenPHPContext struct { closed sync.Once responseWriter http.ResponseWriter - exitStatus C.int + exitStatus int done chan interface{} startedAt time.Time @@ -244,7 +242,7 @@ func Config() PHPConfig { // MaxThreads is internally used during tests. It is written to, but never read and may go away in the future. var MaxThreads int -func calculateMaxThreads(opt *opt) error { +func calculateMaxThreads(opt *opt) (int, int, error) { maxProcs := runtime.GOMAXPROCS(0) * 2 var numWorkers int @@ -266,13 +264,13 @@ func calculateMaxThreads(opt *opt) error { opt.numThreads = maxProcs } } else if opt.numThreads <= numWorkers { - return NotEnoughThreads + return opt.numThreads, numWorkers, NotEnoughThreads } metrics.TotalThreads(opt.numThreads) MaxThreads = opt.numThreads - return nil + return opt.numThreads, numWorkers, nil } // Init starts the PHP runtime and the configured workers. @@ -311,7 +309,7 @@ func Init(options ...Option) error { metrics = opt.metrics } - err := calculateMaxThreads(opt) + totalThreadCount, workerThreadCount, err := calculateMaxThreads(opt) if err != nil { return err } @@ -327,29 +325,26 @@ func Init(options ...Option) error { logger.Warn(`Zend Max Execution Timers are not enabled, timeouts (e.g. "max_execution_time") are disabled, recompile PHP with the "--enable-zend-max-execution-timers" configuration option to fix this issue`) } } else { - opt.numThreads = 1 + totalThreadCount = 1 logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`) } - shutdownWG.Add(1) - done = make(chan struct{}) requestChan = make(chan *http.Request, opt.numThreads) - initPHPThreads(opt.numThreads) - - if C.frankenphp_init(C.int(opt.numThreads)) != 0 { - return MainThreadCreationError + if err := initPHPThreads(totalThreadCount); err != nil { + return err } - if err := initWorkers(opt.workers); err != nil { - return err + for i := 0; i < totalThreadCount-workerThreadCount; i++ { + thread := getInactivePHPThread() + convertToRegularThread(thread) } - if err := restartWorkersOnFileChanges(opt.workers); err != nil { + if err := initWorkers(opt.workers); err != nil { return err } if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil { - c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", opt.numThreads)) + c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", totalThreadCount)) } if EmbeddedAppPath != "" { if c := logger.Check(zapcore.InfoLevel, "embedded PHP app 📦"); c != nil { @@ -363,7 +358,7 @@ func Init(options ...Option) error { // Shutdown stops the workers and the PHP runtime. func Shutdown() { drainWorkers() - drainThreads() + drainPHPThreads() metrics.Shutdown() requestChan = nil @@ -375,17 +370,6 @@ func Shutdown() { logger.Debug("FrankenPHP shut down") } -//export go_shutdown -func go_shutdown() { - shutdownWG.Done() -} - -func drainThreads() { - close(done) - shutdownWG.Wait() - phpThreads = nil -} - func getLogger() *zap.Logger { loggerMu.RLock() defer loggerMu.RUnlock() @@ -466,9 +450,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error return nil } - shutdownWG.Add(1) - defer shutdownWG.Done() - fc, ok := FromContext(request.Context()) if !ok { return InvalidRequestError @@ -477,76 +458,25 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error fc.responseWriter = responseWriter fc.startedAt = time.Now() - isWorker := fc.responseWriter == nil - // Detect if a worker is available to handle this request - if !isWorker { - if worker, ok := workers[fc.scriptFilename]; ok { - metrics.StartWorkerRequest(fc.scriptFilename) - worker.handleRequest(request) - <-fc.done - metrics.StopWorkerRequest(fc.scriptFilename, time.Since(fc.startedAt)) - return nil - } else { - metrics.StartRequest() - } + if worker, ok := workers[fc.scriptFilename]; ok { + worker.handleRequest(request, fc) + return nil } + metrics.StartRequest() + select { - case <-done: + case <-mainThread.done: case requestChan <- request: <-fc.done } - if !isWorker { - metrics.StopRequest() - } + metrics.StopRequest() return nil } -//export go_handle_request -func go_handle_request(threadIndex C.uintptr_t) bool { - select { - case <-done: - return false - - case r := <-requestChan: - thread := phpThreads[threadIndex] - thread.mainRequest = r - - fc, ok := FromContext(r.Context()) - if !ok { - panic(InvalidRequestError) - } - defer func() { - maybeCloseContext(fc) - thread.mainRequest = nil - thread.Unpin() - }() - - if err := updateServerContext(thread, r, true, false); err != nil { - rejectRequest(fc.responseWriter, err.Error()) - return true - } - - // scriptFilename is freed in frankenphp_execute_script() - fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) - if fc.exitStatus < 0 { - panic(ScriptExecutionError) - } - - // if the script has errored or timed out, make sure any pending worker requests are closed - if fc.exitStatus > 0 && thread.workerRequest != nil { - fc := thread.workerRequest.Context().Value(contextKey).(*FrankenPHPContext) - maybeCloseContext(fc) - thread.workerRequest = nil - } - - return true - } -} - func maybeCloseContext(fc *FrankenPHPContext) { fc.closed.Do(func() { close(fc.done) @@ -598,7 +528,7 @@ func go_apache_request_headers(threadIndex C.uintptr_t, hasActiveRequest bool) ( if !hasActiveRequest { // worker mode, not handling a request - mfc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext) + mfc := thread.getActiveRequest().Context().Value(contextKey).(*FrankenPHPContext) if c := mfc.logger.Check(zapcore.DebugLevel, "apache_request_headers() called in non-HTTP context"); c != nil { c.Write(zap.String("worker", mfc.scriptFilename)) @@ -784,21 +714,11 @@ func freeArgs(argv []*C.char) { } } -func executePHPFunction(functionName string) { +func executePHPFunction(functionName string) bool { cFunctionName := C.CString(functionName) defer C.free(unsafe.Pointer(cFunctionName)) - success := C.frankenphp_execute_php_function(cFunctionName) - - if success == 1 { - if c := logger.Check(zapcore.DebugLevel, "php function call successful"); c != nil { - c.Write(zap.String("function", functionName)) - } - } else { - if c := logger.Check(zapcore.ErrorLevel, "php function call failed"); c != nil { - c.Write(zap.String("function", functionName)) - } - } + return C.frankenphp_execute_php_function(cFunctionName) == 1 } // Ensure that the request path does not contain null bytes diff --git a/frankenphp.h b/frankenphp.h index 41a5a2124..5e498b6c7 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -40,7 +40,8 @@ typedef struct frankenphp_config { } frankenphp_config; frankenphp_config frankenphp_get_config(); -int frankenphp_init(int num_threads); +int frankenphp_new_main_thread(int num_threads); +bool frankenphp_new_php_thread(uintptr_t thread_index); int frankenphp_update_server_context( bool create, bool has_main_request, bool has_active_request, diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index ec97502e7..c1bd7b550 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -36,22 +36,17 @@ ZEND_FUNCTION(frankenphp_finish_request); ZEND_FUNCTION(frankenphp_request_headers); ZEND_FUNCTION(frankenphp_response_headers); +// clang-format off static const zend_function_entry ext_functions[] = { - ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) - ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE( - frankenphp_finish_request, arginfo_frankenphp_finish_request) - ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, - arginfo_fastcgi_finish_request) - ZEND_FE(frankenphp_request_headers, - arginfo_frankenphp_request_headers) - ZEND_FALIAS(apache_request_headers, - frankenphp_request_headers, - arginfo_apache_request_headers) - ZEND_FALIAS(getallheaders, frankenphp_request_headers, - arginfo_getallheaders) - ZEND_FE(frankenphp_response_headers, - arginfo_frankenphp_response_headers) - ZEND_FALIAS(apache_response_headers, - frankenphp_response_headers, - arginfo_apache_response_headers) - ZEND_FE_END}; + ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) + ZEND_FE(headers_send, arginfo_headers_send) + ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request) + ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request) + ZEND_FE(frankenphp_request_headers, arginfo_frankenphp_request_headers) + ZEND_FALIAS(apache_request_headers, frankenphp_request_headers, arginfo_apache_request_headers) + ZEND_FALIAS(getallheaders, frankenphp_request_headers, arginfo_getallheaders) + ZEND_FE(frankenphp_response_headers, arginfo_frankenphp_response_headers) + ZEND_FALIAS(apache_response_headers, frankenphp_response_headers, arginfo_apache_response_headers) + ZEND_FE_END +}; +// clang-format on diff --git a/frankenphp_test.go b/frankenphp_test.go index 3e3c5f01c..f87e43009 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -592,6 +592,23 @@ func testFiberNoCgo(t *testing.T, opts *testOptions) { }, opts) } +func TestFiberBasic_module(t *testing.T) { testFiberBasic(t, &testOptions{}) } +func TestFiberBasic_worker(t *testing.T) { + testFiberBasic(t, &testOptions{workerScript: "fiber-basic.php"}) +} +func testFiberBasic(t *testing.T, opts *testOptions) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { + req := httptest.NewRequest("GET", fmt.Sprintf("http://example.com/fiber-basic.php?i=%d", i), nil) + w := httptest.NewRecorder() + handler(w, req) + + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + + assert.Equal(t, string(body), fmt.Sprintf("Fiber %d", i)) + }, opts) +} + func TestRequestHeaders_module(t *testing.T) { testRequestHeaders(t, &testOptions{}) } func TestRequestHeaders_worker(t *testing.T) { testRequestHeaders(t, &testOptions{workerScript: "request-headers.php"}) diff --git a/phpmainthread.go b/phpmainthread.go new file mode 100644 index 000000000..5561cbd77 --- /dev/null +++ b/phpmainthread.go @@ -0,0 +1,108 @@ +package frankenphp + +// #include "frankenphp.h" +import "C" +import ( + "sync" + + "go.uber.org/zap" +) + +// represents the main PHP thread +// the thread needs to keep running as long as all other threads are running +type phpMainThread struct { + state *threadState + done chan struct{} + numThreads int +} + +var ( + phpThreads []*phpThread + mainThread *phpMainThread +) + +// reserve a fixed number of PHP threads on the Go side +func initPHPThreads(numThreads int) error { + mainThread = &phpMainThread{ + state: newThreadState(), + done: make(chan struct{}), + numThreads: numThreads, + } + phpThreads = make([]*phpThread, numThreads) + + if err := mainThread.start(); err != nil { + return err + } + + // initialize all threads as inactive + for i := 0; i < numThreads; i++ { + phpThreads[i] = newPHPThread(i) + convertToInactiveThread(phpThreads[i]) + } + + // start the underlying C threads + ready := sync.WaitGroup{} + ready.Add(numThreads) + for _, thread := range phpThreads { + go func() { + if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { + logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex)) + } + thread.state.waitFor(stateInactive) + ready.Done() + }() + } + ready.Wait() + + return nil +} + +func drainPHPThreads() { + doneWG := sync.WaitGroup{} + doneWG.Add(len(phpThreads)) + for _, thread := range phpThreads { + thread.handlerMu.Lock() + _ = thread.state.requestSafeStateChange(stateShuttingDown) + close(thread.drainChan) + } + close(mainThread.done) + for _, thread := range phpThreads { + go func(thread *phpThread) { + thread.state.waitFor(stateDone) + thread.handlerMu.Unlock() + doneWG.Done() + }(thread) + } + doneWG.Wait() + mainThread.state.set(stateShuttingDown) + mainThread.state.waitFor(stateDone) + phpThreads = nil +} + +func (mainThread *phpMainThread) start() error { + if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 { + return MainThreadCreationError + } + mainThread.state.waitFor(stateReady) + return nil +} + +func getInactivePHPThread() *phpThread { + for _, thread := range phpThreads { + if thread.state.is(stateInactive) { + return thread + } + } + panic("not enough threads reserved") +} + +//export go_frankenphp_main_thread_is_ready +func go_frankenphp_main_thread_is_ready() { + mainThread.state.set(stateReady) + mainThread.state.waitFor(stateShuttingDown) +} + +//export go_frankenphp_shutdown_main_thread +func go_frankenphp_shutdown_main_thread() { + mainThread.state.set(stateDone) +} diff --git a/phpmainthread_test.go b/phpmainthread_test.go new file mode 100644 index 000000000..6d0cf0f60 --- /dev/null +++ b/phpmainthread_test.go @@ -0,0 +1,161 @@ +package frankenphp + +import ( + "io" + "math/rand/v2" + "net/http/httptest" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +var testDataPath, _ = filepath.Abs("./testdata") + +func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { + logger = zap.NewNop() // the logger needs to not be nil + assert.NoError(t, initPHPThreads(1)) // reserve 1 thread + + assert.Len(t, phpThreads, 1) + assert.Equal(t, 0, phpThreads[0].threadIndex) + assert.True(t, phpThreads[0].state.is(stateInactive)) + + drainPHPThreads() + assert.Nil(t, phpThreads) +} + +func TestTransitionRegularThreadToWorkerThread(t *testing.T) { + logger = zap.NewNop() + assert.NoError(t, initPHPThreads(1)) + + // transition to regular thread + convertToRegularThread(phpThreads[0]) + assert.IsType(t, ®ularThread{}, phpThreads[0].handler) + + // transition to worker thread + worker := getDummyWorker("transition-worker-1.php") + convertToWorkerThread(phpThreads[0], worker) + assert.IsType(t, &workerThread{}, phpThreads[0].handler) + assert.Len(t, worker.threads, 1) + + // transition back to inactive thread + convertToInactiveThread(phpThreads[0]) + assert.IsType(t, &inactiveThread{}, phpThreads[0].handler) + assert.Len(t, worker.threads, 0) + + drainPHPThreads() + assert.Nil(t, phpThreads) +} + +func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { + logger = zap.NewNop() + assert.NoError(t, initPHPThreads(1)) + firstWorker := getDummyWorker("transition-worker-1.php") + secondWorker := getDummyWorker("transition-worker-2.php") + + // convert to first worker thread + convertToWorkerThread(phpThreads[0], firstWorker) + firstHandler := phpThreads[0].handler.(*workerThread) + assert.Same(t, firstWorker, firstHandler.worker) + assert.Len(t, firstWorker.threads, 1) + assert.Len(t, secondWorker.threads, 0) + + // convert to second worker thread + convertToWorkerThread(phpThreads[0], secondWorker) + secondHandler := phpThreads[0].handler.(*workerThread) + assert.Same(t, secondWorker, secondHandler.worker) + assert.Len(t, firstWorker.threads, 0) + assert.Len(t, secondWorker.threads, 1) + + drainPHPThreads() + assert.Nil(t, phpThreads) +} + +func TestTransitionThreadsWhileDoingRequests(t *testing.T) { + numThreads := 10 + numRequestsPerThread := 100 + isRunning := atomic.Bool{} + isRunning.Store(true) + wg := sync.WaitGroup{} + worker1Path := testDataPath + "/transition-worker-1.php" + worker2Path := testDataPath + "/transition-worker-2.php" + + assert.NoError(t, Init( + WithNumThreads(numThreads), + WithWorkers(worker1Path, 1, map[string]string{"ENV1": "foo"}, []string{}), + WithWorkers(worker2Path, 1, map[string]string{"ENV1": "foo"}, []string{}), + WithLogger(zap.NewNop()), + )) + + // randomly transition threads between regular, inactive and 2 worker threads + go func() { + for { + for i := 0; i < numThreads; i++ { + switch rand.IntN(4) { + case 0: + convertToRegularThread(phpThreads[i]) + case 1: + convertToWorkerThread(phpThreads[i], workers[worker1Path]) + case 2: + convertToWorkerThread(phpThreads[i], workers[worker2Path]) + case 3: + convertToInactiveThread(phpThreads[i]) + } + time.Sleep(time.Millisecond) + if !isRunning.Load() { + return + } + } + } + }() + + // randomly do requests to the 3 endpoints + wg.Add(numThreads) + for i := 0; i < numThreads; i++ { + go func(i int) { + for j := 0; j < numRequestsPerThread; j++ { + switch rand.IntN(3) { + case 0: + assertRequestBody(t, "http://localhost/transition-worker-1.php", "Hello from worker 1") + case 1: + assertRequestBody(t, "http://localhost/transition-worker-2.php", "Hello from worker 2") + case 2: + assertRequestBody(t, "http://localhost/transition-regular.php", "Hello from regular thread") + } + } + wg.Done() + }(i) + } + + wg.Wait() + isRunning.Store(false) + Shutdown() +} + +func getDummyWorker(fileName string) *worker { + if workers == nil { + workers = make(map[string]*worker) + } + worker, _ := newWorker(workerOpt{ + fileName: testDataPath + "/" + fileName, + num: 1, + }) + return worker +} + +func assertRequestBody(t *testing.T, url string, expected string) { + r := httptest.NewRequest("GET", url, nil) + w := httptest.NewRecorder() + + req, err := NewRequestWithContext(r, WithRequestDocumentRoot(testDataPath, false)) + assert.NoError(t, err) + err = ServeHTTP(w, req) + assert.NoError(t, err) + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, expected, string(body)) +} diff --git a/phpthread.go b/phpthread.go index 9a81761fb..eabc58a98 100644 --- a/phpthread.go +++ b/phpthread.go @@ -1,7 +1,6 @@ package frankenphp -// #include -// #include +// #include "frankenphp.h" import "C" import ( "net/http" @@ -10,32 +9,65 @@ import ( "unsafe" ) -var phpThreads []*phpThread - +// representation of the actual underlying PHP thread +// identified by the index in the phpThreads slice type phpThread struct { runtime.Pinner - mainRequest *http.Request - workerRequest *http.Request - worker *worker - requestChan chan *http.Request + threadIndex int knownVariableKeys map[string]*C.zend_string - readiedOnce sync.Once + requestChan chan *http.Request + drainChan chan struct{} + handlerMu *sync.Mutex + handler threadHandler + state *threadState +} + +// interface that defines how the callbacks from the C thread should be handled +type threadHandler interface { + beforeScriptExecution() string + afterScriptExecution(exitStatus int) + getActiveRequest() *http.Request } -func initPHPThreads(numThreads int) { - phpThreads = make([]*phpThread, 0, numThreads) - for i := 0; i < numThreads; i++ { - phpThreads = append(phpThreads, &phpThread{}) +func newPHPThread(threadIndex int) *phpThread { + return &phpThread{ + threadIndex: threadIndex, + drainChan: make(chan struct{}), + requestChan: make(chan *http.Request), + handlerMu: &sync.Mutex{}, + state: newThreadState(), } } -func (thread *phpThread) getActiveRequest() *http.Request { - if thread.workerRequest != nil { - return thread.workerRequest +// change the thread handler safely +// must be called from outside of the PHP thread +func (thread *phpThread) setHandler(handler threadHandler) { + logger.Debug("setHandler") + thread.handlerMu.Lock() + defer thread.handlerMu.Unlock() + if !thread.state.requestSafeStateChange(stateTransitionRequested) { + // no state change allowed == shutdown + return } + close(thread.drainChan) + thread.state.waitFor(stateTransitionInProgress) + thread.handler = handler + thread.drainChan = make(chan struct{}) + thread.state.set(stateTransitionComplete) +} - return thread.mainRequest +// transition to a new handler safely +// is triggered by setHandler and executed on the PHP thread +func (thread *phpThread) transitionToNewHandler() string { + thread.state.set(stateTransitionInProgress) + thread.state.waitFor(stateTransitionComplete) + // execute beforeScriptExecution of the new handler + return thread.handler.beforeScriptExecution() +} + +func (thread *phpThread) getActiveRequest() *http.Request { + return thread.handler.getActiveRequest() } // Pin a string that is not null-terminated @@ -50,3 +82,34 @@ func (thread *phpThread) pinString(s string) *C.char { func (thread *phpThread) pinCString(s string) *C.char { return thread.pinString(s + "\x00") } + +//export go_frankenphp_before_script_execution +func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { + thread := phpThreads[threadIndex] + scriptName := thread.handler.beforeScriptExecution() + + // if no scriptName is passed, shut down + if scriptName == "" { + return nil + } + // return the name of the PHP script that should be executed + return thread.pinCString(scriptName) +} + +//export go_frankenphp_after_script_execution +func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C.int) { + thread := phpThreads[threadIndex] + if exitStatus < 0 { + panic(ScriptExecutionError) + } + thread.handler.afterScriptExecution(int(exitStatus)) + + // unpin all memory used during script execution + thread.Unpin() +} + +//export go_frankenphp_on_thread_shutdown +func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { + phpThreads[threadIndex].Unpin() + phpThreads[threadIndex].state.set(stateDone) +} diff --git a/phpthread_test.go b/phpthread_test.go deleted file mode 100644 index 63afe4d89..000000000 --- a/phpthread_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package frankenphp - -import ( - "net/http" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestInitializeTwoPhpThreadsWithoutRequests(t *testing.T) { - initPHPThreads(2) - - assert.Len(t, phpThreads, 2) - assert.NotNil(t, phpThreads[0]) - assert.NotNil(t, phpThreads[1]) - assert.Nil(t, phpThreads[0].mainRequest) - assert.Nil(t, phpThreads[0].workerRequest) -} - -func TestMainRequestIsActiveRequest(t *testing.T) { - mainRequest := &http.Request{} - initPHPThreads(1) - thread := phpThreads[0] - - thread.mainRequest = mainRequest - - assert.Equal(t, mainRequest, thread.getActiveRequest()) -} - -func TestWorkerRequestIsActiveRequest(t *testing.T) { - mainRequest := &http.Request{} - workerRequest := &http.Request{} - initPHPThreads(1) - thread := phpThreads[0] - - thread.mainRequest = mainRequest - thread.workerRequest = workerRequest - - assert.Equal(t, workerRequest, thread.getActiveRequest()) -} diff --git a/state.go b/state.go new file mode 100644 index 000000000..001213282 --- /dev/null +++ b/state.go @@ -0,0 +1,142 @@ +package frankenphp + +import ( + "slices" + "strconv" + "sync" +) + +type stateID uint8 + +const ( + // lifecycle states of a thread + stateBooting stateID = iota + stateShuttingDown + stateDone + + // these states are safe to transition from at any time + stateInactive + stateReady + + // states necessary for restarting workers + stateRestarting + stateYielding + + // states necessary for transitioning between different handlers + stateTransitionRequested + stateTransitionInProgress + stateTransitionComplete +) + +type threadState struct { + currentState stateID + mu sync.RWMutex + subscribers []stateSubscriber +} + +type stateSubscriber struct { + states []stateID + ch chan struct{} +} + +func newThreadState() *threadState { + return &threadState{ + currentState: stateBooting, + subscribers: []stateSubscriber{}, + mu: sync.RWMutex{}, + } +} + +func (ts *threadState) is(state stateID) bool { + ts.mu.RLock() + ok := ts.currentState == state + ts.mu.RUnlock() + + return ok +} + +func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { + ts.mu.Lock() + ok := ts.currentState == compareTo + if ok { + ts.currentState = swapTo + ts.notifySubscribers(swapTo) + } + ts.mu.Unlock() + + return ok +} + +func (ts *threadState) name() string { + // TODO: return the actual name for logging/metrics + return "state:" + strconv.Itoa(int(ts.get())) +} + +func (ts *threadState) get() stateID { + ts.mu.RLock() + id := ts.currentState + ts.mu.RUnlock() + + return id +} + +func (ts *threadState) set(nextState stateID) { + ts.mu.Lock() + ts.currentState = nextState + ts.notifySubscribers(nextState) + ts.mu.Unlock() +} + +func (ts *threadState) notifySubscribers(nextState stateID) { + if len(ts.subscribers) == 0 { + return + } + newSubscribers := []stateSubscriber{} + // notify subscribers to the state change + for _, sub := range ts.subscribers { + if !slices.Contains(sub.states, nextState) { + newSubscribers = append(newSubscribers, sub) + continue + } + close(sub.ch) + } + ts.subscribers = newSubscribers +} + +// block until the thread reaches a certain state +func (ts *threadState) waitFor(states ...stateID) { + ts.mu.Lock() + if slices.Contains(states, ts.currentState) { + ts.mu.Unlock() + return + } + sub := stateSubscriber{ + states: states, + ch: make(chan struct{}), + } + ts.subscribers = append(ts.subscribers, sub) + ts.mu.Unlock() + <-sub.ch +} + +// safely request a state change from a different goroutine +func (ts *threadState) requestSafeStateChange(nextState stateID) bool { + ts.mu.Lock() + switch ts.currentState { + // disallow state changes if shutting down + case stateShuttingDown, stateDone: + ts.mu.Unlock() + return false + // ready and inactive are safe states to transition from + case stateReady, stateInactive: + ts.currentState = nextState + ts.notifySubscribers(nextState) + ts.mu.Unlock() + return true + } + ts.mu.Unlock() + + // wait for the state to change to a safe state + ts.waitFor(stateReady, stateInactive, stateShuttingDown) + return ts.requestSafeStateChange(nextState) +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 000000000..29a10c348 --- /dev/null +++ b/state_test.go @@ -0,0 +1,56 @@ +package frankenphp + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test2GoroutinesYieldToEachOtherViaStates(t *testing.T) { + threadState := &threadState{currentState: stateBooting} + + go func() { + threadState.waitFor(stateInactive) + assert.True(t, threadState.is(stateInactive)) + threadState.set(stateReady) + }() + + threadState.set(stateInactive) + threadState.waitFor(stateReady) + assert.True(t, threadState.is(stateReady)) +} + +func TestStateShouldHaveCorrectAmountOfSubscribers(t *testing.T) { + threadState := &threadState{currentState: stateBooting} + + // 3 subscribers waiting for different states + go threadState.waitFor(stateInactive) + go threadState.waitFor(stateInactive, stateShuttingDown) + go threadState.waitFor(stateShuttingDown) + + assertNumberOfSubscribers(t, threadState, 3) + + threadState.set(stateInactive) + assertNumberOfSubscribers(t, threadState, 1) + + assert.True(t, threadState.compareAndSwap(stateInactive, stateShuttingDown)) + assertNumberOfSubscribers(t, threadState, 0) +} + +func assertNumberOfSubscribers(t *testing.T, threadState *threadState, expected int) { + maxWaits := 10_000 // wait for 1 second max + + for i := 0; i < maxWaits; i++ { + time.Sleep(100 * time.Microsecond) + threadState.mu.RLock() + if len(threadState.subscribers) == expected { + threadState.mu.RUnlock() + break + } + threadState.mu.RUnlock() + } + threadState.mu.RLock() + assert.Len(t, threadState.subscribers, expected) + threadState.mu.RUnlock() +} diff --git a/testdata/fiber-basic.php b/testdata/fiber-basic.php new file mode 100644 index 000000000..bdb52336f --- /dev/null +++ b/testdata/fiber-basic.php @@ -0,0 +1,9 @@ +start(); +}; diff --git a/testdata/transition-regular.php b/testdata/transition-regular.php new file mode 100644 index 000000000..31c7f436c --- /dev/null +++ b/testdata/transition-regular.php @@ -0,0 +1,3 @@ + // #include "frankenphp.h" import "C" import ( "fmt" "github.com/dunglas/frankenphp/internal/fastabs" "net/http" - "path/filepath" "sync" "time" "github.com/dunglas/frankenphp/internal/watcher" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) +// represents a worker script and can have many threads assigned to it type worker struct { fileName string num int @@ -23,40 +20,45 @@ type worker struct { requestChan chan *http.Request threads []*phpThread threadMutex sync.RWMutex - ready chan struct{} } var ( + workers map[string]*worker watcherIsEnabled bool - workerShutdownWG sync.WaitGroup - workersDone chan interface{} - workers = make(map[string]*worker) ) func initWorkers(opt []workerOpt) error { - workersDone = make(chan interface{}) - - ready := sync.WaitGroup{} + workers = make(map[string]*worker, len(opt)) + workersReady := sync.WaitGroup{} + directoriesToWatch := getDirectoriesToWatch(opt) + watcherIsEnabled = len(directoriesToWatch) > 0 for _, o := range opt { worker, err := newWorker(o) worker.threads = make([]*phpThread, 0, o.num) + workersReady.Add(o.num) if err != nil { return err } for i := 0; i < worker.num; i++ { - go worker.startNewWorkerThread() + thread := getInactivePHPThread() + convertToWorkerThread(thread, worker) + go func() { + thread.state.waitFor(stateReady) + workersReady.Done() + }() } - ready.Add(1) - go func() { - for i := 0; i < worker.num; i++ { - <-worker.ready - } - ready.Done() - }() } - ready.Wait() + workersReady.Wait() + + if !watcherIsEnabled { + return nil + } + + if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil { + return err + } return nil } @@ -67,12 +69,6 @@ func newWorker(o workerOpt) (*worker, error) { return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err) } - // if the worker already exists, return it, - // it's necessary since we don't want to destroy the channels when restarting on file changes - if w, ok := workers[absFileName]; ok { - return w, nil - } - if o.env == nil { o.env = make(PreparedEnv, 1) } @@ -83,247 +79,90 @@ func newWorker(o workerOpt) (*worker, error) { num: o.num, env: o.env, requestChan: make(chan *http.Request), - ready: make(chan struct{}, o.num), } workers[absFileName] = w return w, nil } -func (worker *worker) startNewWorkerThread() { - workerShutdownWG.Add(1) - defer workerShutdownWG.Done() - backoff := &exponentialBackoff{ - maxBackoff: 1 * time.Second, - minBackoff: 100 * time.Millisecond, - maxConsecutiveFailures: 6, - } - - for { - // if the worker can stay up longer than backoff*2, it is probably an application error - backoff.wait() - - metrics.StartWorker(worker.fileName) - - // Create main dummy request - r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil) - if err != nil { - panic(err) - } - - r, err = NewRequestWithContext( - r, - WithRequestDocumentRoot(filepath.Dir(worker.fileName), false), - WithRequestPreparedEnv(worker.env), - ) - if err != nil { - panic(err) - } - - if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil { - c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num)) - } - - if err := ServeHTTP(nil, r); err != nil { - panic(err) - } - - fc := r.Context().Value(contextKey).(*FrankenPHPContext) - - // if we are done, exit the loop that restarts the worker script - select { - case _, ok := <-workersDone: - if !ok { - metrics.StopWorker(worker.fileName, StopReasonShutdown) - - if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil { - c.Write(zap.String("worker", worker.fileName)) - } - - return - } - // continue on since the channel is still open - default: - // continue on since the channel is still open - } - - // on exit status 0 we just run the worker script again - if fc.exitStatus == 0 { - // TODO: make the max restart configurable - if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil { - c.Write(zap.String("worker", worker.fileName)) - } - metrics.StopWorker(worker.fileName, StopReasonRestart) - backoff.recordSuccess() - continue - } +func drainWorkers() { + watcher.DrainWatcher() +} - // on exit status 1 we log the error and apply an exponential backoff when restarting - if backoff.recordFailure() { - if !watcherIsEnabled { - panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName)) +func restartWorkers() { + ready := sync.WaitGroup{} + threadsToRestart := make([]*phpThread, 0) + for _, worker := range workers { + worker.threadMutex.RLock() + ready.Add(len(worker.threads)) + for _, thread := range worker.threads { + if !thread.state.requestSafeStateChange(stateRestarting) { + // no state change allowed = shutdown + continue } - logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount)) + close(thread.drainChan) + threadsToRestart = append(threadsToRestart, thread) + go func(thread *phpThread) { + thread.state.waitFor(stateYielding) + ready.Done() + }(thread) } - metrics.StopWorker(worker.fileName, StopReasonCrash) + worker.threadMutex.RUnlock() } - // unreachable -} + ready.Wait() -func (worker *worker) handleRequest(r *http.Request) { - worker.threadMutex.RLock() - // dispatch requests to all worker threads in order - for _, thread := range worker.threads { - select { - case thread.requestChan <- r: - worker.threadMutex.RUnlock() - return - default: - } + for _, thread := range threadsToRestart { + thread.drainChan = make(chan struct{}) + thread.state.set(stateReady) } - worker.threadMutex.RUnlock() - // if no thread was available, fan the request out to all threads - // TODO: theoretically there could be autoscaling of threads here - worker.requestChan <- r -} - -func stopWorkers() { - close(workersDone) -} - -func drainWorkers() { - watcher.DrainWatcher() - watcherIsEnabled = false - stopWorkers() - workerShutdownWG.Wait() - workers = make(map[string]*worker) } -func restartWorkersOnFileChanges(workerOpts []workerOpt) error { - var directoriesToWatch []string +func getDirectoriesToWatch(workerOpts []workerOpt) []string { + directoriesToWatch := []string{} for _, w := range workerOpts { directoriesToWatch = append(directoriesToWatch, w.watch...) } - watcherIsEnabled = len(directoriesToWatch) > 0 - if !watcherIsEnabled { - return nil - } - restartWorkers := func() { - restartWorkers(workerOpts) - } - if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil { - return err - } - - return nil -} - -func restartWorkers(workerOpts []workerOpt) { - stopWorkers() - workerShutdownWG.Wait() - if err := initWorkers(workerOpts); err != nil { - logger.Error("failed to restart workers when watching files") - panic(err) - } - logger.Info("workers restarted successfully") + return directoriesToWatch } -func assignThreadToWorker(thread *phpThread) { - fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext) - worker, ok := workers[fc.scriptFilename] - if !ok { - panic("worker not found for script: " + fc.scriptFilename) - } - thread.worker = worker - thread.requestChan = make(chan *http.Request) +func (worker *worker) attachThread(thread *phpThread) { worker.threadMutex.Lock() worker.threads = append(worker.threads, thread) worker.threadMutex.Unlock() } -//export go_frankenphp_worker_handle_request_start -func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool { - thread := phpThreads[threadIndex] - - // we assign a worker to the thread if it doesn't have one already - if thread.worker == nil { - assignThreadToWorker(thread) - } - thread.readiedOnce.Do(func() { - // inform metrics that the worker is ready - metrics.ReadyWorker(thread.worker.fileName) - }) - - select { - case thread.worker.ready <- struct{}{}: - default: - } - - if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil { - c.Write(zap.String("worker", thread.worker.fileName)) - } - - var r *http.Request - select { - case <-workersDone: - if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil { - c.Write(zap.String("worker", thread.worker.fileName)) - } - thread.worker = nil - C.frankenphp_reset_opcache() - - return C.bool(false) - case r = <-thread.worker.requestChan: - case r = <-thread.requestChan: - } - - thread.workerRequest = r - - if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil { - c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI)) - } - - if err := updateServerContext(thread, r, false, true); err != nil { - // Unexpected error - if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil { - c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err)) +func (worker *worker) detachThread(thread *phpThread) { + worker.threadMutex.Lock() + for i, t := range worker.threads { + if t == thread { + worker.threads = append(worker.threads[:i], worker.threads[i+1:]...) + break } - fc := r.Context().Value(contextKey).(*FrankenPHPContext) - rejectRequest(fc.responseWriter, err.Error()) - maybeCloseContext(fc) - thread.workerRequest = nil - thread.Unpin() - - return go_frankenphp_worker_handle_request_start(threadIndex) } - return C.bool(true) + worker.threadMutex.Unlock() } -//export go_frankenphp_finish_request -func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) { - thread := phpThreads[threadIndex] - r := thread.getActiveRequest() - fc := r.Context().Value(contextKey).(*FrankenPHPContext) - - if isWorkerRequest { - thread.workerRequest = nil - } - - maybeCloseContext(fc) +func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) { + metrics.StartWorkerRequest(fc.scriptFilename) - if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil { - var fields []zap.Field - if isWorkerRequest { - fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI)) - } else { - fields = append(fields, zap.String("url", r.RequestURI)) + // dispatch requests to all worker threads in order + worker.threadMutex.RLock() + for _, thread := range worker.threads { + select { + case thread.requestChan <- r: + worker.threadMutex.RUnlock() + <-fc.done + metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt)) + return + default: } - - c.Write(fields...) } + worker.threadMutex.RUnlock() - if isWorkerRequest { - thread.Unpin() - } + // if no thread was available, fan the request out to all threads + // TODO: theoretically there could be autoscaling of threads here + worker.requestChan <- r + <-fc.done + metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt)) }