From 4ae657a210a7aa2123955310229152a131059f5c Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 21 Sep 2023 21:27:14 -0700 Subject: [PATCH 1/2] FlyteAdmin will always add `base_exec_id` unless it is already added Reasons: 1. Make it possible to retrieve all executions launched by the same base execution id (even recursively) 2. users could group executions using their own base exec id 3. flytectl get executions or remote list executions can use this label as a filter to retrieve high level progress of all subworkflows Signed-off-by: Ketan Umare --- pkg/manager/impl/execution_manager.go | 21 +++++++++++++++++++++ pkg/manager/impl/execution_manager_test.go | 10 ++++++---- pkg/manager/impl/shared/constants.go | 7 +++++-- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index bd57d4fb1..eef2c7a47 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -553,6 +553,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { return nil, nil, err } + labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + if err != nil { + return nil, nil, err + } var annotations map[string]string if executionConfig.Annotations != nil { @@ -810,6 +814,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, err } + labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + if err != nil { + return nil, nil, err + } annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { return nil, nil, err @@ -1687,6 +1695,19 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str return initialLabels, nil } +// Adds base execution label to execution labels. Base execution label is ignored if a corresponding label is set on the execution already. +// An execution label will exist if Flytepropeller launches a child workflow execution, as it will copy the parent execution's labels. +// This label can later be used to retrieve all executions that were launched from a given execution, no matter how deep in the recursion tree. +func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) (map[string]string, error) { + if initialLabels == nil { + initialLabels = make(map[string]string) + } + if _, ok := initialLabels[shared.BaseExecutionIDLabelKey]; !ok { + initialLabels[shared.BaseExecutionIDLabelKey] = execID + } + return initialLabels, nil +} + func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) { var stateFilterExists bool for _, inlineFilter := range filters { diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 38bac0df1..1a9d5b18e 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -886,8 +886,9 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) { mockExecutor := workflowengineMocks.WorkflowExecutor{} mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(executionData workflowengineInterfaces.ExecutionData) bool { assert.EqualValues(t, map[string]string{ - "dynamiclabel1": "dynamic1", - "dynamiclabel2": "dynamic2", + "dynamiclabel1": "dynamic1", + "dynamiclabel2": "dynamic2", + shared.BaseExecutionIDLabelKey: "name", }, executionData.ExecutionParameters.Labels) assert.EqualValues(t, map[string]string{ "dynamicannotation3": "dynamic3", @@ -3834,8 +3835,9 @@ func TestCreateExecution_LegacyClient(t *testing.T) { mockExecutor := workflowengineMocks.WorkflowExecutor{} mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(execData workflowengineInterfaces.ExecutionData) bool { assert.EqualValues(t, map[string]string{ - "label1": "1", - "label2": "2", + "label1": "1", + "label2": "2", + shared.BaseExecutionIDLabelKey: "name", }, execData.ExecutionParameters.Labels) assert.EqualValues(t, map[string]string{ "annotation3": "3", diff --git a/pkg/manager/impl/shared/constants.go b/pkg/manager/impl/shared/constants.go index 469bd3e3b..e9bc1b79e 100644 --- a/pkg/manager/impl/shared/constants.go +++ b/pkg/manager/impl/shared/constants.go @@ -33,6 +33,9 @@ const ( Attributes = "attributes" MatchingAttributes = "matching_attributes" // Parent of a node execution in the node executions table - ParentID = "parent_id" - WorkflowClosure = "workflow_closure" + ParentID = "parent_id" + WorkflowClosure = "workflow_closure" + BaseExecutionIDLabelKey = "base_exec_id" + // BaseExecutionIDLabelKey is the label key for the base execution ID and is globally known. The UI, CLI and potentially + // other components use this label key to identify the base execution ID, so DO NOT CHANGE THIS VALUE. ) From 44d1ba64a15aa9a6dc1c1fd667e48531168c9887 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 21 Sep 2023 21:41:58 -0700 Subject: [PATCH 2/2] lint fixed Signed-off-by: Ketan Umare --- pkg/manager/impl/execution_manager.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index eef2c7a47..af8ff8d29 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -553,10 +553,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { return nil, nil, err } - labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) - if err != nil { - return nil, nil, err - } + labels = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) var annotations map[string]string if executionConfig.Annotations != nil { @@ -814,10 +811,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, err } - labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) - if err != nil { - return nil, nil, err - } + labels = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { return nil, nil, err @@ -1698,14 +1693,14 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str // Adds base execution label to execution labels. Base execution label is ignored if a corresponding label is set on the execution already. // An execution label will exist if Flytepropeller launches a child workflow execution, as it will copy the parent execution's labels. // This label can later be used to retrieve all executions that were launched from a given execution, no matter how deep in the recursion tree. -func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) (map[string]string, error) { +func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) map[string]string { if initialLabels == nil { initialLabels = make(map[string]string) } if _, ok := initialLabels[shared.BaseExecutionIDLabelKey]; !ok { initialLabels[shared.BaseExecutionIDLabelKey] = execID } - return initialLabels, nil + return initialLabels } func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) {