Skip to content

Commit

Permalink
worker/server: add tests for job heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
croissanne committed Nov 7, 2024
1 parent 14bd8d3 commit 2eb3c9f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
17 changes: 13 additions & 4 deletions internal/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Config struct {
BasePath string
JWTEnabled bool
TenantProviderFields []string
JobTimeout time.Duration
JobWatchFreq time.Duration
WorkerTimeout time.Duration
WorkerWatchFreq time.Duration
}
Expand All @@ -83,6 +85,12 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Serve
config: config,
}

if s.config.JobTimeout == 0 {
s.config.JobTimeout = time.Second * 120
}
if s.config.JobWatchFreq == 0 {
s.config.JobWatchFreq = time.Second * 30
}
if s.config.WorkerTimeout == 0 {
s.config.WorkerTimeout = time.Hour
}
Expand Down Expand Up @@ -125,12 +133,13 @@ func (s *Server) Handler() http.Handler {
const maxHeartbeatRetries = 2

// This function should be started as a goroutine
// Every 30 seconds it goes through all running jobs, removing any unresponsive ones.
// It fails jobs which fail to check if they cancelled for more than 2 minutes.

// With default durations it goes through all running jobs every 30 seconds and fails any unresponsive
// ones. Unresponsive jobs haven't checked whether or not they're cancelled in the past 2 minutes.
func (s *Server) WatchHeartbeats() {
//nolint:staticcheck // avoid SA1015, this is an endless function
for range time.Tick(time.Second * 30) {
for _, token := range s.jobs.Heartbeats(time.Second * 120) {
for range time.Tick(s.config.JobWatchFreq) {
for _, token := range s.jobs.Heartbeats(s.config.JobTimeout) {
id, _ := s.jobs.IdFromToken(token)
logrus.Infof("Removing unresponsive job: %s\n", id)

Expand Down
65 changes: 65 additions & 0 deletions internal/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,3 +1573,68 @@ func TestRequestJobForWorker(t *testing.T) {
require.NotNil(t, args)
require.Nil(t, dynamicArgs)
}

// TestJobHeartbeats verifies that a running job which never reports back to
// the server is eventually failed with ErrorJobMissingHeartbeat.
//
// The server is configured with a 1ms job timeout and a 100ms watch frequency
// so the job counts as unresponsive almost immediately. Whenever the job is
// found back in the pending state it is dequeued again; after the polling
// loop the job result must carry the missing-heartbeat error and both the
// pending and running job gauges must be back at zero.
//
// NOTE(review): this presumably relies on newTestServer starting the
// heartbeat watcher goroutine — confirm against the helper.
func TestJobHeartbeats(t *testing.T) {
	config := defaultConfig
	config.JobTimeout = time.Millisecond * 1
	config.JobWatchFreq = time.Millisecond * 100
	server := newTestServer(t, t.TempDir(), config, false)

	// Build a minimal serialized manifest to use as the job payload.
	distroStruct := newTestDistro(t)
	arch, err := distroStruct.GetArch(test_distro.TestArchName)
	if err != nil {
		t.Fatalf("error getting arch from distro: %v", err)
	}
	imageType, err := arch.GetImageType(test_distro.TestImageTypeName)
	if err != nil {
		t.Fatalf("error getting image type from arch: %v", err)
	}
	manifest, _, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, 0)
	if err != nil {
		t.Fatalf("error creating osbuild manifest: %v", err)
	}
	mf, err := manifest.Serialize(nil, nil, nil, nil)
	if err != nil {
		t.Fatalf("error serializing osbuild manifest: %v", err)
	}

	jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
	require.NoError(t, err)
	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))

	// Dequeue the job so that it moves from pending to running.
	j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
	require.NoError(t, err)
	require.Equal(t, jobId, j)
	require.Equal(t, worker.JobTypeOSBuild, typ)
	require.NotNil(t, args)
	require.Nil(t, dynamicArgs)
	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.RunningJobs))

	// Poll a bounded number of times, sleeping for two watch periods between
	// polls. A zero Started time means the job is pending again, in which
	// case it is picked up once more so the next missed heartbeat can be
	// detected.
	const pollAttempts = 3
	var jobInfo *worker.JobInfo
	var jobRes worker.OSBuildJobResult
	for i := 0; i < pollAttempts; i++ {
		jobInfo, err = server.OSBuildJobInfo(j, &jobRes)
		require.NoError(t, err)
		if jobInfo.JobStatus.Started.IsZero() {
			require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))
			require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
			// Assign with "=" rather than ":=" so the outer variables are
			// updated instead of being shadowed inside this scope.
			j, _, typ, args, dynamicArgs, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
			require.NoError(t, err)
			require.Equal(t, jobId, j)
			require.Equal(t, worker.JobTypeOSBuild, typ)
			require.NotNil(t, args)
			require.Nil(t, dynamicArgs)
		}
		time.Sleep(time.Millisecond * 200)
	}

	// By now the job must have been failed because of the missing heartbeat.
	_, err = server.OSBuildJobInfo(j, &jobRes)
	require.NoError(t, err)
	require.NotNil(t, jobRes.JobError)
	require.Equal(t, clienterrors.ErrorJobMissingHeartbeat, jobRes.JobError.ID)
	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
}

0 comments on commit 2eb3c9f

Please sign in to comment.