diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 188303ee6..b30095519 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.11 + github.com/flyteorg/flyteidl v1.5.14 github.com/flyteorg/flyteplugins v1.0.67 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.22 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index ac4ee4c3d..ed4b6ac85 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0= -github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI= +github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= diff --git a/flyteadmin/pkg/common/entity.go b/flyteadmin/pkg/common/entity.go index 65d3161cf..cbb96e53d 100644 --- a/flyteadmin/pkg/common/entity.go +++ b/flyteadmin/pkg/common/entity.go @@ -18,6 +18,8 @@ const ( NamedEntityMetadata = "nem" Project = "p" Signal = "s" + AdminTag = "at" + ExecutionAdminTag = "eat" ) // ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 8a6b7287b..7ad4fbbd5 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -913,6 +913,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( workflowExecutionID, err) return nil, nil, err } + return ctx, executionModel, nil } @@ -1478,6 +1479,7 @@ func (m *ExecutionManager) ListExecutions( execution.Spec.Inputs = nil execution.Closure.ComputedInputs = nil } + // END TO BE DELETED var token string if len(executionList) == int(request.Limit) { diff --git a/flyteadmin/pkg/manager/impl/testutils/mock_requests.go b/flyteadmin/pkg/manager/impl/testutils/mock_requests.go index 8b8473376..fda9482e1 100644 --- a/flyteadmin/pkg/manager/impl/testutils/mock_requests.go +++ b/flyteadmin/pkg/manager/impl/testutils/mock_requests.go @@ -222,6 +222,7 @@ func GetExecutionRequest() admin.ExecutionCreateRequest { }, RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: "default_raw_output"}, Envs: &admin.Envs{}, + Tags: []string{"tag1", "tag2"}, }, Inputs: &core.LiteralMap{ Literals: map[string]*core.Literal{ diff --git a/flyteadmin/pkg/manager/impl/util/filters.go b/flyteadmin/pkg/manager/impl/util/filters.go index e52bfb8b1..70a65db70 100644 --- a/flyteadmin/pkg/manager/impl/util/filters.go +++ b/flyteadmin/pkg/manager/impl/util/filters.go @@ -62,6 +62,8 @@ var filterFieldEntityPrefix = map[string]common.Entity{ "named_entity_metadata": common.NamedEntityMetadata, "project": common.Project, "signal": common.Signal, + "admin_tag": common.AdminTag, + "execution_admin_tag": common.ExecutionAdminTag, } func parseField(field string, primaryEntity common.Entity) (common.Entity, string) { diff --git a/flyteadmin/pkg/repositories/config/migrations.go b/flyteadmin/pkg/repositories/config/migrations.go index 568f32c26..dab390a04 100644 --- a/flyteadmin/pkg/repositories/config/migrations.go +++ b/flyteadmin/pkg/repositories/config/migrations.go @@ -1097,6 +1097,89 @@ var NoopMigrations = []*gormigrate.Migration{ return nil }, }, + + { + ID: "2023-08-04-admin-tags", + Migrate: func(tx *gorm.DB) error { + type AdminTag struct { + gorm.Model + Name string `gorm:"index:,unique;size:255"` + } + + return tx.AutoMigrate(&AdminTag{}) + }, + Rollback: func(tx *gorm.DB) error { + return nil + }, + }, + + { + ID: "2023-08-04-execution-admin-tags", // A join table used to associate executions with tags + Migrate: func(tx *gorm.DB) error { + type AdminTag struct { + gorm.Model + Name string `gorm:"index:,unique;size:255"` + } + + type ExecutionKey struct { + Project string `gorm:"primary_key;column:execution_project" valid:"length(0|255)"` + Domain string `gorm:"primary_key;column:execution_domain" valid:"length(0|255)"` + Name string `gorm:"primary_key;column:execution_name" valid:"length(0|255)"` + } + + type Execution struct { + ID uint `gorm:"index;autoIncrement;not null"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time `gorm:"index"` + ExecutionKey + LaunchPlanID uint `gorm:"index"` + WorkflowID uint `gorm:"index"` + TaskID uint `gorm:"index"` + Phase string `valid:"length(0|255)"` + Closure []byte + Spec []byte `gorm:"not null"` + StartedAt *time.Time + // Corresponds to the CreatedAt field in the Execution closure. + // Prefixed with Execution to avoid clashes with gorm.Model CreatedAt + ExecutionCreatedAt *time.Time `gorm:"index:idx_executions_created_at"` + // Corresponds to the UpdatedAt field in the Execution closure + // Prefixed with Execution to avoid clashes with gorm.Model UpdatedAt + ExecutionUpdatedAt *time.Time + Duration time.Duration + // In the case of an aborted execution this string may be non-empty. + // It should be ignored for any other value of phase other than aborted. + AbortCause string `valid:"length(0|255)"` + // Corresponds to the execution mode used to trigger this execution + Mode int32 + // The "parent" execution (if there is one) that is related to this execution. + SourceExecutionID uint + // The parent node execution if this was launched by a node + ParentNodeExecutionID uint + // Cluster where execution was triggered + Cluster string `valid:"length(0|255)"` + // Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults. + InputsURI storage.DataReference + // User specified inputs. This map might be incomplete and not include defaults applied + UserInputsURI storage.DataReference + // Execution Error Kind. nullable + ErrorKind *string `gorm:"index"` + // Execution Error Code nullable + ErrorCode *string `valid:"length(0|255)"` + // The user responsible for launching this execution. + // This is also stored in the spec but promoted as a column for filtering. + User string `gorm:"index" valid:"length(0|255)"` + // GORM doesn't save the zero value for ints, so we use a pointer for the State field + State *int32 `gorm:"index;default:0"` + // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' + LaunchEntity string + // Tags associated with the execution + Tags []AdminTag `gorm:"many2many:execution_admin_tags;"` + } + + return tx.AutoMigrate(&Execution{}) + }, + }, } var Migrations = append(LegacyMigrations, NoopMigrations...) diff --git a/flyteadmin/pkg/repositories/gormimpl/common.go b/flyteadmin/pkg/repositories/gormimpl/common.go index 69dada2f0..c022bd973 100644 --- a/flyteadmin/pkg/repositories/gormimpl/common.go +++ b/flyteadmin/pkg/repositories/gormimpl/common.go @@ -28,6 +28,8 @@ const taskExecutionTableName = "task_executions" const taskTableName = "tasks" const workflowTableName = "workflows" const descriptionEntityTableName = "description_entities" +const AdminTagsTableName = "admin_tags" +const executionAdminTagsTableName = "execution_admin_tags" const limit = "limit" const filters = "filters" @@ -45,6 +47,8 @@ var entityToTableName = map[common.Entity]string{ common.NamedEntity: "entities", common.NamedEntityMetadata: "named_entity_metadata", common.Signal: "signals", + common.AdminTag: "admin_tags", + common.ExecutionAdminTag: "execution_admin_tags", } var innerJoinExecToNodeExec = fmt.Sprintf( diff --git a/flyteadmin/pkg/repositories/gormimpl/execution_repo.go b/flyteadmin/pkg/repositories/gormimpl/execution_repo.go index e300dcd33..b128a2805 100644 --- a/flyteadmin/pkg/repositories/gormimpl/execution_repo.go +++ b/flyteadmin/pkg/repositories/gormimpl/execution_repo.go @@ -32,7 +32,7 @@ func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) erro return nil } -func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { +func (r *ExecutionRepo) Get(_ context.Context, input interfaces.Identifier) (models.Execution, error) { var execution models.Execution timer := r.metrics.GetDuration.Start() tx := r.db.Where(&models.Execution{ @@ -66,7 +66,7 @@ func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution) return nil } -func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) ( +func (r *ExecutionRepo) List(_ context.Context, input interfaces.ListResourceInput) ( interfaces.ExecutionCollectionOutput, error) { var err error // First validate input. @@ -89,6 +89,13 @@ func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceI taskTableName, executionTableName, taskTableName)) } + if ok := input.JoinTableEntities[common.AdminTag]; ok { + tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_name = %s.execution_name", + executionAdminTagsTableName, executionTableName, executionAdminTagsTableName)) + tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.id = %s.admin_tag_id", + AdminTagsTableName, AdminTagsTableName, executionAdminTagsTableName)) + } + // Apply filters tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters) if err != nil { diff --git a/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go b/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go index daf73f6e5..17cb85777 100644 --- a/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go +++ b/flyteadmin/pkg/repositories/gormimpl/execution_repo_test.go @@ -270,6 +270,36 @@ func TestListExecutions_Order(t *testing.T) { assert.True(t, mockQuery.Triggered) } +func TestListExecutions_WithTags(t *testing.T) { + executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) + + executions := make([]map[string]interface{}, 0) + GlobalMock := mocket.Catcher.Reset() + // Only match on queries that include ordering by name + mockQuery := GlobalMock.NewMock().WithQuery(`name asc`) + mockQuery.WithReply(executions) + + sortParameter, _ := common.NewSortParameter(admin.Sort{ + Direction: admin.Sort_ASCENDING, + Key: "name", + }) + vals := []string{"tag1", "tag2"} + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "admin_tag_name", vals) + assert.NoError(t, err) + _, err = executionRepo.List(context.Background(), interfaces.ListResourceInput{ + SortParameter: sortParameter, + InlineFilters: []common.InlineFilter{ + getEqualityFilter(common.Task, "project", project), + getEqualityFilter(common.Task, "domain", domain), + getEqualityFilter(common.Task, "name", name), + tagFilter, + }, + Limit: 20, + }) + assert.NoError(t, err) + assert.True(t, mockQuery.Triggered) +} + func TestListExecutions_MissingParameters(t *testing.T) { executionRepo := NewExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) _, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{ @@ -306,15 +336,17 @@ func TestListExecutionsForWorkflow(t *testing.T) { StartedAt: &executionStartedAt, Duration: time.Hour, LaunchEntity: "launch_plan", + Tags: []models.AdminTag{{Name: "tag1"}, {Name: "tag2"}}, }) executions = append(executions, execution) GlobalMock := mocket.Catcher.Reset() GlobalMock.Logging = true - // Only match on queries that append expected filters - GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 LIMIT 20`).WithReply(executions) - + GlobalMock.NewMock().WithQuery(`SELECT "executions"."id","executions"."created_at","executions"."updated_at","executions"."deleted_at","executions"."execution_project","executions"."execution_domain","executions"."execution_name","executions"."launch_plan_id","executions"."workflow_id","executions"."task_id","executions"."phase","executions"."closure","executions"."spec","executions"."started_at","executions"."execution_created_at","executions"."execution_updated_at","executions"."duration","executions"."abort_cause","executions"."mode","executions"."source_execution_id","executions"."parent_node_execution_id","executions"."cluster","executions"."inputs_uri","executions"."user_inputs_uri","executions"."error_kind","executions"."error_code","executions"."user","executions"."state","executions"."launch_entity" FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id INNER JOIN tasks ON executions.task_id = tasks.id WHERE executions.execution_project = $1 AND executions.execution_domain = $2 AND executions.execution_name = $3 AND workflows.name = $4 AND tasks.name = $5 AND execution_admin_tags.execution_tag_name in ($6,$7) LIMIT 20`).WithReply(executions) + vals := []string{"tag1", "tag2"} + tagFilter, err := common.NewRepeatedValueFilter(common.ExecutionAdminTag, common.ValueIn, "execution_tag_name", vals) + assert.NoError(t, err) collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{ InlineFilters: []common.InlineFilter{ getEqualityFilter(common.Execution, "project", project), @@ -322,6 +354,7 @@ func TestListExecutionsForWorkflow(t *testing.T) { getEqualityFilter(common.Execution, "name", "1"), getEqualityFilter(common.Workflow, "name", "workflow_name"), getEqualityFilter(common.Task, "name", "task_name"), + tagFilter, }, Limit: 20, JoinTableEntities: map[common.Entity]bool{ @@ -329,6 +362,7 @@ func TestListExecutionsForWorkflow(t *testing.T) { common.Task: true, }, }) + assert.NoError(t, err) assert.NotEmpty(t, collection) assert.NotEmpty(t, collection.Executions) diff --git a/flyteadmin/pkg/repositories/models/execution.go b/flyteadmin/pkg/repositories/models/execution.go index 1c5e1300d..931a3b720 100644 --- a/flyteadmin/pkg/repositories/models/execution.go +++ b/flyteadmin/pkg/repositories/models/execution.go @@ -3,6 +3,10 @@ package models import ( "time" + "gorm.io/gorm/clause" + + "gorm.io/gorm" + "github.com/flyteorg/flytestdlib/storage" ) @@ -60,4 +64,19 @@ type Execution struct { State *int32 `gorm:"index;default:0"` // The resource type of the entity used to launch the execution, one of 'launch_plan' or 'task' LaunchEntity string + // Tags associated with the execution + Tags []AdminTag `gorm:"many2many:execution_admin_tags;"` +} + +type AdminTag struct { + gorm.Model + Name string `gorm:"index:,unique;size:255"` +} + +func (b *AdminTag) BeforeCreate(tx *gorm.DB) (err error) { + tx.Statement.AddClause(clause.OnConflict{ + Columns: []clause.Column{{Name: "name"}}, // key column + DoUpdates: clause.AssignmentColumns([]string{"name"}), // column needed to be updated + }) + return nil } diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index de0d986af..abd77413e 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -99,6 +99,11 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE) + tags := make([]models.AdminTag, len(input.RequestSpec.Tags)) + for i, tag := range input.RequestSpec.Tags { + tags[i] = models.AdminTag{Name: tag} + } + executionModel := &models.Execution{ ExecutionKey: models.ExecutionKey{ Project: input.WorkflowExecutionID.Project, @@ -119,6 +124,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e User: requestSpec.Metadata.Principal, State: &activeExecution, LaunchEntity: strings.ToLower(input.LaunchEntity.String()), + Tags: tags, } // A reference launch entity can be one of either or a task OR launch plan. Traditionally, workflows are executed // with a reference launch plan which is why this behavior is the default below. diff --git a/flyteadmin/tests/bootstrap.go b/flyteadmin/tests/bootstrap.go index b5a3477b7..11a9b7aba 100644 --- a/flyteadmin/tests/bootstrap.go +++ b/flyteadmin/tests/bootstrap.go @@ -72,6 +72,8 @@ func truncateAllTablesForTestingOnly() { TruncateResources := fmt.Sprintf("TRUNCATE TABLE resources;") TruncateSchedulableEntities := fmt.Sprintf("TRUNCATE TABLE schedulable_entities;") TruncateSchedulableEntitiesSnapshots := fmt.Sprintf("TRUNCATE TABLE schedule_entities_snapshots;") + TruncateAdminTags := fmt.Sprintf("TRUNCATE TABLE admin_tags;") + TruncateExecutionAdminTags := fmt.Sprintf("TRUNCATE TABLE execution_admin_tags;") ctx := context.Background() db, err := repositories.GetDB(ctx, getDbConfig(), getLoggerConfig()) if err != nil { @@ -100,6 +102,8 @@ func truncateAllTablesForTestingOnly() { db.Exec(TruncateResources) db.Exec(TruncateSchedulableEntities) db.Exec(TruncateSchedulableEntitiesSnapshots) + db.Exec(TruncateAdminTags) + db.Exec(TruncateExecutionAdminTags) } func populateWorkflowExecutionForTestingOnly(project, domain, name string) { diff --git a/flyteadmin/tests/execution_test.go b/flyteadmin/tests/execution_test.go index f42133c7f..a8d5e9b82 100644 --- a/flyteadmin/tests/execution_test.go +++ b/flyteadmin/tests/execution_test.go @@ -185,6 +185,14 @@ func populateWorkflowExecutionsForTestingOnly() { db.Exec(`INSERT INTO workflows ("id", "project", "domain", "name", "version", "remote_closure_identifier") ` + `VALUES (4, 'project2', 'domain2', 'name2', 'version1', 's3://foo')`) + // Insert dummy tags + db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (1, 'hello')`) + db.Exec(`INSERT INTO admin_tags ("id", "name") ` + `VALUES (2, 'flyte')`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 1)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name1', 2)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name3', 2)`) + db.Exec(`INSERT INTO execution_admin_tags ("execution_project", "execution_domain", "execution_name", "admin_tag_id") ` + `VALUES ('project1', 'domain1', 'name4', 1)`) + for _, statement := range insertExecutionStatements { db.Exec(statement) } @@ -209,6 +217,26 @@ func TestListWorkflowExecutions(t *testing.T) { assert.Equal(t, len(resp.Executions), 4) } +func TestListWorkflowExecutionsWithTags(t *testing.T) { + truncateAllTablesForTestingOnly() + populateWorkflowExecutionsForTestingOnly() + + ctx := context.Background() + client, conn := GetTestAdminServiceClient() + defer conn.Close() + + resp, err := client.ListExecutions(ctx, &admin.ResourceListRequest{ + Id: &admin.NamedEntityIdentifier{ + Project: "project1", + Domain: "domain1", + }, + Limit: 5, + Filters: "value_in(admin_tag.name, hello)", + }) + assert.Nil(t, err) + assert.Equal(t, len(resp.Executions), 2) +} + func TestListWorkflowExecutions_Filters(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionsForTestingOnly()