Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Handle batched TaskExecutionEvent reasons #615

Merged
merged 5 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ require (

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a

// TODO: update version references once dependent PR is merged
replace github.com/flyteorg/flyteidl => github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792

// Retracted versions
// This was published in error when attempting to create 1.5.1 Flyte release.
retract v1.1.94
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 h1:mkSxCviDJa5cg8obcdJJ0RhB5TE7v/y2pBhinafwvQc=
github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
Expand Down Expand Up @@ -293,8 +295,6 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down
9 changes: 5 additions & 4 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"fmt"
"strconv"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"
notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
return nil, err
}

if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 {
if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs
m.metrics.ActiveTaskExecutions.Inc()
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
m.metrics.ActiveTaskExecutions.Dec()
Expand Down
26 changes: 22 additions & 4 deletions pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
CreatedAt: input.Request.Event.OccurredAt,
Logs: input.Request.Event.Logs,
CustomInfo: input.Request.Event.CustomInfo,
Reason: input.Request.Event.Reason,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
EventVersion: input.Request.Event.EventVersion,
}

if len(input.Request.Event.Reason) > 0 {
if len(input.Request.Event.Reasons) > 0 {
for _, reason := range input.Request.Event.Reasons {
closure.Reasons = append(closure.Reasons, &admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason
} else if len(input.Request.Event.Reason) > 0 {
closure.Reasons = []*admin.Reason{
&admin.Reason{
{
OccurredAt: input.Request.Event.OccurredAt,
Message: input.Request.Event.Reason,
},
}
closure.Reason = input.Request.Event.Reason
}

eventPhase := input.Request.Event.Phase
Expand Down Expand Up @@ -386,7 +394,17 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if len(request.Event.Reason) > 0 {
if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
taskExecutionClosure.Reasons,
&admin.Reason{
OccurredAt: reason.OccurredAt,
Message: reason.Reason,
})
}
taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason
} else if len(request.Event.Reason) > 0 {
if taskExecutionClosure.Reason != request.Event.Reason {
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
// a task reports a large number of unique reasons. if this size increase becomes problematic we this logic
Expand Down
Loading
Loading