diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index 488439daddd..983d52d4161 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -7,7 +7,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" - "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -126,8 +125,8 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns if err := stream.Send(&executorapi.LeaseStreamMessage{ Event: &executorapi.LeaseStreamMessage_CancelRuns{ CancelRuns: &executorapi.CancelRuns{ - JobRunIdsToCancel: slices.Map(runsToCancel, func(x uuid.UUID) *armadaevents.Uuid { - return armadaevents.ProtoUuidFromUuid(x) + JobRunIdsToCancel: slices.Map(runsToCancel, func(x string) *armadaevents.Uuid { + return armadaevents.MustProtoUuidFromUuidString(x) }), }, }, @@ -393,19 +392,15 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req } // runIdsFromLeaseRequest returns the ids of all runs in a lease request, including any not yet assigned to a node. 
-func runIdsFromLeaseRequest(req *executorapi.LeaseRequest) ([]uuid.UUID, error) { - runIds := make([]uuid.UUID, 0, 256) +func runIdsFromLeaseRequest(req *executorapi.LeaseRequest) ([]string, error) { + runIds := make([]string, 0, 256) for _, node := range req.Nodes { - for runIdStr := range node.RunIdsByState { - if runId, err := uuid.Parse(runIdStr); err != nil { - return nil, errors.WithStack(err) - } else { - runIds = append(runIds, runId) - } + for runId := range node.RunIdsByState { + runIds = append(runIds, runId) } } for _, runId := range req.UnassignedJobRunIds { - runIds = append(runIds, armadaevents.UuidFromProtoUuid(runId)) + runIds = append(runIds, armadaevents.UuidFromProtoUuid(runId).String()) } return runIds, nil } diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 592beee9d5f..04dc7c04141 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -49,9 +49,9 @@ var priorityClasses = map[string]types.PriorityClass{ func TestExecutorApi_LeaseJobRuns(t *testing.T) { const maxJobsPerCall = uint(100) testClock := clock.NewFakeClock(time.Now()) - runId1 := uuid.New() - runId2 := uuid.New() - runId3 := uuid.New() + runId1 := uuid.NewString() + runId2 := uuid.NewString() + runId3 := uuid.NewString() groups, compressedGroups := groups(t) defaultRequest := &executorapi.LeaseRequest{ ExecutorId: "test-executor", @@ -60,13 +60,13 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { { Name: "test-node", RunIdsByState: map[string]api.JobState{ - runId1.String(): api.JobState_RUNNING, - runId2.String(): api.JobState_RUNNING, + runId1: api.JobState_RUNNING, + runId2: api.JobState_RUNNING, }, NodeType: "node-type-1", }, }, - UnassignedJobRunIds: []*armadaevents.Uuid{armadaevents.ProtoUuidFromUuid(runId3)}, + UnassignedJobRunIds: []*armadaevents.Uuid{armadaevents.MustProtoUuidFromUuidString(runId3)}, MaxJobsToLease: uint32(maxJobsPerCall), } defaultExpectedExecutor := &schedulerobjects.Executor{ @@ -78,7 +78,7 
@@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { Name: "test-node", Executor: "test-executor", TotalResources: schedulerobjects.NewResourceList(0), - StateByJobRunId: map[string]schedulerobjects.JobRunState{runId1.String(): schedulerobjects.JobRunState_RUNNING, runId2.String(): schedulerobjects.JobRunState_RUNNING}, + StateByJobRunId: map[string]schedulerobjects.JobRunState{runId1: schedulerobjects.JobRunState_RUNNING, runId2: schedulerobjects.JobRunState_RUNNING}, NonArmadaAllocatedResources: map[int32]schedulerobjects.ResourceList{}, AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{ 1000: { @@ -94,7 +94,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { }, }, LastUpdateTime: testClock.Now().UTC(), - UnassignedJobRuns: []string{runId3.String()}, + UnassignedJobRuns: []string{runId3}, } submit, compressedSubmit := submitMsg( @@ -190,20 +190,20 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { tests := map[string]struct { request *executorapi.LeaseRequest - runsToCancel []uuid.UUID + runsToCancel []string leases []*database.JobRunLease expectedExecutor *schedulerobjects.Executor expectedMsgs []*executorapi.LeaseStreamMessage }{ "lease and cancel": { request: defaultRequest, - runsToCancel: []uuid.UUID{runId2}, + runsToCancel: []string{runId2}, leases: []*database.JobRunLease{defaultLease}, expectedExecutor: defaultExpectedExecutor, expectedMsgs: []*executorapi.LeaseStreamMessage{ { Event: &executorapi.LeaseStreamMessage_CancelRuns{CancelRuns: &executorapi.CancelRuns{ - JobRunIdsToCancel: []*armadaevents.Uuid{armadaevents.ProtoUuidFromUuid(runId2)}, + JobRunIdsToCancel: []*armadaevents.Uuid{armadaevents.MustProtoUuidFromUuidString(runId2)}, }}, }, { @@ -311,7 +311,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { assert.Equal(t, tc.expectedExecutor, executor) return nil }).Times(1) - mockJobRepository.EXPECT().FindInactiveRuns(gomock.Any(), schedulermocks.SliceMatcher[uuid.UUID]{Expected: runIds}).Return(tc.runsToCancel, 
nil).Times(1) + mockJobRepository.EXPECT().FindInactiveRuns(gomock.Any(), schedulermocks.SliceMatcher{Expected: runIds}).Return(tc.runsToCancel, nil).Times(1) mockJobRepository.EXPECT().FetchJobRunLeases(gomock.Any(), tc.request.ExecutorId, maxJobsPerCall, runIds).Return(tc.leases, nil).Times(1) mockAuthorizer.EXPECT().AuthorizeAction(gomock.Any(), permission.Permission(permissions.ExecuteJobs)).Return(nil).Times(1) diff --git a/internal/scheduler/database/job_repository.go b/internal/scheduler/database/job_repository.go index 4cc77ed88d6..7d8daab315c 100644 --- a/internal/scheduler/database/job_repository.go +++ b/internal/scheduler/database/job_repository.go @@ -41,7 +41,7 @@ type JobRepository interface { // FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is // keyed by job run id. Any dbRuns which don't have errors wil be absent from the map. - FetchJobRunErrors(ctx *armadacontext.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) + FetchJobRunErrors(ctx *armadacontext.Context, runIds []string) (map[string]*armadaevents.Error, error) // CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding // to the provided groupId. This is used by the scheduler to determine if the database represents the state of @@ -50,11 +50,11 @@ type JobRepository interface { // FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active // Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled - FindInactiveRuns(ctx *armadacontext.Context, runIds []uuid.UUID) ([]uuid.UUID, error) + FindInactiveRuns(ctx *armadacontext.Context, runIds []string) ([]string, error) // FetchJobRunLeases fetches new job runs for a given executor. 
A maximum of maxResults rows will be returned, while run // in excludedRunIds will be excluded - FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error) + FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []string) ([]*JobRunLease, error) } // PostgresJobRepository is an implementation of JobRepository that stores its state in postgres @@ -74,14 +74,14 @@ func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRep // FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is // keyed by job run id. Any dbRuns which don't have errors wil be absent from the map. -func (r *PostgresJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) { +func (r *PostgresJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, runIds []string) (map[string]*armadaevents.Error, error) { if len(runIds) == 0 { - return map[uuid.UUID]*armadaevents.Error{}, nil + return map[string]*armadaevents.Error{}, nil } chunks := armadaslices.PartitionToMaxLen(runIds, int(r.batchSize)) - errorsByRunId := make(map[uuid.UUID]*armadaevents.Error, len(runIds)) + errorsByRunId := make(map[string]*armadaevents.Error, len(runIds)) decompressor := compress.NewZlibDecompressor() err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ @@ -116,7 +116,7 @@ func (r *PostgresJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, ru if err != nil { return errors.WithStack(err) } - errorsByRunId[runId] = jobError + errorsByRunId[runId.String()] = jobError } } return nil @@ -192,8 +192,8 @@ func (r *PostgresJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobS // FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active // Runs are inactive if they don't exist or if they have succeeded, failed or been 
cancelled -func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []uuid.UUID) ([]uuid.UUID, error) { - var inactiveRuns []uuid.UUID +func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []string) ([]string, error) { + var inactiveRuns []string err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{ IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadWrite, @@ -224,7 +224,7 @@ func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, run if err != nil { return errors.WithStack(err) } - inactiveRuns = append(inactiveRuns, runId) + inactiveRuns = append(inactiveRuns, runId.String()) } return nil }) @@ -233,7 +233,7 @@ func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, run // FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run // in excludedRunIds will be excluded -func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error) { +func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []string) ([]*JobRunLease, error) { if maxResults == 0 { return []*JobRunLease{}, nil } @@ -312,7 +312,7 @@ func fetch[T hasSerial](from int64, batchSize int32, fetchBatch func(int64) ([]T } // Insert all run ids into a tmp table. 
The name of the table is returned -func insertRunIdsToTmpTable(ctx *armadacontext.Context, tx pgx.Tx, runIds []uuid.UUID) (string, error) { +func insertRunIdsToTmpTable(ctx *armadacontext.Context, tx pgx.Tx, runIds []string) (string, error) { tmpTable := database.UniqueTableName("job_runs") _, err := tx.Exec(ctx, fmt.Sprintf("CREATE TEMPORARY TABLE %s (run_id uuid) ON COMMIT DROP", tmpTable)) diff --git a/internal/scheduler/database/job_repository_test.go b/internal/scheduler/database/job_repository_test.go index 519371258f3..3de0ab76c4c 100644 --- a/internal/scheduler/database/job_repository_test.go +++ b/internal/scheduler/database/job_repository_test.go @@ -139,42 +139,42 @@ func TestFetchJobRunErrors(t *testing.T) { tests := map[string]struct { errorsInDb []JobRunError - idsToLookup []uuid.UUID - expected map[uuid.UUID]*armadaevents.Error + idsToLookup []string + expected map[string]*armadaevents.Error expectError bool }{ "single error": { errorsInDb: dbErrors, - idsToLookup: []uuid.UUID{dbErrors[1].RunID}, - expected: map[uuid.UUID]*armadaevents.Error{dbErrors[1].RunID: expectedErrors[1]}, + idsToLookup: []string{dbErrors[1].RunID.String()}, + expected: map[string]*armadaevents.Error{dbErrors[1].RunID.String(): expectedErrors[1]}, }, "multiple errors": { errorsInDb: dbErrors, - idsToLookup: []uuid.UUID{dbErrors[1].RunID, dbErrors[4].RunID, dbErrors[5].RunID, dbErrors[7].RunID}, - expected: map[uuid.UUID]*armadaevents.Error{ - dbErrors[1].RunID: expectedErrors[1], - dbErrors[4].RunID: expectedErrors[4], - dbErrors[5].RunID: expectedErrors[5], - dbErrors[7].RunID: expectedErrors[7], + idsToLookup: []string{dbErrors[1].RunID.String(), dbErrors[4].RunID.String(), dbErrors[5].RunID.String(), dbErrors[7].RunID.String()}, + expected: map[string]*armadaevents.Error{ + dbErrors[1].RunID.String(): expectedErrors[1], + dbErrors[4].RunID.String(): expectedErrors[4], + dbErrors[5].RunID.String(): expectedErrors[5], + dbErrors[7].RunID.String(): expectedErrors[7], }, }, 
"some errors missing": { errorsInDb: dbErrors, - idsToLookup: []uuid.UUID{dbErrors[1].RunID, uuid.New(), uuid.New(), dbErrors[7].RunID}, - expected: map[uuid.UUID]*armadaevents.Error{ - dbErrors[1].RunID: expectedErrors[1], - dbErrors[7].RunID: expectedErrors[7], + idsToLookup: []string{dbErrors[1].RunID.String(), uuid.NewString(), uuid.NewString(), dbErrors[7].RunID.String()}, + expected: map[string]*armadaevents.Error{ + dbErrors[1].RunID.String(): expectedErrors[1], + dbErrors[7].RunID.String(): expectedErrors[7], }, }, "all errors missing": { errorsInDb: dbErrors, - idsToLookup: []uuid.UUID{uuid.New(), uuid.New(), uuid.New(), uuid.New()}, - expected: map[uuid.UUID]*armadaevents.Error{}, + idsToLookup: []string{uuid.NewString(), uuid.NewString(), uuid.NewString(), uuid.NewString()}, + expected: map[string]*armadaevents.Error{}, }, "emptyDb": { errorsInDb: []JobRunError{}, - idsToLookup: []uuid.UUID{uuid.New(), uuid.New(), uuid.New(), uuid.New()}, - expected: map[uuid.UUID]*armadaevents.Error{}, + idsToLookup: []string{uuid.NewString(), uuid.NewString(), uuid.NewString(), uuid.NewString()}, + expected: map[string]*armadaevents.Error{}, }, "invalid data": { errorsInDb: []JobRunError{{ @@ -182,7 +182,7 @@ func TestFetchJobRunErrors(t *testing.T) { JobID: dbErrors[0].JobID, Error: []byte{0x1, 0x4, 0x5}, // not a valid compressed proto }}, - idsToLookup: []uuid.UUID{dbErrors[0].RunID}, + idsToLookup: []string{dbErrors[0].RunID.String()}, expectError: true, }, } @@ -298,21 +298,24 @@ func createTestJobs(numJobs int) ([]Job, []Job) { } func TestFindInactiveRuns(t *testing.T) { + stringIds := make([]string, 3) uuids := make([]uuid.UUID, 3) - for i := 0; i < len(uuids); i++ { + for i := 0; i < len(stringIds); i++ { uuids[i] = uuid.New() + stringIds[i] = uuids[i].String() } + tests := map[string]struct { dbRuns []Run - runsToCheck []uuid.UUID - expectedInactive []uuid.UUID + runsToCheck []string + expectedInactive []string }{ "empty database": { - runsToCheck: uuids, - 
expectedInactive: uuids, + runsToCheck: stringIds, + expectedInactive: stringIds, }, "no inactive": { - runsToCheck: uuids, + runsToCheck: stringIds, dbRuns: []Run{ {RunID: uuids[0]}, {RunID: uuids[1]}, @@ -321,39 +324,39 @@ func TestFindInactiveRuns(t *testing.T) { expectedInactive: nil, }, "run succeeded": { - runsToCheck: uuids, + runsToCheck: stringIds, dbRuns: []Run{ {RunID: uuids[0]}, {RunID: uuids[1], Succeeded: true}, {RunID: uuids[2]}, }, - expectedInactive: []uuid.UUID{uuids[1]}, + expectedInactive: []string{stringIds[1]}, }, "run failed": { - runsToCheck: uuids, + runsToCheck: stringIds, dbRuns: []Run{ {RunID: uuids[0]}, {RunID: uuids[1], Failed: true}, {RunID: uuids[2]}, }, - expectedInactive: []uuid.UUID{uuids[1]}, + expectedInactive: []string{stringIds[1]}, }, "run cancelled": { - runsToCheck: uuids, + runsToCheck: stringIds, dbRuns: []Run{ {RunID: uuids[0]}, {RunID: uuids[1], Cancelled: true}, {RunID: uuids[2]}, }, - expectedInactive: []uuid.UUID{uuids[1]}, + expectedInactive: []string{stringIds[1]}, }, "run missing": { - runsToCheck: uuids, + runsToCheck: stringIds, dbRuns: []Run{ {RunID: uuids[0]}, {RunID: uuids[2]}, }, - expectedInactive: []uuid.UUID{uuids[1]}, + expectedInactive: []string{stringIds[1]}, }, } for name, tc := range tests { @@ -367,17 +370,8 @@ func TestFindInactiveRuns(t *testing.T) { inactive, err := repo.FindInactiveRuns(ctx, tc.runsToCheck) require.NoError(t, err) - uuidSort := func(a uuid.UUID, b uuid.UUID) int { - if a.String() > b.String() { - return -1 - } else if a.String() < b.String() { - return 1 - } else { - return 0 - } - } - slices.SortFunc(inactive, uuidSort) - slices.SortFunc(tc.expectedInactive, uuidSort) + slices.Sort(inactive) + slices.Sort(tc.expectedInactive) assert.Equal(t, tc.expectedInactive, inactive) cancel() return nil @@ -466,7 +460,7 @@ func TestFetchJobRunLeases(t *testing.T) { tests := map[string]struct { dbRuns []Run dbJobs []Job - excludedRuns []uuid.UUID + excludedRuns []string maxRowsToFetch uint 
executor string expectedLeases []*JobRunLease @@ -490,7 +484,7 @@ func TestFetchJobRunLeases(t *testing.T) { "exclude one run": { dbJobs: dbJobs, dbRuns: dbRuns, - excludedRuns: []uuid.UUID{dbRuns[1].RunID}, + excludedRuns: []string{dbRuns[1].RunID.String()}, maxRowsToFetch: 100, executor: executorName, expectedLeases: []*JobRunLease{expectedLeases[0], expectedLeases[2], expectedLeases[3]}, @@ -498,7 +492,7 @@ func TestFetchJobRunLeases(t *testing.T) { "exclude everything": { dbJobs: dbJobs, dbRuns: dbRuns, - excludedRuns: []uuid.UUID{dbRuns[0].RunID, dbRuns[1].RunID, dbRuns[2].RunID, dbRuns[3].RunID}, + excludedRuns: []string{dbRuns[0].RunID.String(), dbRuns[1].RunID.String(), dbRuns[2].RunID.String(), dbRuns[3].RunID.String()}, maxRowsToFetch: 100, executor: executorName, expectedLeases: nil, diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index ee27d39846a..063876a6b8b 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -5,7 +5,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "golang.org/x/exp/maps" @@ -66,7 +65,7 @@ type Job struct { // True if the scheduler has marked the job as succeeded succeeded bool // Job Runs by run id - runsById map[uuid.UUID]*JobRun + runsById map[string]*JobRun // The currently active run. The run with the latest timestamp is the active run. activeRun *JobRun // The timestamp of the currently active run. @@ -690,7 +689,7 @@ func (job *Job) WithUpdatedRun(run *JobRun) *Job { j.runsById = maps.Clone(j.runsById) j.runsById[run.id] = run } else { - j.runsById = map[uuid.UUID]*JobRun{run.id: run} + j.runsById = map[string]*JobRun{run.id: run} } return j } @@ -741,7 +740,7 @@ func (job *Job) ResolvedPools() []string { } // RunById returns the Run corresponding to the provided run id or nil if no such Run exists. 
-func (job *Job) RunById(id uuid.UUID) *JobRun { +func (job *Job) RunById(id string) *JobRun { return job.runsById[id] } diff --git a/internal/scheduler/jobdb/job_run.go b/internal/scheduler/jobdb/job_run.go index c325f20d19c..dc0b257361b 100644 --- a/internal/scheduler/jobdb/job_run.go +++ b/internal/scheduler/jobdb/job_run.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" ) @@ -15,7 +14,7 @@ import ( // such as `pod_requirements_overlay`; these are not represented here. type JobRun struct { // Unique identifier for the run. - id uuid.UUID + id string // Id of the job this run is associated with. jobId string // Time at which the run was created. @@ -85,8 +84,7 @@ func (run *JobRun) Assert() error { var result *multierror.Error // Assert that required fields are set. - var emptyUuid uuid.UUID - if run.Id() == emptyUuid { + if run.Id() == "" { result = multierror.Append(result, errors.New("run has an empty id")) } if run.JobId() == "" { @@ -204,7 +202,7 @@ func (run *JobRun) Equal(other *JobRun) bool { return true } -func MinimalRun(id uuid.UUID, creationTime int64) *JobRun { +func MinimalRun(id string, creationTime int64) *JobRun { return &JobRun{ id: id, created: creationTime, @@ -214,7 +212,7 @@ // CreateRun creates a new scheduler job run from a database job run func (jobDb *JobDb) CreateRun( - id uuid.UUID, + id string, jobId string, creationTime int64, executor string, @@ -266,7 +264,7 @@ func (jobDb *JobDb) CreateRun( } // Id returns the id of the JobRun. 
-func (run *JobRun) Id() uuid.UUID { +func (run *JobRun) Id() string { return run.id } diff --git a/internal/scheduler/jobdb/job_run_test.go b/internal/scheduler/jobdb/job_run_test.go index 64c4925db93..d03ab970275 100644 --- a/internal/scheduler/jobdb/job_run_test.go +++ b/internal/scheduler/jobdb/job_run_test.go @@ -42,7 +42,7 @@ var ( ) var baseJobRun = jobDb.CreateRun( - uuid.New(), + uuid.New().String(), uuid.NewString(), 5, "test-executor", @@ -119,7 +119,7 @@ func TestJobRun_TestRunAttempted(t *testing.T) { func TestDeepCopy(t *testing.T) { run := jobDb.CreateRun( - uuid.New(), + uuid.NewString(), "job id", 1, "executor", diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index e3de7b273e6..b66fdceadaa 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -42,7 +42,7 @@ var baseJob, _ = jobDb.NewJob( ) var baseRun = &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: 3, executor: "test-executor", running: true, @@ -161,7 +161,7 @@ func TestJob_TestWithUpdatedRun_NewRun(t *testing.T) { func TestJob_TestWithUpdatedRun_UpdateRun(t *testing.T) { run := &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: 3, executor: "test-executor", running: true, @@ -179,7 +179,7 @@ func TestJob_TestWithUpdatedRun_UpdateRun(t *testing.T) { func TestJob_TestWithUpdatedRun_AdditionalRun(t *testing.T) { additionalRun := &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created + 1, executor: "test-executor", running: true, @@ -192,7 +192,7 @@ func TestJob_TestWithUpdatedRun_AdditionalRun(t *testing.T) { func TestJob_TestWithUpdatedRun_AdditionalEarlierRun(t *testing.T) { additionalRun := &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created - 1, executor: "test-executor", running: true, @@ -206,7 +206,7 @@ func TestJob_TestWithUpdatedRun_AdditionalEarlierRun(t *testing.T) { func TestJob_TestNumReturned(t *testing.T) { returnedRun := 
func() *JobRun { return &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created, returned: true, } @@ -214,7 +214,7 @@ func TestJob_TestNumReturned(t *testing.T) { nonReturnedRun := func() *JobRun { return &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created, returned: false, } @@ -238,7 +238,7 @@ func TestJob_TestNumReturned(t *testing.T) { func TestJob_TestNumAttempts(t *testing.T) { attemptedRun := func() *JobRun { return &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created, returned: true, runAttempted: true, @@ -247,7 +247,7 @@ func TestJob_TestNumAttempts(t *testing.T) { nonAttemptedRun := func() *JobRun { return &JobRun{ - id: uuid.New(), + id: uuid.New().String(), created: baseRun.created, returned: true, runAttempted: false, @@ -273,7 +273,7 @@ func TestJob_TestRunsById(t *testing.T) { runs := make([]*JobRun, 10) job := baseJob for i := 0; i < len(runs); i++ { - runs[i] = &JobRun{id: uuid.New()} + runs[i] = &JobRun{id: uuid.New().String()} job = job.WithUpdatedRun(runs[i]) } for i := 0; i < len(runs); i++ { diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index dead632849d..8a65691390a 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -24,7 +24,7 @@ var emptyList = immutable.NewSortedSet[*Job](JobPriorityComparer{}) type JobDb struct { jobsById *immutable.Map[string, *Job] - jobsByRunId *immutable.Map[uuid.UUID, string] + jobsByRunId *immutable.Map[string, string] jobsByQueue map[string]immutable.SortedSet[*Job] unvalidatedJobs *immutable.Set[*Job] // Configured priority classes. @@ -41,23 +41,23 @@ type JobDb struct { // Set here so that it can be mocked. clock clock.PassiveClock // Used for generating job run ids. - uuidProvider UUIDProvider + uuidProvider IDProvider // Used to make efficient ResourceList types. 
resourceListFactory *internaltypes.ResourceListFactory // Info about floating resources floatingResourceTypes *floatingresources.FloatingResourceTypes } -// UUIDProvider is an interface used to mock UUID generation for tests. -type UUIDProvider interface { - New() uuid.UUID +// IDProvider is an interface used to mock run id generation for tests. +type IDProvider interface { + New() string } -// RealUUIDProvider calls uuid.New. +// RealUUIDProvider generates an id using a UUID type RealUUIDProvider struct{} -func (_ RealUUIDProvider) New() uuid.UUID { - return uuid.New() +func (_ RealUUIDProvider) New() string { + return uuid.New().String() } func NewJobDb(priorityClasses map[string]types.PriorityClass, @@ -92,7 +92,7 @@ func NewJobDbWithSchedulingKeyGenerator( unvalidatedJobs := immutable.NewSet[*Job](JobIdHasher{}) return &JobDb{ jobsById: immutable.NewMap[string, *Job](nil), - jobsByRunId: immutable.NewMap[uuid.UUID, string](&UUIDHasher{}), + jobsByRunId: immutable.NewMap[string, string](nil), jobsByQueue: map[string]immutable.SortedSet[*Job]{}, unvalidatedJobs: &unvalidatedJobs, priorityClasses: priorityClasses, @@ -110,7 +110,7 @@ func (jobDb *JobDb) SetClock(clock clock.PassiveClock) { jobDb.clock = clock } -func (jobDb *JobDb) SetUUIDProvider(uuidProvider UUIDProvider) { +func (jobDb *JobDb) SetUUIDProvider(uuidProvider IDProvider) { jobDb.uuidProvider = uuidProvider } @@ -168,7 +168,7 @@ func (jobDb *JobDb) NewJob( cancelByJobSetRequested: cancelByJobSetRequested, cancelled: cancelled, validated: validated, - runsById: map[uuid.UUID]*JobRun{}, + runsById: map[string]*JobRun{}, pools: pools, } job.ensureJobSchedulingInfoFieldsInitialised() @@ -254,7 +254,7 @@ type Txn struct { jobsById *immutable.Map[string, *Job] // Map from run ids to jobs. // Note that a job may have multiple runs, i.e., the mapping is many-to-one. - jobsByRunId *immutable.Map[uuid.UUID, string] + jobsByRunId *immutable.Map[string, string] // Queued jobs for each queue. 
Stored in the order in which they should be scheduled. jobsByQueue map[string]immutable.SortedSet[*Job] // Jobs that require submit checking @@ -430,7 +430,7 @@ func (txn *Txn) Upsert(jobs []*Job) error { } } } else { - jobsByRunId := immutable.NewMapBuilder[uuid.UUID, string](&UUIDHasher{}) + jobsByRunId := immutable.NewMapBuilder[string, string](nil) for _, job := range jobs { for _, run := range job.runsById { jobsByRunId.Set(run.id, job.id) @@ -481,7 +481,7 @@ func (txn *Txn) GetById(id string) *Job { // GetByRunId returns the job with the given run id or nil if no such job exists // The Job returned by this function *must not* be subsequently modified -func (txn *Txn) GetByRunId(runId uuid.UUID) *Job { +func (txn *Txn) GetByRunId(runId string) *Job { jobId, _ := txn.jobsByRunId.Get(runId) return txn.GetById(jobId) } diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index d86a245ec36..7025691836d 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -108,7 +108,7 @@ func TestJobDb_TestGetByRunId(t *testing.T) { require.NoError(t, err) assert.Equal(t, job1, txn.GetByRunId(job1.LatestRun().id)) assert.Equal(t, job2, txn.GetByRunId(job2.LatestRun().id)) - assert.Nil(t, txn.GetByRunId(uuid.New())) + assert.Nil(t, txn.GetByRunId(uuid.NewString())) err = txn.BatchDelete([]string{job1.Id()}) require.NoError(t, err) @@ -1267,7 +1267,7 @@ func newJob() *Job { priority: 0, submittedTime: 0, queued: false, - runsById: map[uuid.UUID]*JobRun{}, + runsById: map[string]*JobRun{}, jobSchedulingInfo: jobSchedulingInfo, } } diff --git a/internal/scheduler/jobdb/reconciliation.go b/internal/scheduler/jobdb/reconciliation.go index a75a480102a..d76bc64401d 100644 --- a/internal/scheduler/jobdb/reconciliation.go +++ b/internal/scheduler/jobdb/reconciliation.go @@ -152,7 +152,7 @@ func (jobDb *JobDb) reconcileJobDifferences(job *Job, jobRepoJob *database.Job, // Reconcile run state transitions. 
for _, jobRepoRun := range jobRepoRuns { - rst := jobDb.reconcileRunDifferences(job.RunById(jobRepoRun.RunID), jobRepoRun) + rst := jobDb.reconcileRunDifferences(job.RunById(jobRepoRun.RunID.String()), jobRepoRun) jst = jst.applyRunStateTransitions(rst) job = job.WithUpdatedRun(rst.JobRun) } @@ -287,7 +287,7 @@ func (jobDb *JobDb) schedulerJobFromDatabaseJob(dbJob *database.Job) (*Job, erro func (jobDb *JobDb) schedulerRunFromDatabaseRun(dbRun *database.Run) *JobRun { nodeId := api.NodeIdFromExecutorAndNodeName(dbRun.Executor, dbRun.Node) return jobDb.CreateRun( - dbRun.RunID, + dbRun.RunID.String(), dbRun.JobID, dbRun.Created, dbRun.Executor, diff --git a/internal/scheduler/jobdb/uuid_hasher.go b/internal/scheduler/jobdb/uuid_hasher.go deleted file mode 100644 index 81d9216e4e0..00000000000 --- a/internal/scheduler/jobdb/uuid_hasher.go +++ /dev/null @@ -1,24 +0,0 @@ -package jobdb - -import ( - "bytes" - - "github.com/google/uuid" -) - -// UUIDHasher is an implementation of Hasher for UUID. -type UUIDHasher struct{} - -// Hash computes a hash for a UUID. -func (h UUIDHasher) Hash(key uuid.UUID) uint32 { - var hash uint32 - for _, b := range key { - hash = hash*31 + uint32(b) - } - return hash -} - -// Equal checks if two UUIDs are equal. 
-func (h UUIDHasher) Equal(a, b uuid.UUID) bool { - return bytes.Equal(a[:], b[:]) -} diff --git a/internal/scheduler/jobdb/uuid_hasher_test.go b/internal/scheduler/jobdb/uuid_hasher_test.go deleted file mode 100644 index 6b3e7dcad2d..00000000000 --- a/internal/scheduler/jobdb/uuid_hasher_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package jobdb - -import ( - "testing" - - "github.com/google/uuid" -) - -func TestUUIDHasher_Hash(t *testing.T) { - hasher := UUIDHasher{} - - tests := []struct { - name string - key uuid.UUID - }{ - { - name: "Test with zero UUID", - key: uuid.UUID{}, - }, - { - name: "Test with random UUID", - key: uuid.New(), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := hasher.Hash(tt.key) - - // Assert the hash value is non-negative (a simple check) - if got < 0 { - t.Errorf("Expected non-negative hash, but got %v", got) - } - }) - } -} - -func TestUUIDHasher_Equal(t *testing.T) { - hasher := UUIDHasher{} - - tests := []struct { - name string - a, b uuid.UUID - want bool - }{ - { - name: "Test with two zero UUIDs", - a: uuid.UUID{}, - b: uuid.UUID{}, - want: true, - }, - { - name: "Test with two different UUIDs", - a: uuid.New(), - b: uuid.New(), - want: false, - }, - { - name: "Test with two same UUIDs", - a: uuid.New(), - b: uuid.MustParse("f47ac10b-58cc-4372-a567-0e02b2c3d479"), // Example UUID, replace with any fixed UUID - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := hasher.Equal(tt.a, tt.b) - if got != tt.want { - t.Errorf("Expected %v, but got %v", tt.want, got) - } - }) - } -} diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 67a8c5cd838..305980de463 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -5,7 +5,6 @@ import ( "sync/atomic" "time" - "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/maps" "k8s.io/utils/clock" @@ -304,7 +303,7 @@ func (c 
*MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } for runId, jobRunState := range node.StateByJobRunId { - job := txn.GetByRunId(uuid.MustParse(runId)) + job := txn.GetByRunId(runId) if job != nil { phase := schedulerobjects.JobRunState_name[int32(jobRunState)] key := queuePhaseMetricKey{ diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index ce3894436a7..d4e3fa6e1c2 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -4,7 +4,6 @@ import ( "regexp" "time" - "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" @@ -160,7 +159,7 @@ func (m *jobStateMetrics) ReportJobLeased(job *jobdb.Job) { func (m *jobStateMetrics) ReportStateTransitions( jsts []jobdb.JobStateTransitions, - jobRunErrorsByRunId map[uuid.UUID]*armadaevents.Error, + jobRunErrorsByRunId map[string]*armadaevents.Error, ) { for _, jst := range jsts { job := jst.Job diff --git a/internal/scheduler/metrics/state_metrics_test.go b/internal/scheduler/metrics/state_metrics_test.go index 0203cdcee2f..aed4d510892 100644 --- a/internal/scheduler/metrics/state_metrics_test.go +++ b/internal/scheduler/metrics/state_metrics_test.go @@ -28,7 +28,7 @@ const ( var ( baseTime = time.Now() - baseRun = jobdb.MinimalRun(uuid.New(), baseTime.UnixNano()). + baseRun = jobdb.MinimalRun(uuid.NewString(), baseTime.UnixNano()). WithPool(testPool).WithNodeName(testNode). 
WithExecutor(testCluster) @@ -46,7 +46,7 @@ func TestReportJobStateTransitions(t *testing.T) { errorRegexes []*regexp.Regexp trackedResourceNames []v1.ResourceName jsts []jobdb.JobStateTransitions - jobRunErrorsByRunId map[uuid.UUID]*armadaevents.Error + jobRunErrorsByRunId map[string]*armadaevents.Error expectedJobStateCounterByQueue map[[4]string]float64 expectedJobStateCounterByNode map[[5]string]float64 expectedJobStateSecondsByQueue map[[4]string]float64 diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index 11ae37ea65b..09503a0bf73 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -35,7 +35,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { // Run that has been returned runStartTime := testfixtures.BaseTime.Add(-time.Duration(400) * time.Second).UnixNano() runTerminatedTime := testfixtures.BaseTime.Add(-time.Duration(200) * time.Second) - run := jobdb.MinimalRun(uuid.New(), runStartTime) + run := jobdb.MinimalRun(uuid.New().String(), runStartTime) run = run.WithFailed(true) run = run.WithReturned(true) run = run.WithTerminatedTime(&runTerminatedTime) @@ -190,8 +190,8 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { job1 := testfixtures.TestRunningJobDbJob(0) job2 := testfixtures.TestRunningJobDbJob(0) nodeWithJobs := createNode("type-1") - nodeWithJobs.StateByJobRunId[job1.LatestRun().Id().String()] = schedulerobjects.JobRunState_PENDING - nodeWithJobs.StateByJobRunId[job2.LatestRun().Id().String()] = schedulerobjects.JobRunState_RUNNING + nodeWithJobs.StateByJobRunId[job1.LatestRun().Id()] = schedulerobjects.JobRunState_PENDING + nodeWithJobs.StateByJobRunId[job2.LatestRun().Id()] = schedulerobjects.JobRunState_RUNNING nodeWithJobs.ResourceUsageByQueue[testfixtures.TestQueue] = &schedulerobjects.ResourceList{ Resources: map[string]resource.Quantity{ "cpu": resource.MustParse("1"), diff --git a/internal/scheduler/mocks/job_repository.go 
b/internal/scheduler/mocks/job_repository.go index c5cbb4abb3f..898cbebe7f1 100644 --- a/internal/scheduler/mocks/job_repository.go +++ b/internal/scheduler/mocks/job_repository.go @@ -53,10 +53,10 @@ func (mr *MockJobRepositoryMockRecorder) CountReceivedPartitions(arg0, arg1 inte } // FetchJobRunErrors mocks base method. -func (m *MockJobRepository) FetchJobRunErrors(arg0 *armadacontext.Context, arg1 []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) { +func (m *MockJobRepository) FetchJobRunErrors(arg0 *armadacontext.Context, arg1 []string) (map[string]*armadaevents.Error, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchJobRunErrors", arg0, arg1) - ret0, _ := ret[0].(map[uuid.UUID]*armadaevents.Error) + ret0, _ := ret[0].(map[string]*armadaevents.Error) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -68,7 +68,7 @@ func (mr *MockJobRepositoryMockRecorder) FetchJobRunErrors(arg0, arg1 interface{ } // FetchJobRunLeases mocks base method. -func (m *MockJobRepository) FetchJobRunLeases(arg0 *armadacontext.Context, arg1 string, arg2 uint, arg3 []uuid.UUID) ([]*database.JobRunLease, error) { +func (m *MockJobRepository) FetchJobRunLeases(arg0 *armadacontext.Context, arg1 string, arg2 uint, arg3 []string) ([]*database.JobRunLease, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchJobRunLeases", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*database.JobRunLease) @@ -99,10 +99,10 @@ func (mr *MockJobRepositoryMockRecorder) FetchJobUpdates(arg0, arg1, arg2 interf } // FindInactiveRuns mocks base method. 
-func (m *MockJobRepository) FindInactiveRuns(arg0 *armadacontext.Context, arg1 []uuid.UUID) ([]uuid.UUID, error) { +func (m *MockJobRepository) FindInactiveRuns(arg0 *armadacontext.Context, arg1 []string) ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FindInactiveRuns", arg0, arg1) - ret0, _ := ret[0].([]uuid.UUID) + ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/scheduler/mocks/matchers.go b/internal/scheduler/mocks/matchers.go index cacc6cdb5b2..d5f434dc81a 100644 --- a/internal/scheduler/mocks/matchers.go +++ b/internal/scheduler/mocks/matchers.go @@ -1,23 +1,17 @@ package schedulermocks import ( - "sort" - "strings" + "golang.org/x/exp/slices" ) -type Stringer interface { - String() string +type SliceMatcher struct { + Expected []string } -type SliceMatcher[T Stringer] struct { - Expected []T -} - -// Matches // Matches input against provided expected input // This matching ignores the input ordering, so args don't need to be passed in a known order -func (s SliceMatcher[T]) Matches(x interface{}) bool { - inputs, ok := x.([]T) +func (s SliceMatcher) Matches(x interface{}) bool { + inputs, ok := x.([]string) if !ok { return false } @@ -25,14 +19,10 @@ func (s SliceMatcher[T]) Matches(x interface{}) bool { if len(inputs) != len(expected) { return false } - sort.Slice(inputs, func(i, j int) bool { - return strings.Compare(inputs[i].String(), inputs[j].String()) < 0 - }) - sort.Slice(expected, func(i, j int) bool { - return strings.Compare(expected[i].String(), expected[j].String()) < 0 - }) + slices.Sort(inputs) + slices.Sort(expected) for i, inputValue := range inputs { - if inputValue.String() != expected[i].String() { + if inputValue != expected[i] { return false } } @@ -40,6 +30,6 @@ func (s SliceMatcher[T]) Matches(x interface{}) bool { } // String describes what the matcher matches. 
-func (s SliceMatcher[T]) String() string { - return "checks provided matches expected uuid list" +func (s SliceMatcher) String() string { + return "checks provided matches expected string list ignoring order" } diff --git a/internal/scheduler/poolutils_test.go b/internal/scheduler/poolutils_test.go index f1bbefbf786..1809833847b 100644 --- a/internal/scheduler/poolutils_test.go +++ b/internal/scheduler/poolutils_test.go @@ -3,7 +3,6 @@ package scheduler import ( "testing" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "k8s.io/utils/pointer" @@ -19,7 +18,7 @@ var ( executorWithoutPool = &schedulerobjects.Executor{} runWithPool = testfixtures.JobDb.CreateRun( - uuid.UUID{}, + "", queuedJob.Id(), 123, "test-executor", diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b51004bbbcf..f219d788ca8 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -260,7 +260,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke // (Each pod can produce at most 12KiB of errors; see // https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/#customizing-the-termination-message) // If so, the scheduler may not be able to progress until a human manually deleted those errors. 
- failedRunIds := make([]uuid.UUID, 0, len(updatedJobs)) + failedRunIds := make([]string, 0, len(updatedJobs)) for _, job := range updatedJobs { run := job.LatestRun() if run != nil && run.Failed() { @@ -486,7 +486,7 @@ func AppendEventSequencesFromPreemptedJobs(eventSequences []*armadaevents.EventS eventSequences = append(eventSequences, &armadaevents.EventSequence{ Queue: job.Queue(), JobSetName: job.Jobset(), - Events: createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(run.Id()), time), + Events: createEventsForPreemptedJob(jobId, armadaevents.MustProtoUuidFromUuidString(run.Id()), time), }) } return eventSequences, nil @@ -565,8 +565,8 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS Created: protoutil.ToTimestamp(runCreationTime), Event: &armadaevents.EventSequence_Event_JobRunLeased{ JobRunLeased: &armadaevents.JobRunLeased{ - RunId: armadaevents.ProtoUuidFromUuid(run.Id()), - RunIdStr: run.Id().String(), + RunId: armadaevents.MustProtoUuidFromUuidString(run.Id()), + RunIdStr: run.Id(), JobId: jobId, JobIdStr: job.Id(), ExecutorId: run.Executor(), @@ -591,7 +591,7 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS // generateUpdateMessages generates EventSequences representing the state changes on updated jobs. // If there are no state changes then an empty slice will be returned. -func (s *Scheduler) generateUpdateMessages(ctx *armadacontext.Context, txn *jobdb.Txn, updatedJobs []*jobdb.Job, jobRunErrors map[uuid.UUID]*armadaevents.Error) ([]*armadaevents.EventSequence, error) { +func (s *Scheduler) generateUpdateMessages(ctx *armadacontext.Context, txn *jobdb.Txn, updatedJobs []*jobdb.Job, jobRunErrors map[string]*armadaevents.Error) ([]*armadaevents.EventSequence, error) { // Generate any eventSequences that came out of synchronising the db state. 
var events []*armadaevents.EventSequence for _, job := range updatedJobs { @@ -608,7 +608,7 @@ func (s *Scheduler) generateUpdateMessages(ctx *armadacontext.Context, txn *jobd // generateUpdateMessages generates an EventSequence representing the state changes for a single job. // If there are no state changes it returns nil. -func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, job *jobdb.Job, jobRunErrors map[uuid.UUID]*armadaevents.Error, txn *jobdb.Txn) (*armadaevents.EventSequence, error) { +func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, job *jobdb.Job, jobRunErrors map[string]*armadaevents.Error, txn *jobdb.Txn) (*armadaevents.EventSequence, error) { var events []*armadaevents.EventSequence_Event // Is the job already in a terminal state? If so then don't send any more messages @@ -647,8 +647,8 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo Created: s.now(), Event: &armadaevents.EventSequence_Event_JobRunCancelled{ JobRunCancelled: &armadaevents.JobRunCancelled{ - RunId: armadaevents.ProtoUuidFromUuid(lastRun.Id()), - RunIdStr: lastRun.Id().String(), + RunId: armadaevents.MustProtoUuidFromUuidString(lastRun.Id()), + RunIdStr: lastRun.Id(), JobId: jobId, JobIdStr: job.Id(), }, @@ -681,8 +681,8 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo Created: s.now(), Event: &armadaevents.EventSequence_Event_JobRunCancelled{ JobRunCancelled: &armadaevents.JobRunCancelled{ - RunId: armadaevents.ProtoUuidFromUuid(lastRun.Id()), - RunIdStr: lastRun.Id().String(), + RunId: armadaevents.MustProtoUuidFromUuidString(lastRun.Id()), + RunIdStr: lastRun.Id(), JobId: jobId, JobIdStr: job.Id(), }, @@ -752,7 +752,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo if runError == nil { return nil, errors.Errorf( "no run error found for run %s (job id = %s), this must mean we're out of sync with the database", - 
lastRun.Id().String(), job.Id(), + lastRun.Id(), job.Id(), ) } @@ -795,7 +795,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo } } else if lastRun.PreemptRequested() && job.PriorityClass().Preemptible { job = job.WithQueued(false).WithFailed(true).WithUpdatedRun(lastRun.WithoutTerminal().WithFailed(true)) - events = append(events, createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(lastRun.Id()), s.clock.Now())...) + events = append(events, createEventsForPreemptedJob(jobId, armadaevents.MustProtoUuidFromUuidString(lastRun.Id()), s.clock.Now())...) } } @@ -880,8 +880,8 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb Created: s.now(), Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - RunId: armadaevents.ProtoUuidFromUuid(run.Id()), - RunIdStr: run.Id().String(), + RunId: armadaevents.MustProtoUuidFromUuidString(run.Id()), + RunIdStr: run.Id(), JobId: jobId, JobIdStr: job.Id(), Errors: []*armadaevents.Error{leaseExpiredError}, diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 3b97262d20b..933f1b36997 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -191,7 +191,7 @@ var returnedOnceLeasedJob = testfixtures.NewJob( 1, true, ).WithUpdatedRun(testfixtures.JobDb.CreateRun( - uuid.New(), + uuid.NewString(), "01h3w2wtdchtc80hgyp782shrv", 0, "testExecutor", @@ -276,7 +276,7 @@ var ( 1, true, ).WithUpdatedRun(testfixtures.JobDb.CreateRun( - uuid.New(), + uuid.NewString(), requeuedJobId, time.Now().Unix(), "testExecutor", @@ -305,34 +305,34 @@ var ( // Test a single scheduler cycle func TestScheduler_TestCycle(t *testing.T) { tests := map[string]struct { - initialJobs []*jobdb.Job // jobs in the jobDb at the start of the cycle - jobUpdates []database.Job // job updates from the database - runUpdates []database.Run // run updates from the database - 
jobRunErrors map[uuid.UUID]*armadaevents.Error // job run errors in the database - staleExecutor bool // if true then the executorRepository will report the executor as stale - fetchError bool // if true then the jobRepository will throw an error - scheduleError bool // if true then the scheduling algo will throw an error - publishError bool // if true the publisher will throw an error - submitCheckerFailure bool // if true the submit checker will say the job is unschedulable - expectedJobRunLeased []string // ids of jobs we expect to have produced leased messages - expectedJobRunErrors []string // ids of jobs we expect to have produced jobRunErrors messages - expectedJobErrors []string // ids of jobs we expect to have produced jobErrors messages - expectedJobsRunsToPreempt []string // ids of jobs we expect to be preempted by the scheduler - expectedJobRunPreempted []string // ids of jobs we expect to have produced jobRunPreempted messages - expectedJobRunCancelled []string // ids of jobs we expect to have produced jobRunPreempted messages - expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages - expectedJobRequestCancel []string // ids of jobs we expect to have produced request cancel - expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages - expectedQueued []string // ids of jobs we expect to have produced requeued messages - expectedJobSucceeded []string // ids of jobs we expect to have produced succeeeded messages - expectedLeased []string // ids of jobs we expected to be leased in jobdb at the end of the cycle - expectedRequeued []string // ids of jobs we expected to be requeued in jobdb at the end of the cycle - expectedValidated []string // ids of jobs we expected to have produced submit checked messages - expectedTerminal []string // ids of jobs we expected to be terminal in jobdb at the end of the cycle - expectedJobPriority map[string]uint32 // expected priority of jobs at 
the end of the cycle - expectedNodeAntiAffinities []string // list of nodes there is expected to be anti affinities for on job scheduling info - expectedJobSchedulingInfoVersion int // expected scheduling info version of jobs at the end of the cycle - expectedQueuedVersion int32 // expected queued version of jobs at the end of the cycle + initialJobs []*jobdb.Job // jobs in the jobDb at the start of the cycle + jobUpdates []database.Job // job updates from the database + runUpdates []database.Run // run updates from the database + jobRunErrors map[string]*armadaevents.Error // job run errors in the database + staleExecutor bool // if true then the executorRepository will report the executor as stale + fetchError bool // if true then the jobRepository will throw an error + scheduleError bool // if true then the scheduling algo will throw an error + publishError bool // if true the publisher will throw an error + submitCheckerFailure bool // if true the submit checker will say the job is unschedulable + expectedJobRunLeased []string // ids of jobs we expect to have produced leased messages + expectedJobRunErrors []string // ids of jobs we expect to have produced jobRunErrors messages + expectedJobErrors []string // ids of jobs we expect to have produced jobErrors messages + expectedJobsRunsToPreempt []string // ids of jobs we expect to be preempted by the scheduler + expectedJobRunPreempted []string // ids of jobs we expect to have produced jobRunPreempted messages + expectedJobRunCancelled []string // ids of jobs we expect to have produced jobRunPreempted messages + expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages + expectedJobRequestCancel []string // ids of jobs we expect to have produced request cancel + expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages + expectedQueued []string // ids of jobs we expect to have produced requeued messages + expectedJobSucceeded []string // 
ids of jobs we expect to have produced succeeeded messages + expectedLeased []string // ids of jobs we expected to be leased in jobdb at the end of the cycle + expectedRequeued []string // ids of jobs we expected to be requeued in jobdb at the end of the cycle + expectedValidated []string // ids of jobs we expected to have produced submit checked messages + expectedTerminal []string // ids of jobs we expected to be terminal in jobdb at the end of the cycle + expectedJobPriority map[string]uint32 // expected priority of jobs at the end of the cycle + expectedNodeAntiAffinities []string // list of nodes there is expected to be anti affinities for on job scheduling info + expectedJobSchedulingInfoVersion int // expected scheduling info version of jobs at the end of the cycle + expectedQueuedVersion int32 // expected queued version of jobs at the end of the cycle }{ "Lease a single job already in the db": { initialJobs: []*jobdb.Job{queuedJob}, @@ -457,7 +457,7 @@ func TestScheduler_TestCycle(t *testing.T) { }, runUpdates: []database.Run{ { - RunID: requeuedJob.LatestRun().Id(), + RunID: uuid.MustParse(requeuedJob.LatestRun().Id()), JobID: requeuedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -475,7 +475,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -496,7 +496,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -516,7 +516,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob.WithValidated(true)}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + 
RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -526,7 +526,7 @@ func TestScheduler_TestCycle(t *testing.T) { Serial: 1, }, }, - jobRunErrors: map[uuid.UUID]*armadaevents.Error{ + jobRunErrors: map[string]*armadaevents.Error{ leasedJob.LatestRun().Id(): defaultJobError, }, submitCheckerFailure: true, @@ -538,7 +538,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{returnedOnceLeasedJob}, runUpdates: []database.Run{ { - RunID: returnedOnceLeasedJob.LatestRun().Id(), + RunID: uuid.MustParse(returnedOnceLeasedJob.LatestRun().Id()), JobID: returnedOnceLeasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -549,7 +549,7 @@ func TestScheduler_TestCycle(t *testing.T) { Serial: 2, }, }, - jobRunErrors: map[uuid.UUID]*armadaevents.Error{ + jobRunErrors: map[string]*armadaevents.Error{ returnedOnceLeasedJob.LatestRun().Id(): defaultJobError, }, expectedJobErrors: []string{returnedOnceLeasedJob.Id()}, @@ -561,7 +561,7 @@ func TestScheduler_TestCycle(t *testing.T) { // Fail fast should mean there is only ever 1 attempted run runUpdates: []database.Run{ { - RunID: leasedFailFastJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedFailFastJob.LatestRun().Id()), JobID: leasedFailFastJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -571,7 +571,7 @@ func TestScheduler_TestCycle(t *testing.T) { Serial: 1, }, }, - jobRunErrors: map[uuid.UUID]*armadaevents.Error{ + jobRunErrors: map[string]*armadaevents.Error{ leasedFailFastJob.LatestRun().Id(): defaultJobError, }, expectedJobErrors: []string{leasedFailFastJob.Id()}, @@ -598,7 +598,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{preemptibleLeasedJob}, runUpdates: []database.Run{ { - RunID: preemptibleLeasedJob.LatestRun().Id(), + RunID: uuid.MustParse(preemptibleLeasedJob.LatestRun().Id()), JobID: preemptibleLeasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -616,7 
+616,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -740,7 +740,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -748,7 +748,7 @@ func TestScheduler_TestCycle(t *testing.T) { Serial: 1, }, }, - jobRunErrors: map[uuid.UUID]*armadaevents.Error{ + jobRunErrors: map[string]*armadaevents.Error{ leasedJob.LatestRun().Id(): defaultJobError, }, expectedJobErrors: []string{leasedJob.Id()}, @@ -759,7 +759,7 @@ func TestScheduler_TestCycle(t *testing.T) { initialJobs: []*jobdb.Job{leasedJob}, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.Id(), JobSet: "testJobSet", Executor: "testExecutor", @@ -1106,7 +1106,7 @@ func TestScheduler_TestSyncState(t *testing.T) { }, runUpdates: []database.Run{ { - RunID: uuid.UUID{}, + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: queuedJob.Id(), JobSet: queuedJob.Jobset(), Executor: "test-executor", @@ -1121,7 +1121,7 @@ func TestScheduler_TestSyncState(t *testing.T) { expectedUpdatedJobs: []*jobdb.Job{ queuedJob.WithUpdatedRun( testfixtures.JobDb.CreateRun( - uuid.UUID{}, + leasedJob.LatestRun().Id(), queuedJob.Id(), 123, "test-executor", @@ -1164,7 +1164,7 @@ func TestScheduler_TestSyncState(t *testing.T) { }, runUpdates: []database.Run{ { - RunID: leasedJob.LatestRun().Id(), + RunID: uuid.MustParse(leasedJob.LatestRun().Id()), JobID: leasedJob.LatestRun().JobId(), JobSet: leasedJob.Jobset(), Succeeded: true, @@ -1279,17 +1279,17 @@ func (t *testSubmitChecker) Check(_ 
*armadacontext.Context, jobs []*jobdb.Job) ( type testJobRepository struct { updatedJobs []database.Job updatedRuns []database.Run - errors map[uuid.UUID]*armadaevents.Error + errors map[string]*armadaevents.Error shouldError bool numReceivedPartitions uint32 } -func (t *testJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []uuid.UUID) ([]uuid.UUID, error) { +func (t *testJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []string) ([]string, error) { // TODO implement me panic("implement me") } -func (t *testJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*database.JobRunLease, error) { +func (t *testJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, executor string, maxResults uint, excludedRunIds []string) ([]*database.JobRunLease, error) { // TODO implement me panic("implement me") } @@ -1301,7 +1301,7 @@ func (t *testJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobSeria return t.updatedJobs, t.updatedRuns, nil } -func (t *testJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) { +func (t *testJobRepository) FetchJobRunErrors(ctx *armadacontext.Context, runIds []string) (map[string]*armadaevents.Error, error) { if t.shouldError { return nil, errors.New("error fetching job run errors") } diff --git a/internal/scheduler/schedulerobjects/executor.go b/internal/scheduler/schedulerobjects/executor.go index 30cf94bbfc4..d13c25bbe0a 100644 --- a/internal/scheduler/schedulerobjects/executor.go +++ b/internal/scheduler/schedulerobjects/executor.go @@ -1,28 +1,15 @@ package schedulerobjects -import ( - "github.com/google/uuid" - "github.com/pkg/errors" -) - -func (m *Executor) AllRuns() ([]uuid.UUID, error) { - runIds := make([]uuid.UUID, 0) +func (m *Executor) AllRuns() ([]string, error) { + runIds := make([]string, 0) // add all runids from nodes for _, node := 
range m.Nodes { - for runIdStr := range node.StateByJobRunId { - runId, err := uuid.Parse(runIdStr) - if err != nil { - return nil, errors.WithStack(err) - } + for runId := range node.StateByJobRunId { runIds = append(runIds, runId) } } // add all unassigned runids - for _, runIdStr := range m.UnassignedJobRuns { - runId, err := uuid.Parse(runIdStr) - if err != nil { - return nil, errors.WithStack(err) - } + for _, runId := range m.UnassignedJobRuns { runIds = append(runIds, runId) } return runIds, nil diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 420c5fe2843..f670c8c8208 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -7,7 +7,6 @@ import ( "time" "github.com/benbjohnson/immutable" - "github.com/google/uuid" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/exp/maps" @@ -642,7 +641,7 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors( Errorf("failed to retrieve runs for executor %s; will not be considered for scheduling", executor.Id) continue } - executorRunIds := make(map[uuid.UUID]bool, len(executorRuns)) + executorRunIds := make(map[string]bool, len(executorRuns)) for _, run := range executorRuns { executorRunIds[run] = true } diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index a6424ee3a24..97edbe26be5 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -487,7 +487,7 @@ func TestSchedule(t *testing.T) { job = job.WithQueued(false).WithNewRun(executor.Id, node.Id, node.Name, node.Pool, job.PriorityClass().Priority) if existingJobs.acknowledged { run := job.LatestRun() - node.StateByJobRunId[run.Id().String()] = schedulerobjects.JobRunState_RUNNING + node.StateByJobRunId[run.Id()] = schedulerobjects.JobRunState_RUNNING } jobsToUpsert = append(jobsToUpsert, job) executorIndexByJobId[job.Id()] = executorIndex diff --git 
a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 250bcd9c802..967286962f0 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -125,7 +125,7 @@ func NewJobDb(resourceListFactory *internaltypes.ResourceListFactory) *jobdb.Job ) // Mock out the clock and uuid provider to ensure consistent ids and timestamps are generated. jobDb.SetClock(NewMockPassiveClock()) - jobDb.SetUUIDProvider(NewMockUUIDProvider()) + jobDb.SetUUIDProvider(NewMockIDProvider()) return jobDb } @@ -893,7 +893,7 @@ func TestQueuedJobDbJob() *jobdb.Job { func TestRunningJobDbJob(startTime int64) *jobdb.Job { return TestQueuedJobDbJob(). WithQueued(false). - WithUpdatedRun(jobdb.MinimalRun(uuid.New(), startTime)) + WithUpdatedRun(jobdb.MinimalRun(uuid.New().String(), startTime)) } func Test1CoreSubmitMsg() *armadaevents.SubmitJob { @@ -963,20 +963,20 @@ func TestExecutor(lastUpdateTime time.Time) *schedulerobjects.Executor { } } -type MockUUIDProvider struct { +type MockIDProvider struct { i uint64 mu sync.Mutex } -func NewMockUUIDProvider() *MockUUIDProvider { - return &MockUUIDProvider{} +func NewMockIDProvider() *MockIDProvider { + return &MockIDProvider{} } -func (p *MockUUIDProvider) New() uuid.UUID { +func (p *MockIDProvider) New() string { p.mu.Lock() defer p.mu.Unlock() p.i += 1 // Increment before write to avoid using the all-zeros UUID. - return UUIDFromInt(p.i) + return UUIDFromInt(p.i).String() } func UUIDFromInt(i uint64) uuid.UUID { diff --git a/pkg/armadaevents/uuid.go b/pkg/armadaevents/uuid.go index 43c4c9a3ce6..66af258ad3e 100644 --- a/pkg/armadaevents/uuid.go +++ b/pkg/armadaevents/uuid.go @@ -102,6 +102,16 @@ func ProtoUuidFromUuidString(uuidString string) (*Uuid, error) { return ProtoUuidFromUuid(id), nil } +// MustProtoUuidFromUuidString parses a UUID string into a proto UUID and returns it. 
+func MustProtoUuidFromUuidString(uuidString string) *Uuid { + id, err := uuid.Parse(uuidString) + if err != nil { + err = errors.WithStack(err) + panic(err) + } + return ProtoUuidFromUuid(id) +} + func MustUlidStringFromProtoUuid(id *Uuid) string { return strings.ToLower(UlidFromProtoUuid(id).String()) }