Skip to content

Commit

Permalink
Adds support for thread transitioning.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alliballibaba2 committed Dec 7, 2024
1 parent ec8aeb7 commit b598bd3
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 40 deletions.
10 changes: 3 additions & 7 deletions phpmainthread.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package frankenphp
import "C"
import (
"fmt"
"net/http"
"sync"
)

Expand Down Expand Up @@ -36,12 +35,7 @@ func initPHPThreads(numThreads int) error {

// initialize all threads as inactive
for i := 0; i < numThreads; i++ {
phpThreads[i] = &phpThread{
threadIndex: i,
drainChan: make(chan struct{}),
requestChan: make(chan *http.Request),
state: newThreadState(),
}
phpThreads[i] = newPHPThread(i)
convertToInactiveThread(phpThreads[i])
}

Expand All @@ -66,13 +60,15 @@ func drainPHPThreads() {
doneWG := sync.WaitGroup{}
doneWG.Add(len(phpThreads))
for _, thread := range phpThreads {
thread.mu.Lock()
thread.state.set(stateShuttingDown)
close(thread.drainChan)
}
close(mainThread.done)
for _, thread := range phpThreads {
go func(thread *phpThread) {
thread.state.waitFor(stateDone)
thread.mu.Unlock()
doneWG.Done()
}(thread)
}
Expand Down
131 changes: 106 additions & 25 deletions phpmainthread_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package frankenphp

import (
"io"
"math/rand/v2"
"net/http/httptest"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
Expand All @@ -20,57 +26,132 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
assert.Nil(t, phpThreads)
}

func TestTransition2RegularThreadsToWorkerThreadsAndBack(t *testing.T) {
numThreads := 2
logger, _ = zap.NewDevelopment()
assert.NoError(t, initPHPThreads(numThreads))
func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))

// transition to worker thread
for i := 0; i < numThreads; i++ {
convertToRegularThread(phpThreads[i])
assert.IsType(t, &regularThread{}, phpThreads[i].handler)
}
// transition to regular thread
convertToRegularThread(phpThreads[0])
assert.IsType(t, &regularThread{}, phpThreads[0].handler)

// transition to worker thread
worker := getDummyWorker()
for i := 0; i < numThreads; i++ {
convertToWorkerThread(phpThreads[i], worker)
assert.IsType(t, &workerThread{}, phpThreads[i].handler)
}
assert.Len(t, worker.threads, numThreads)
worker := getDummyWorker("worker-transition-1.php")
convertToWorkerThread(phpThreads[0], worker)
assert.IsType(t, &workerThread{}, phpThreads[0].handler)
assert.Len(t, worker.threads, 1)

// transition back to regular thread
for i := 0; i < numThreads; i++ {
convertToRegularThread(phpThreads[i])
assert.IsType(t, &regularThread{}, phpThreads[i].handler)
}
convertToRegularThread(phpThreads[0])
assert.IsType(t, &regularThread{}, phpThreads[0].handler)
assert.Len(t, worker.threads, 0)

drainPHPThreads()
assert.Nil(t, phpThreads)
}

func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
logger, _ = zap.NewDevelopment()
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))
firstWorker := getDummyWorker("worker-transition-1.php")
secondWorker := getDummyWorker("worker-transition-2.php")

// convert to first worker thread
firstWorker := getDummyWorker()
convertToWorkerThread(phpThreads[0], firstWorker)
firstHandler := phpThreads[0].handler.(*workerThread)
assert.Same(t, firstWorker, firstHandler.worker)
assert.Len(t, firstWorker.threads, 1)
assert.Len(t, secondWorker.threads, 0)

// convert to second worker thread
secondWorker := getDummyWorker()
convertToWorkerThread(phpThreads[0], secondWorker)
secondHandler := phpThreads[0].handler.(*workerThread)
assert.Same(t, secondWorker, secondHandler.worker)
assert.Len(t, firstWorker.threads, 0)
assert.Len(t, secondWorker.threads, 1)

drainPHPThreads()
assert.Nil(t, phpThreads)
}

