diff --git a/phpmainthread.go b/phpmainthread.go index e9378070a..c0ffb1614 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -4,7 +4,6 @@ package frankenphp import "C" import ( "fmt" - "net/http" "sync" ) @@ -36,12 +35,7 @@ func initPHPThreads(numThreads int) error { // initialize all threads as inactive for i := 0; i < numThreads; i++ { - phpThreads[i] = &phpThread{ - threadIndex: i, - drainChan: make(chan struct{}), - requestChan: make(chan *http.Request), - state: newThreadState(), - } + phpThreads[i] = newPHPThread(i) convertToInactiveThread(phpThreads[i]) } @@ -66,6 +60,7 @@ func drainPHPThreads() { doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) for _, thread := range phpThreads { + thread.mu.Lock() thread.state.set(stateShuttingDown) close(thread.drainChan) } @@ -73,6 +68,7 @@ func drainPHPThreads() { for _, thread := range phpThreads { go func(thread *phpThread) { thread.state.waitFor(stateDone) + thread.mu.Unlock() doneWG.Done() }(thread) } diff --git a/phpmainthread_test.go b/phpmainthread_test.go index f9f46cc15..25e448f2c 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -1,8 +1,14 @@ package frankenphp import ( + "io" + "math/rand/v2" + "net/http/httptest" "path/filepath" + "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -20,30 +26,23 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { assert.Nil(t, phpThreads) } -func TestTransition2RegularThreadsToWorkerThreadsAndBack(t *testing.T) { - numThreads := 2 - logger, _ = zap.NewDevelopment() - assert.NoError(t, initPHPThreads(numThreads)) +func TestTransitionRegularThreadToWorkerThread(t *testing.T) { + logger = zap.NewNop() + assert.NoError(t, initPHPThreads(1)) - // transition to worker thread - for i := 0; i < numThreads; i++ { - convertToRegularThread(phpThreads[i]) - assert.IsType(t, ®ularThread{}, phpThreads[i].handler) - } + // transition to regular thread + convertToRegularThread(phpThreads[0]) + assert.IsType(t, ®ularThread{}, phpThreads[0].handler) // transition to worker thread - worker := getDummyWorker() - for i := 0; i < numThreads; i++ { - convertToWorkerThread(phpThreads[i], worker) - assert.IsType(t, &workerThread{}, phpThreads[i].handler) - } - assert.Len(t, worker.threads, numThreads) + worker := getDummyWorker("worker-transition-1.php") + convertToWorkerThread(phpThreads[0], worker) + assert.IsType(t, &workerThread{}, phpThreads[0].handler) + assert.Len(t, worker.threads, 1) // transition back to regular thread - for i := 0; i < numThreads; i++ { - convertToRegularThread(phpThreads[i]) - assert.IsType(t, ®ularThread{}, phpThreads[i].handler) - } + convertToRegularThread(phpThreads[0]) + assert.IsType(t, ®ularThread{}, phpThreads[0].handler) assert.Len(t, worker.threads, 0) drainPHPThreads() @@ -51,26 +50,108 @@ func TestTransition2RegularThreadsToWorkerThreadsAndBack(t *testing.T) { } func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { - logger, _ = zap.NewDevelopment() + logger = zap.NewNop() assert.NoError(t, initPHPThreads(1)) + firstWorker := getDummyWorker("worker-transition-1.php") + secondWorker := getDummyWorker("worker-transition-2.php") // convert to first worker thread - firstWorker := getDummyWorker() convertToWorkerThread(phpThreads[0], firstWorker) firstHandler := phpThreads[0].handler.(*workerThread) assert.Same(t, firstWorker, firstHandler.worker) + assert.Len(t, firstWorker.threads, 1) + assert.Len(t, secondWorker.threads, 0) // convert to second worker thread - secondWorker := getDummyWorker() convertToWorkerThread(phpThreads[0], secondWorker) secondHandler := phpThreads[0].handler.(*workerThread) assert.Same(t, secondWorker, secondHandler.worker) + assert.Len(t, firstWorker.threads, 0) + assert.Len(t, secondWorker.threads, 1) drainPHPThreads() assert.Nil(t, phpThreads) } -func getDummyWorker() *worker { - path, _ := filepath.Abs("./testdata/index.php") - return &worker{fileName: path} +func TestTransitionThreadsWhileDoingRequests(t *testing.T) { + numThreads := 10 + numRequestsPerThread := 100 + isRunning := atomic.Bool{} + isRunning.Store(true) + wg := sync.WaitGroup{} + worker1Path, _ := filepath.Abs("./testdata/transition-worker-1.php") + worker2Path, _ := filepath.Abs("./testdata/transition-worker-2.php") + + Init( + WithNumThreads(numThreads), + WithWorkers(worker1Path, 4, map[string]string{"ENV1": "foo"}, []string{}), + WithWorkers(worker2Path, 4, map[string]string{"ENV1": "foo"}, []string{}), + WithLogger(zap.NewNop()), + ) + + // randomly transition threads between regular and 2 worker threads + go func() { + for { + for i := 0; i < numThreads; i++ { + switch rand.IntN(3) { + case 0: + convertToRegularThread(phpThreads[i]) + case 1: + convertToWorkerThread(phpThreads[i], workers[worker1Path]) + case 2: + convertToWorkerThread(phpThreads[i], workers[worker2Path]) + } + time.Sleep(time.Millisecond) + if !isRunning.Load() { + return + } + } + } + }() + + // randomly do requests to the 3 endpoints + wg.Add(numThreads) + for i := 0; i < numThreads; i++ { + go func(i int) { + for j := 0; j < numRequestsPerThread; j++ { + switch rand.IntN(3) { + case 0: + assertRequestBody(t, "http://localhost/transition-worker-1.php", "Hello from worker 1") + case 1: + assertRequestBody(t, "http://localhost/transition-worker-2.php", "Hello from worker 2") + case 2: + assertRequestBody(t, "http://localhost/transition-regular.php", "Hello from regular thread") + } + } + wg.Done() + }(i) + } + + wg.Wait() + isRunning.Store(false) + Shutdown() +} + +func getDummyWorker(fileName string) *worker { + if workers == nil { + workers = make(map[string]*worker) + } + absFileName, _ := filepath.Abs("./testdata/" + fileName) + worker, _ := newWorker(workerOpt{ + fileName: absFileName, + num: 1, + }) + return worker +} + +func assertRequestBody(t *testing.T, url string, expected string) { + r := httptest.NewRequest("GET", url, nil) + w := httptest.NewRecorder() + req, err := NewRequestWithContext(r, WithRequestDocumentRoot("/go/src/app/testdata", false)) + assert.NoError(t, err) + err = ServeHTTP(w, req) + assert.NoError(t, err) + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, expected, string(body)) } diff --git a/phpthread.go b/phpthread.go index 5ee1ff34a..55e96a6de 100644 --- a/phpthread.go +++ b/phpthread.go @@ -5,9 +5,8 @@ import "C" import ( "net/http" "runtime" + "sync" "unsafe" - - "go.uber.org/zap" ) // representation of the actual underlying PHP thread @@ -21,6 +20,7 @@ type phpThread struct { drainChan chan struct{} handler threadHandler state *threadState + mu *sync.Mutex } // interface that defines how the callbacks from the C thread should be handled @@ -30,16 +30,30 @@ type threadHandler interface { getActiveRequest() *http.Request } +func newPHPThread(threadIndex int) *phpThread { + return &phpThread{ + threadIndex: threadIndex, + drainChan: make(chan struct{}), + requestChan: make(chan *http.Request), + mu: &sync.Mutex{}, + state: newThreadState(), + } +} + func (thread *phpThread) getActiveRequest() *http.Request { return thread.handler.getActiveRequest() } // change the thread handler safely func (thread *phpThread) setHandler(handler threadHandler) { - logger.Debug("transitioning thread", zap.Int("threadIndex", thread.threadIndex)) + thread.mu.Lock() + defer thread.mu.Unlock() + if thread.state.is(stateShuttingDown) { + return + } thread.state.set(stateTransitionRequested) close(thread.drainChan) - thread.state.waitFor(stateTransitionInProgress) + thread.state.waitFor(stateTransitionInProgress, stateShuttingDown) thread.handler = handler thread.drainChan = make(chan struct{}) thread.state.set(stateTransitionComplete) diff --git a/testdata/sleep.php b/testdata/sleep.php deleted file mode 100644 index d2c78b865..000000000 --- a/testdata/sleep.php +++ /dev/null @@ -1,4 +0,0 @@ -