Skip to content

Commit

Permalink
Make watcher cancellable and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
J12934 committed Nov 24, 2024
1 parent 39488ee commit a4e0cc2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
6 changes: 4 additions & 2 deletions balancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ func main() {
bundle := bundle.New()
scoringService := scoring.NewScoringService(bundle)

ctx := context.Background()

go StartMetricsServer()
scoringService.CalculateAndCacheScoreBoard(context.Background())
go scoringService.StartingScoringWorker()
scoringService.CalculateAndCacheScoreBoard(ctx)
go scoringService.StartingScoringWorker(ctx)
StartBalancerServer(bundle, scoringService)
}

Expand Down
49 changes: 29 additions & 20 deletions balancer/pkg/scoring/scoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (s *ScoringService) GetTopScores() []*TeamScore {
return s.currentScoresSorted
}

// TrackScoresWorker is a worker that runs in the background and cheks the scores of all JuiceShop instances every 5 seconds
func (s *ScoringService) StartingScoringWorker() {
watcher, err := s.bundle.ClientSet.AppsV1().Deployments(s.bundle.RuntimeEnvironment.Namespace).Watch(context.Background(), metav1.ListOptions{
// StartingScoringWorker starts a worker that listens for changes in JuiceShop deployments and updates the scores accordingly
func (s *ScoringService) StartingScoringWorker(ctx context.Context) {
watcher, err := s.bundle.ClientSet.AppsV1().Deployments(s.bundle.RuntimeEnvironment.Namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=juice-shop,app.kubernetes.io/part-of=multi-juicer",
})

Expand All @@ -78,25 +78,34 @@ func (s *ScoringService) StartingScoringWorker() {
}
defer watcher.Stop()

for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added, watch.Modified:
deployment := event.Object.(*appsv1.Deployment)
score := calculateScore(s.bundle, deployment, cachedChallengesMap)
s.currentScoresMutex.Lock()
s.currentScores[score.Name] = score
s.currentScoresMutex.Unlock()
case watch.Deleted:
deployment := event.Object.(*appsv1.Deployment)
team := deployment.Labels["team"]
s.currentScoresMutex.Lock()
delete(s.currentScores, team)
s.currentScoresMutex.Unlock()
default:
s.bundle.Log.Printf("Unknown event type: %v", event.Type)
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
s.bundle.Log.Printf("Watcher for JuiceShop deployments has been closed. Exiting the watcher.")
return
}
switch event.Type {
case watch.Added, watch.Modified:
deployment := event.Object.(*appsv1.Deployment)
score := calculateScore(s.bundle, deployment, cachedChallengesMap)
s.currentScoresMutex.Lock()
s.currentScores[score.Name] = score
s.currentScoresMutex.Unlock()
case watch.Deleted:
deployment := event.Object.(*appsv1.Deployment)
team := deployment.Labels["team"]
s.currentScoresMutex.Lock()
delete(s.currentScores, team)
s.currentScoresMutex.Unlock()
default:
s.bundle.Log.Printf("Unknown event type: %v", event.Type)
}
case <-ctx.Done():
s.bundle.Log.Printf("Context canceled. Exiting the watcher.")
return
}
}
s.bundle.Log.Printf("Watcher for JuiceShop deployments has been closed. Exiting the watcher.")
}

func (s *ScoringService) CalculateAndCacheScoreBoard(context context.Context) error {
Expand Down
27 changes: 26 additions & 1 deletion balancer/pkg/scoring/scoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
)

func TestScoreingService(t *testing.T) {
Expand Down Expand Up @@ -75,7 +77,6 @@ func TestScoreingService(t *testing.T) {
})

t.Run("teams with the same score get the same position assigned", func(t *testing.T) {

clientset := fake.NewSimpleClientset(
createTeam("foobar", `[{"key":"scoreBoardChallenge","solvedAt":"2024-11-01T19:55:48.211Z"},{"key":"nullByteChallenge","solvedAt":"2024-11-01T19:55:48.211Z"}]`, "2"),
createTeam("barfoo-1", `[{"key":"scoreBoardChallenge","solvedAt":"2024-11-01T19:55:48.211Z"}]`, "1"),
Expand Down Expand Up @@ -172,6 +173,30 @@ func TestScoreingService(t *testing.T) {
},
}, scores)
})

t.Run("watcher properly updates scores", func(t *testing.T) {
clientset := fake.NewClientset(
createTeam("foobar", `[{"key":"scoreBoardChallenge","solvedAt":"2024-11-01T19:55:48.211Z"}]`, "1"),
)
bundle := testutil.NewTestBundleWithCustomFakeClient(clientset)
scoringService := NewScoringService(bundle)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

err := scoringService.CalculateAndCacheScoreBoard(ctx)
assert.Nil(t, err)
go scoringService.StartingScoringWorker(ctx)
assert.Equal(t, 10, scoringService.GetScores()["foobar"].Score)

watcher := watch.NewFake()
clientset.PrependWatchReactor("deployments", testcore.DefaultWatchReactor(watcher, nil))
watcher.Modify(createTeam("foobar", `[{"key":"scoreBoardChallenge","solvedAt":"2024-11-01T19:55:48.211Z"},{"key":"nullByteChallenge","solvedAt":"2024-11-01T19:55:48.211Z"}]`, "2"))

assert.Eventually(t, func() bool {
return scoringService.GetScores()["foobar"].Score == 50
}, 1*time.Second, 10*time.Millisecond)
})
}

func TestScoreingSorting(t *testing.T) {
Expand Down

0 comments on commit a4e0cc2

Please sign in to comment.