Skip to content

Commit

Permalink
Scheduler database to use string run ids (#3946)
Browse files Browse the repository at this point in the history
* scheduler database to use string run ids

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix some tests

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix some tests

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* lint

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* JobIdFromEvent to return a string

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 authored Sep 12, 2024
1 parent 85d2ce3 commit eec6b6b
Show file tree
Hide file tree
Showing 29 changed files with 896 additions and 731 deletions.
7 changes: 3 additions & 4 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ func UnmarshalEventSequence(ctx *armadacontext.Context, payload []byte) (*armada
func ShortSequenceString(sequence *armadaevents.EventSequence) string {
s := ""
for _, event := range sequence.Events {
jobId, _ := armadaevents.JobIdFromEvent(event)
jobIdString, err := armadaevents.UlidStringFromProtoUuid(jobId)
jobId, err := armadaevents.JobIdFromEvent(event)
if err != nil {
jobIdString = ""
jobId = ""
}
s += fmt.Sprintf("[%T (job %s)] ", event.Event, jobIdString)
s += fmt.Sprintf("[%T (job %s)] ", event.Event, jobId)
}
return s
}
Expand Down
34 changes: 0 additions & 34 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ const (
)

var (
JobIdProto, _ = armadaevents.ProtoUuidFromUlidString(JobIdString)
RunIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(RunIdString))
PartitionMarkerGroupIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(PartitionMarkerGroupIdString))
JobIdUuid = armadaevents.UuidFromProtoUuid(JobIdProto)
RunIdUuid = armadaevents.UuidFromProtoUuid(RunIdProto)
PartitionMarkerGroupIdUuid = armadaevents.UuidFromProtoUuid(PartitionMarkerGroupIdProto)
PriorityClassName = "test-priority"
Groups = []string{"group1", "group2"}
Expand Down Expand Up @@ -84,7 +80,6 @@ var Submit = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_SubmitJob{
SubmitJob: &armadaevents.SubmitJob{
JobId: JobIdProto,
JobIdStr: JobIdString,
Priority: Priority,
AtMostOnce: true,
Expand Down Expand Up @@ -137,9 +132,7 @@ var Assigned = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunAssigned{
JobRunAssigned: &armadaevents.JobRunAssigned{
RunId: RunIdProto,
RunIdStr: RunIdString,
JobId: JobIdProto,
JobIdStr: JobIdString,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
Expand All @@ -164,9 +157,7 @@ var Leased = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunLeased{
JobRunLeased: &armadaevents.JobRunLeased{
RunId: RunIdProto,
RunIdStr: RunIdString,
JobId: JobIdProto,
JobIdStr: JobIdString,
ExecutorId: ExecutorId,
NodeId: NodeName,
Expand All @@ -191,9 +182,7 @@ var Running = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunRunning{
JobRunRunning: &armadaevents.JobRunRunning{
RunId: RunIdProto,
RunIdStr: RunIdString,
JobId: JobIdProto,
JobIdStr: JobIdString,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
Expand All @@ -213,9 +202,7 @@ var JobRunSucceeded = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: RunIdProto,
RunIdStr: RunIdString,
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand All @@ -225,9 +212,7 @@ var JobRunCancelled = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunCancelled{
JobRunCancelled: &armadaevents.JobRunCancelled{
RunId: RunIdProto,
RunIdStr: RunIdString,
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand All @@ -237,9 +222,7 @@ var LeaseReturned = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
RunId: RunIdProto,
RunIdStr: RunIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -260,7 +243,6 @@ var JobCancelRequested = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_CancelJob{
CancelJob: &armadaevents.CancelJob{
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand All @@ -277,7 +259,6 @@ var JobCancelled = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand All @@ -287,7 +268,6 @@ var JobValidated = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobValidated{
JobValidated: &armadaevents.JobValidated{
JobId: JobIdProto,
JobIdStr: JobIdString,
Pools: []string{"cpu"},
},
Expand All @@ -298,7 +278,6 @@ var JobRequeued = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRequeued{
JobRequeued: &armadaevents.JobRequeued{
JobId: JobIdProto,
JobIdStr: JobIdString,
SchedulingInfo: &schedulerobjects.JobSchedulingInfo{
Lifetime: 0,
Expand Down Expand Up @@ -348,7 +327,6 @@ var JobReprioritiseRequested = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_ReprioritiseJob{
ReprioritiseJob: &armadaevents.ReprioritiseJob{
JobId: JobIdProto,
JobIdStr: JobIdString,
Priority: NewPriority,
},
Expand All @@ -368,7 +346,6 @@ var JobReprioritised = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_ReprioritisedJob{
ReprioritisedJob: &armadaevents.ReprioritisedJob{
JobId: JobIdProto,
JobIdStr: JobIdString,
Priority: NewPriority,
},
Expand All @@ -379,7 +356,6 @@ var JobPreemptionRequested = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobPreemptionRequested{
JobPreemptionRequested: &armadaevents.JobPreemptionRequested{
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand All @@ -389,9 +365,7 @@ var JobRunPreempted = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunPreempted{
JobRunPreempted: &armadaevents.JobRunPreempted{
PreemptedJobId: JobIdProto,
PreemptedJobIdStr: JobIdString,
PreemptedRunId: RunIdProto,
PreemptedRunIdStr: RunIdString,
},
},
Expand All @@ -401,9 +375,7 @@ var JobRunFailed = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
RunId: RunIdProto,
RunIdStr: RunIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -428,9 +400,7 @@ var JobRunUnschedulable = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
RunId: RunIdProto,
RunIdStr: RunIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -454,7 +424,6 @@ var JobPreempted = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobErrors{
JobErrors: &armadaevents.JobErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -472,7 +441,6 @@ var JobRejected = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobErrors{
JobErrors: &armadaevents.JobErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -492,7 +460,6 @@ var JobFailed = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobErrors{
JobErrors: &armadaevents.JobErrors{
JobId: JobIdProto,
JobIdStr: JobIdString,
Errors: []*armadaevents.Error{
{
Expand All @@ -516,7 +483,6 @@ var JobSucceeded = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobSucceeded{
JobSucceeded: &armadaevents.JobSucceeded{
JobId: JobIdProto,
JobIdStr: JobIdString,
},
},
Expand Down
7 changes: 6 additions & 1 deletion internal/executor/service/lease_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"io"

armadaslices "github.com/armadaproject/armada/internal/common/slices"

grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,7 +62,10 @@ func (requester *JobLeaseRequester) LeaseJobRuns(ctx *armadacontext.Context, req
Resources: request.AvailableResource.ToProtoMap(),
Nodes: request.Nodes,
UnassignedJobRunIds: request.UnassignedJobRunIds,
MaxJobsToLease: request.MaxJobsToLease,
UnassignedJobRunIdsStr: armadaslices.Map(request.UnassignedJobRunIds, func(uuid *armadaevents.Uuid) string {
return armadaevents.MustUuidStringFromProtoUuid(uuid)
}),
MaxJobsToLease: request.MaxJobsToLease,
}
if err := stream.Send(leaseRequest); err != nil {
return nil, errors.WithStack(err)
Expand Down
6 changes: 5 additions & 1 deletion internal/executor/service/lease_requester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/mocks"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/executor/context/fake"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
Expand Down Expand Up @@ -108,7 +109,10 @@ func TestLeaseJobRuns_Send(t *testing.T) {
Resources: leaseRequest.AvailableResource.ToProtoMap(),
Nodes: leaseRequest.Nodes,
UnassignedJobRunIds: leaseRequest.UnassignedJobRunIds,
MaxJobsToLease: leaseRequest.MaxJobsToLease,
UnassignedJobRunIdsStr: armadaslices.Map(leaseRequest.UnassignedJobRunIds, func(uuid *armadaevents.Uuid) string {
return armadaevents.MustUuidStringFromProtoUuid(uuid)
}),
MaxJobsToLease: leaseRequest.MaxJobsToLease,
}

jobRequester, mockExecutorApiClient, mockStream := setup(t)
Expand Down
18 changes: 3 additions & 15 deletions internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,12 @@ func (c *InstructionConverter) handleReprioritiseJob(ts time.Time, event *armada
}

func (c *InstructionConverter) handleCancelledJob(ts time.Time, event *armadaevents.CancelledJob, update *model.InstructionSet) error {
jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId())
if err != nil {
c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing)
return err
}

var reason *string
if event.Reason != "" {
reason = &event.Reason
}
jobUpdate := model.UpdateJobInstruction{
JobId: jobId,
JobId: event.GetJobIdStr(),
State: pointer.Int32(int32(lookout.JobCancelledOrdinal)),
Cancelled: &ts,
CancelReason: reason,
Expand Down Expand Up @@ -283,27 +277,21 @@ func (c *InstructionConverter) handleJobErrors(ts time.Time, event *armadaevents
}

func (c *InstructionConverter) handleJobRunRunning(ts time.Time, event *armadaevents.JobRunRunning, update *model.InstructionSet) error {
runId, err := armadaevents.UuidStringFromProtoUuid(event.GetRunId())
if err != nil {
c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing)
return err
}

// Update Job
job := model.UpdateJobInstruction{
JobId: event.JobIdStr,
State: pointer.Int32(int32(lookout.JobRunningOrdinal)),
LastTransitionTime: &ts,
LastTransitionTimeSeconds: pointer.Int64(ts.Unix()),
LatestRunId: &runId,
LatestRunId: &event.RunIdStr,
}

update.JobsToUpdate = append(update.JobsToUpdate, &job)

// Update Job Run
node := getNode(event.ResourceInfos)
jobRun := model.UpdateJobRunInstruction{
RunId: runId,
RunId: event.RunIdStr,
Node: node,
Started: &ts,
JobRunState: pointer.Int32(lookout.JobRunRunningOrdinal),
Expand Down
28 changes: 2 additions & 26 deletions internal/lookoutingesterv2/instructions/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,38 +404,14 @@ func TestConvert(t *testing.T) {
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
},
},
"invalid event without job id or run id": {
events: &ingest.EventSequencesWithIds{
EventSequences: []*armadaevents.EventSequence{
testfixtures.NewEventSequence(&armadaevents.EventSequence_Event{
Created: testfixtures.BaseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunRunning{
JobRunRunning: &armadaevents.JobRunRunning{},
},
}),
testfixtures.NewEventSequence(submit),
},
MessageIds: []pulsar.MessageID{
pulsarutils.NewMessageId(1),
pulsarutils.NewMessageId(2),
},
},
expected: &model.InstructionSet{
JobsToCreate: []*model.CreateJobInstruction{expectedSubmit},
MessageIds: []pulsar.MessageID{
pulsarutils.NewMessageId(1),
pulsarutils.NewMessageId(2),
},
},
},
"invalid event without created time": {
events: &ingest.EventSequencesWithIds{
EventSequences: []*armadaevents.EventSequence{
testfixtures.NewEventSequence(&armadaevents.EventSequence_Event{
Event: &armadaevents.EventSequence_Event_JobRunRunning{
JobRunRunning: &armadaevents.JobRunRunning{
RunId: testfixtures.RunIdProto,
JobId: testfixtures.JobIdProto,
RunIdStr: testfixtures.RunIdString,
JobIdStr: testfixtures.JobIdString,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
Info: &armadaevents.KubernetesResourceInfo_PodInfo{
Expand Down
13 changes: 7 additions & 6 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
err := stream.Send(&executorapi.LeaseStreamMessage{
Event: &executorapi.LeaseStreamMessage_Lease{
Lease: &executorapi.JobRunLease{
JobRunId: armadaevents.ProtoUuidFromUuid(lease.RunID),
Queue: lease.Queue,
Jobset: lease.JobSet,
User: lease.UserID,
Groups: groups,
Job: submitMsg,
JobRunId: armadaevents.MustProtoUuidFromUuidString(lease.RunID),
JobRunIdStr: lease.RunID,
Queue: lease.Queue,
Jobset: lease.JobSet,
User: lease.UserID,
Groups: groups,
Job: submitMsg,
},
},
})
Expand Down
Loading

0 comments on commit eec6b6b

Please sign in to comment.