func getDummyWorker() *worker {
path, _ := filepath.Abs("./testdata/index.php")
return &worker{fileName: path}
func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
numThreads := 10
numRequestsPerThread := 100
isRunning := atomic.Bool{}
isRunning.Store(true)
wg := sync.WaitGroup{}
worker1Path, _ := filepath.Abs("./testdata/transition-worker-1.php")
worker2Path, _ := filepath.Abs("./testdata/transition-worker-2.php")

Init(
WithNumThreads(numThreads),
WithWorkers(worker1Path, 4, map[string]string{"ENV1": "foo"}, []string{}),
WithWorkers(worker2Path, 4, map[string]string{"ENV1": "foo"}, []string{}),
WithLogger(zap.NewNop()),
)

// randomly transition threads between regular and 2 worker threads
go func() {
for {
for i := 0; i < numThreads; i++ {
switch rand.IntN(3) {
case 0:
convertToRegularThread(phpThreads[i])
case 1:
convertToWorkerThread(phpThreads[i], workers[worker1Path])
case 2:
convertToWorkerThread(phpThreads[i], workers[worker2Path])
}
time.Sleep(time.Millisecond)
if !isRunning.Load() {
return
}
}
}
}()

// randomly do requests to the 3 endpoints
wg.Add(numThreads)
for i := 0; i < numThreads; i++ {
go func(i int) {
for j := 0; j < numRequestsPerThread; j++ {
switch rand.IntN(3) {
case 0:
assertRequestBody(t, "http://localhost/transition-worker-1.php", "Hello from worker 1")
case 1:
assertRequestBody(t, "http://localhost/transition-worker-2.php", "Hello from worker 2")
case 2:
assertRequestBody(t, "http://localhost/transition-regular.php", "Hello from regular thread")
}
}
wg.Done()
}(i)
}

wg.Wait()
isRunning.Store(false)
Shutdown()
}

func getDummyWorker(fileName string) *worker {
if workers == nil {
workers = make(map[string]*worker)
}
absFileName, _ := filepath.Abs("./testdata/" + fileName)
worker, _ := newWorker(workerOpt{
fileName: absFileName,
num: 1,
})
return worker
}

func assertRequestBody(t *testing.T, url string, expected string) {
r := httptest.NewRequest("GET", url, nil)
w := httptest.NewRecorder()
req, err := NewRequestWithContext(r, WithRequestDocumentRoot("/go/src/app/testdata", false))
assert.NoError(t, err)
err = ServeHTTP(w, req)
assert.NoError(t, err)
resp := w.Result()
body, _ := io.ReadAll(resp.Body)
assert.Equal(t, expected, string(body))
}
22 changes: 18 additions & 4 deletions phpthread.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import "C"
import (
"net/http"
"runtime"
"sync"
"unsafe"

"go.uber.org/zap"
)

// representation of the actual underlying PHP thread
Expand All @@ -21,6 +20,7 @@ type phpThread struct {
drainChan chan struct{}
handler threadHandler
state *threadState
mu *sync.Mutex
}

// interface that defines how the callbacks from the C thread should be handled
Expand All @@ -30,16 +30,30 @@ type threadHandler interface {
getActiveRequest() *http.Request
}

func newPHPThread(threadIndex int) *phpThread {
return &phpThread{
threadIndex: threadIndex,
drainChan: make(chan struct{}),
requestChan: make(chan *http.Request),
mu: &sync.Mutex{},
state: newThreadState(),
}
}

func (thread *phpThread) getActiveRequest() *http.Request {
return thread.handler.getActiveRequest()
}

// change the thread handler safely
func (thread *phpThread) setHandler(handler threadHandler) {
logger.Debug("transitioning thread", zap.Int("threadIndex", thread.threadIndex))
thread.mu.Lock()
defer thread.mu.Unlock()
if thread.state.is(stateShuttingDown) {
return
}
thread.state.set(stateTransitionRequested)
close(thread.drainChan)
thread.state.waitFor(stateTransitionInProgress)
thread.state.waitFor(stateTransitionInProgress, stateShuttingDown)
thread.handler = handler
thread.drainChan = make(chan struct{})
thread.state.set(stateTransitionComplete)
Expand Down
4 changes: 0 additions & 4 deletions testdata/sleep.php

This file was deleted.

3 changes: 3 additions & 0 deletions testdata/transition-regular.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<?php

echo "Hello from regular thread";
7 changes: 7 additions & 0 deletions testdata/transition-worker-1.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

while (frankenphp_handle_request(function () {
echo "Hello from worker 1";
})) {

}
8 changes: 8 additions & 0 deletions testdata/transition-worker-2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

while (frankenphp_handle_request(function () {
echo "Hello from worker 2";
usleep(1000);
})) {

}

0 comments on commit b598bd3

Please sign in to comment.