Skip to content

Commit

Permalink
Clean Up Api Proto (#3593)
Browse files Browse the repository at this point in the history
* delete duplicate job msgs

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove lots more events!

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove lots more events!

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove lots more events!

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix test

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix python

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix swagger

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* merge master

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix swagger

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix c# client

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 and d80tb7 authored Jun 4, 2024
1 parent 1a4365b commit 1882df1
Show file tree
Hide file tree
Showing 23 changed files with 931 additions and 3,171 deletions.
6 changes: 0 additions & 6 deletions client/python/tests/unit/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def __init__(self, name):
[
("submitted", EventType.submitted),
("queued", EventType.queued),
("duplicate_found", EventType.duplicate_found),
("leased", EventType.leased),
("lease_returned", EventType.lease_returned),
("lease_expired", EventType.lease_expired),
Expand All @@ -48,8 +47,6 @@ def __init__(self, name):
("utilisation", EventType.utilisation),
("ingress_info", EventType.ingress_info),
("reprioritizing", EventType.reprioritizing),
("updated", EventType.updated),
("failedCompressed", EventType.failedCompressed),
],
)
def test_event_class(name, event_type):
Expand All @@ -68,7 +65,6 @@ def test_event_class(name, event_type):
[
"submitted",
"queued",
"duplicate_found",
"leased",
"lease_returned",
"lease_expired",
Expand All @@ -84,8 +80,6 @@ def test_event_class(name, event_type):
"utilisation",
"ingress_info",
"reprioritizing",
"updated",
"failedCompressed",
],
)
def test_unmarshal_event_response(name):
Expand Down
26 changes: 0 additions & 26 deletions internal/armada/event/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ func FromEventSequence(es *armadaevents.EventSequence) ([]*api.EventMessage, err
convertedEvents, err = FromInternalReprioritiseJob(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.ReprioritiseJob)
case *armadaevents.EventSequence_Event_ReprioritisedJob:
convertedEvents, err = FromInternalReprioritisedJob(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.ReprioritisedJob)
case *armadaevents.EventSequence_Event_JobDuplicateDetected:
convertedEvents, err = FromInternalLogDuplicateDetected(es.Queue, es.JobSetName, *event.Created, esEvent.JobDuplicateDetected)
case *armadaevents.EventSequence_Event_JobRunLeased:
convertedEvents, err = FromInternalLogJobRunLeased(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunLeased)
case *armadaevents.EventSequence_Event_JobRunErrors:
Expand Down Expand Up @@ -219,30 +217,6 @@ func FromInternalReprioritisedJob(userId string, queueName string, jobSetName st
}, nil
}

func FromInternalLogDuplicateDetected(queueName string, jobSetName string, time time.Time, e *armadaevents.JobDuplicateDetected) ([]*api.EventMessage, error) {
jobId, err := armadaevents.UlidStringFromProtoUuid(e.NewJobId)
if err != nil {
return nil, err
}
originalJobId, err := armadaevents.UlidStringFromProtoUuid(e.OldJobId)
if err != nil {
return nil, err
}
return []*api.EventMessage{
{
Events: &api.EventMessage_DuplicateFound{
DuplicateFound: &api.JobDuplicateFoundEvent{
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
OriginalJobId: originalJobId,
},
},
},
}, nil
}

func FromInternalLogJobRunLeased(queueName string, jobSetName string, time time.Time, e *armadaevents.JobRunLeased) ([]*api.EventMessage, error) {
jobId, err := armadaevents.UlidStringFromProtoUuid(e.JobId)
if err != nil {
Expand Down
33 changes: 0 additions & 33 deletions internal/armada/event/conversion/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,39 +239,6 @@ func TestConvertReprioritised(t *testing.T) {
assert.Equal(t, expected, apiEvents)
}

func TestDuplicateJob(t *testing.T) {
oldJobString := "02f3j0g1md4qx7z5qb148qnh4r"
oldJobProto, _ := armadaevents.ProtoUuidFromUlidString(oldJobString)

duplicate := &armadaevents.EventSequence_Event{
Created: &baseTime,
Event: &armadaevents.EventSequence_Event_JobDuplicateDetected{
JobDuplicateDetected: &armadaevents.JobDuplicateDetected{
NewJobId: jobIdProto,
OldJobId: oldJobProto,
},
},
}

expected := []*api.EventMessage{
{
Events: &api.EventMessage_DuplicateFound{
DuplicateFound: &api.JobDuplicateFoundEvent{
JobId: jobIdString,
JobSetId: jobSetName,
Queue: queue,
Created: baseTime,
OriginalJobId: oldJobString,
},
},
},
}

apiEvents, err := FromEventSequence(toEventSeq(duplicate))
assert.NoError(t, err)
assert.Equal(t, expected, apiEvents)
}

func TestConvertLeased(t *testing.T) {
leased := &armadaevents.EventSequence_Event{
Created: &baseTime,
Expand Down
2 changes: 0 additions & 2 deletions internal/armada/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ func (s *EventServer) Watch(req *api.WatchRequest, stream api.Event_WatchServer)
FromMessageId: req.FromId,
Queue: req.Queue,
ErrorIfMissing: true,
ForceLegacy: req.ForceLegacy,
ForceNew: req.ForceNew,
}
return s.GetJobSetEvents(request, stream)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestEventServer_ForceNew(t *testing.T) {
})

require.NoError(t, err)
e := s.GetJobSetEvents(&api.JobSetRequest{Queue: q.Name, Id: jobSetId, Watch: false, ForceNew: true}, stream)
e := s.GetJobSetEvents(&api.JobSetRequest{Queue: q.Name, Id: jobSetId, Watch: false}, stream)
assert.NoError(t, e)
assert.Equal(t, 1, len(stream.sendMessages))
expected := &api.EventMessage_Pending{Pending: &api.JobPendingEvent{
Expand Down
4 changes: 1 addition & 3 deletions internal/jobservice/eventstojobs/event_job_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ func EventsToJobResponse(message api.EventMessage) *js.JobServiceResponse {
switch message.Events.(type) {
case *api.EventMessage_Submitted:
return &js.JobServiceResponse{State: js.JobServiceResponse_SUBMITTED}
case *api.EventMessage_DuplicateFound:
return &js.JobServiceResponse{State: js.JobServiceResponse_DUPLICATE_FOUND}
case *api.EventMessage_Running:
return &js.JobServiceResponse{State: js.JobServiceResponse_RUNNING}
case *api.EventMessage_Failed:
Expand All @@ -29,7 +27,7 @@ func EventsToJobResponse(message api.EventMessage) *js.JobServiceResponse {
// Check if api.EventMessage is terminal event
func IsEventTerminal(message api.EventMessage) bool {
switch message.Events.(type) {
case *api.EventMessage_DuplicateFound, *api.EventMessage_Cancelled, *api.EventMessage_Succeeded, *api.EventMessage_Failed:
case *api.EventMessage_Cancelled, *api.EventMessage_Succeeded, *api.EventMessage_Failed:
return true
default:
return false
Expand Down
20 changes: 2 additions & 18 deletions internal/jobservice/eventstojobs/event_job_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ func TestIsEventResponse(t *testing.T) {
eventMessage: api.EventMessage{Events: &api.EventMessage_Submitted{}},
jobResponse: &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_SUBMITTED},
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_DuplicateFound{}},
jobResponse: &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_DUPLICATE_FOUND},
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_Running{}},
jobResponse: &jobservice.JobServiceResponse{State: jobservice.JobServiceResponse_RUNNING},
Expand Down Expand Up @@ -61,10 +57,6 @@ func TestIsEventResponse(t *testing.T) {
eventMessage: api.EventMessage{Events: &api.EventMessage_IngressInfo{}},
jobResponse: nil,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_Updated{}},
jobResponse: nil,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_LeaseExpired{}},
jobResponse: nil,
Expand Down Expand Up @@ -99,7 +91,7 @@ func TestIsEventResponse(t *testing.T) {
},
}
length := len(eventMessages)
assert.Equal(t, length, 19)
assert.Equal(t, length, 17)
for i := range eventMessages {
jobResponse := EventsToJobResponse(eventMessages[i].eventMessage)
assert.Equal(t, jobResponse, eventMessages[i].jobResponse)
Expand All @@ -112,10 +104,6 @@ func TestIsTerminalEvent(t *testing.T) {
eventMessage: api.EventMessage{Events: &api.EventMessage_Submitted{}},
jobServiceEvent: false,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_DuplicateFound{}},
jobServiceEvent: true,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_Running{}},
jobServiceEvent: false,
Expand Down Expand Up @@ -148,10 +136,6 @@ func TestIsTerminalEvent(t *testing.T) {
eventMessage: api.EventMessage{Events: &api.EventMessage_IngressInfo{}},
jobServiceEvent: false,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_Updated{}},
jobServiceEvent: false,
},
{
eventMessage: api.EventMessage{Events: &api.EventMessage_LeaseExpired{}},
jobServiceEvent: false,
Expand Down Expand Up @@ -186,7 +170,7 @@ func TestIsTerminalEvent(t *testing.T) {
},
}
length := len(eventMessages)
assert.Equal(t, length, 19)
assert.Equal(t, length, 17)
for i := range eventMessages {
jobResponse := IsEventTerminal(eventMessages[i].eventMessage)
assert.Equal(t, jobResponse, eventMessages[i].jobServiceEvent)
Expand Down
17 changes: 0 additions & 17 deletions internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ func (c *InstructionConverter) convertSequence(
err = c.handleJobRunSucceeded(ts, event.GetJobRunSucceeded(), update)
case *armadaevents.EventSequence_Event_JobRunErrors:
err = c.handleJobRunErrors(ts, event.GetJobRunErrors(), update)
case *armadaevents.EventSequence_Event_JobDuplicateDetected:
err = c.handleJobDuplicateDetected(ts, event.GetJobDuplicateDetected(), update)
case *armadaevents.EventSequence_Event_JobRunPreempted:
err = c.handleJobRunPreempted(ts, event.GetJobRunPreempted(), update)
case *armadaevents.EventSequence_Event_JobRequeued:
Expand Down Expand Up @@ -235,21 +233,6 @@ func (c *InstructionConverter) handleReprioritiseJob(ts time.Time, event *armada
return nil
}

func (c *InstructionConverter) handleJobDuplicateDetected(ts time.Time, event *armadaevents.JobDuplicateDetected, update *model.InstructionSet) error {
jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetNewJobId())
if err != nil {
c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing)
return err
}

jobUpdate := model.UpdateJobInstruction{
JobId: jobId,
Duplicate: pointer.Bool(true),
}
update.JobsToUpdate = append(update.JobsToUpdate, &jobUpdate)
return nil
}

func (c *InstructionConverter) handleCancelledJob(ts time.Time, event *armadaevents.CancelledJob, update *model.InstructionSet) error {
jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId())
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev
case *armadaevents.EventSequence_Event_JobValidated:
operationsFromEvent, err = c.handleJobValidated(event.GetJobValidated())
case *armadaevents.EventSequence_Event_ReprioritisedJob,
*armadaevents.EventSequence_Event_JobDuplicateDetected,
*armadaevents.EventSequence_Event_ResourceUtilisation,
*armadaevents.EventSequence_Event_StandaloneIngressInfo:
// These events can all be safely ignored
Expand Down
2 changes: 0 additions & 2 deletions internal/testsuite/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ func isTerminalEvent(msg *api.EventMessage) bool {
return true
case *api.EventMessage_Cancelled:
return true
case *api.EventMessage_DuplicateFound:
return true
}
return false
}
Expand Down
70 changes: 0 additions & 70 deletions pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,15 +662,9 @@ func SwaggerJsonTemplate() string {
" \"cancelling\": {\n" +
" \"$ref\": \"#/definitions/apiJobCancellingEvent\"\n" +
" },\n" +
" \"duplicateFound\": {\n" +
" \"$ref\": \"#/definitions/apiJobDuplicateFoundEvent\"\n" +
" },\n" +
" \"failed\": {\n" +
" \"$ref\": \"#/definitions/apiJobFailedEvent\"\n" +
" },\n" +
" \"failedCompressed\": {\n" +
" \"$ref\": \"#/definitions/apiJobFailedEventCompressed\"\n" +
" },\n" +
" \"ingressInfo\": {\n" +
" \"$ref\": \"#/definitions/apiJobIngressInfoEvent\"\n" +
" },\n" +
Expand Down Expand Up @@ -716,9 +710,6 @@ func SwaggerJsonTemplate() string {
" \"unableToSchedule\": {\n" +
" \"$ref\": \"#/definitions/apiJobUnableToScheduleEvent\"\n" +
" },\n" +
" \"updated\": {\n" +
" \"$ref\": \"#/definitions/apiJobUpdatedEvent\"\n" +
" },\n" +
" \"utilisation\": {\n" +
" \"$ref\": \"#/definitions/apiJobUtilisationEvent\"\n" +
" }\n" +
Expand Down Expand Up @@ -1028,27 +1019,6 @@ func SwaggerJsonTemplate() string {
" }\n" +
" }\n" +
" },\n" +
" \"apiJobDuplicateFoundEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"created\": {\n" +
" \"type\": \"string\",\n" +
" \"format\": \"date-time\"\n" +
" },\n" +
" \"jobId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"jobSetId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"originalJobId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"queue\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"apiJobFailedEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
Expand Down Expand Up @@ -1105,16 +1075,6 @@ func SwaggerJsonTemplate() string {
" }\n" +
" }\n" +
" },\n" +
" \"apiJobFailedEventCompressed\": {\n" +
" \"type\": \"object\",\n" +
" \"title\": \"Only used internally by Armada\",\n" +
" \"properties\": {\n" +
" \"event\": {\n" +
" \"type\": \"string\",\n" +
" \"format\": \"byte\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"apiJobIngressInfoEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
Expand Down Expand Up @@ -1584,12 +1544,6 @@ func SwaggerJsonTemplate() string {
" \"errorIfMissing\": {\n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" \"forceLegacy\": {\n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" \"forceNew\": {\n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" \"fromMessageId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
Expand Down Expand Up @@ -1882,30 +1836,6 @@ func SwaggerJsonTemplate() string {
" }\n" +
" }\n" +
" },\n" +
" \"apiJobUpdatedEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"created\": {\n" +
" \"type\": \"string\",\n" +
" \"format\": \"date-time\"\n" +
" },\n" +
" \"job\": {\n" +
" \"$ref\": \"#/definitions/apiJob\"\n" +
" },\n" +
" \"jobId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"jobSetId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"queue\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"requestor\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"apiJobUtilisationEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
Expand Down
Loading

0 comments on commit 1882df1

Please sign in to comment.