From 5a63652075ee121ce8b6afd789e3194f9e4cc4be Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 1 Aug 2023 11:20:07 -0700 Subject: [PATCH] parameter and literal map construction wrong Signed-off-by: Yee Hing Tong --- flyteadmin_config.yaml | 4 + pkg/artifacts/artifact_client.go | 37 ++++ pkg/artifacts/config.go | 7 + pkg/manager/impl/execution_manager.go | 183 ++++++++++++++---- .../impl/validation/execution_validator.go | 22 +-- pkg/rpc/adminservice/base.go | 12 +- pkg/runtime/application_config_provider.go | 8 + .../interfaces/application_configuration.go | 2 + 8 files changed, 217 insertions(+), 58 deletions(-) create mode 100644 pkg/artifacts/artifact_client.go create mode 100644 pkg/artifacts/config.go diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index ab56c8895..b42b742d6 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -60,6 +60,10 @@ flyteadmin: - "metadata" - "admin" useOffloadedWorkflowClosure: false +artifacts: + host: localhost + port: 50051 + insecure: true database: postgres: port: 30001 diff --git a/pkg/artifacts/artifact_client.go b/pkg/artifacts/artifact_client.go new file mode 100644 index 000000000..b29ebc141 --- /dev/null +++ b/pkg/artifacts/artifact_client.go @@ -0,0 +1,37 @@ +package artifacts + +import ( + "context" + "crypto/tls" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flytestdlib/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + if opts == nil { + // Initialize opts list to the potential number of options we will add. Initialization optimizes memory + // allocation. + opts = make([]grpc.DialOption, 0, 5) + } + + if cfg.Insecure { + opts = append(opts, grpc.WithInsecure()) + } else { + tlsConfig := &tls.Config{} //nolint + creds := credentials.NewTLS(tlsConfig) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } + + return grpc.Dial(cfg.Host, opts...) +} + +func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient { + conn, err := NewArtifactConnection(ctx, cfg, opts...) + if err != nil { + logger.Panicf(ctx, "failed to initialize Artifact connection. Err: %s", err.Error()) + panic(err) + } + return artifact.NewArtifactRegistryClient(conn) +} diff --git a/pkg/artifacts/config.go b/pkg/artifacts/config.go new file mode 100644 index 000000000..addeedaf6 --- /dev/null +++ b/pkg/artifacts/config.go @@ -0,0 +1,7 @@ +package artifacts + +type Config struct { + Host string `json:"host"` + Port int `json:"port"` + Insecure bool `json:"insecure"` +} diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 429cbe188..5fc3f5db1 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -43,6 +43,7 @@ import ( runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" workflowengineInterfaces "github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "google.golang.org/grpc/codes" @@ -99,6 +100,7 @@ type ExecutionManager struct { cloudEventPublisher notificationInterfaces.Publisher dbEventWriter eventWriter.WorkflowExecutionEventWriter pluginRegistry *plugins.Registry + artifactClient *artifact.ArtifactRegistryClient } func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context { @@ -450,7 +452,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *ad func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, map[string]*core.ArtifactID, error) { + context.Context, *models.Execution, error) { taskModel, err := m.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{ Project: request.Spec.LaunchPlan.Project, @@ -459,11 +461,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( Version: request.Spec.LaunchPlan.Version, }) if err != nil { - return nil, nil, nil, err + return nil, nil, err } task, err := transformers.FromTaskModel(taskModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Prepare a skeleton workflow @@ -472,15 +474,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( util.CreateOrGetWorkflowModel(ctx, request, m.db, m.workflowManager, m.namedEntityManager, taskIdentifier, &task) if err != nil { logger.Debugf(ctx, "Failed to created skeleton workflow for [%+v] with err: %v", taskIdentifier, err) - return nil, nil, nil, err + return nil, nil, err } workflow, err := transformers.FromWorkflowModel(*workflowModel) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure, err := util.FetchAndGetWorkflowClosure(ctx, m.storageClient, workflowModel.RemoteClosureIdentifier) if err != nil { - return nil, nil, nil, err + return nil, nil, err } closure.CreatedAt = workflow.Closure.CreatedAt workflow.Closure = closure @@ -488,10 +490,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier, workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( + executionInputs, err := validation.CheckAndFetchInputsForExecution( request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, @@ -500,7 +502,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) - return nil, nil, nil, err + return nil, nil, err } name := util.GetExecutionName(request) @@ -518,14 +520,14 @@ func (m *ExecutionManager) launchSingleTaskExecution( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) - requestSpec.Metadata.ArtifactIds = resolvedArtifactMap + //requestSpec.Metadata.ArtifactIds = resolvedArtifactMap // Get the node execution (if any) that launched this execution var parentNodeExecutionID uint var sourceExecutionID uint parentNodeExecutionID, sourceExecutionID, err = m.getInheritedExecMetadata(ctx, requestSpec, &workflowExecutionID) if err != nil { - return nil, nil, nil, err + return nil, nil, err } // Dynamically assign task resource defaults. @@ -539,15 +541,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( inputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.Inputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var labels map[string]string @@ -557,7 +559,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { - return nil, nil, nil, err + return nil, nil, err } var annotations map[string]string @@ -572,7 +574,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( clusterAssignment, err := m.getClusterAssignment(ctx, &request) if err != nil { - return nil, nil, nil, err + return nil, nil, err } executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -590,7 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") if err != nil { - return nil, nil, nil, err + return nil, nil, err } if overrides != nil { executionParameters.TaskPluginOverrides = overrides @@ -615,7 +617,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( m.systemMetrics.PropellerFailures.Inc() logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v", request, workflowExecutionID, request.Inputs, err) - return nil, nil, nil, err + return nil, nil, err } executionCreatedAt := time.Now() acceptanceDelay := executionCreatedAt.Sub(requestedAt) @@ -656,10 +658,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v", workflowExecutionID, err) - return nil, nil, nil, err + return nil, nil, err } m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs))) - return ctx, executionModel, resolvedArtifactMap, nil + return ctx, executionModel, nil } func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { @@ -707,14 +709,78 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se } } +// ResolveLiteralMapArtifacts will go through the input literal map and resolve any artifact references to their +// literal values. +func (m *ExecutionManager) ResolveLiteralMapArtifacts(ctx context.Context, inputs *core.LiteralMap) (*core.LiteralMap, []*core.ArtifactID, error) { + // only top level replace for now. + var artifactIDs []*core.ArtifactID + outputs := map[string]*core.Literal{} + for k, v := range inputs.Literals { + if v.GetArtifactId() != nil { + if m.artifactClient == nil { + return nil, nil, errors.NewFlyteAdminErrorf(codes.Internal, "artifact client is not initialized") + } + + x := *m.artifactClient + req := &artifact.GetArtifactRequest{ + Identifier: &artifact.GetArtifactRequest_ArtifactId{ArtifactId: v.GetArtifactId()}, + Details: false, + } + resp, err := x.GetArtifact(ctx, req) + if err != nil { + return nil, nil, err + } + artifactIDs = append(artifactIDs, resp.Artifact.GetArtifactId()) + logger.Debugf(ctx, "Resolved artifact for [%s] to [%+v]", k, resp.GetArtifact().ArtifactId) + outputs[k] = resp.Artifact.Spec.Value + } else { + outputs[k] = v + } + } + lm := &core.LiteralMap{Literals: outputs} + return lm, artifactIDs, nil + +} + +// ResolveParameterMapArtifacts will go through the parameter map, and resolve any artifact queries. +func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inputs *core.ParameterMap) (*core.ParameterMap, []*core.ArtifactID, error) { + // only top level replace for now. + var artifactIDs []*core.ArtifactID + outputs := map[string]*core.Parameter{} + x := *m.artifactClient + + for k, v := range inputs.Parameters { + if v.GetArtifactQuery() != nil { + if m.artifactClient == nil { + return nil, nil, errors.NewFlyteAdminErrorf(codes.Internal, "artifact client is not initialized, can't resolve queries") + } + + req := &artifact.GetArtifactRequest{ + Identifier: &artifact.GetArtifactRequest_Query{Query: v.GetArtifactQuery()}, + Details: false, + } + + resp, err := x.GetArtifact(ctx, req) + if err != nil { + return nil, nil, err + } + artifactIDs = append(artifactIDs, resp.Artifact.GetArtifactId()) + logger.Debugf(ctx, "Resolved query for [%s] to [%+v]", k, resp.Artifact.ArtifactId) + outputs[k] = &core.Parameter{ + Var: v.Var, + Behavior: &core.Parameter_Default{Default: resp.Artifact.Spec.Value}, + } + } else { + outputs[k] = v + } + } + pm := &core.ParameterMap{Parameters: outputs} + return pm, artifactIDs, nil +} + func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( - context.Context, *models.Execution, map[string]*core.ArtifactID, error) { - - // Resolve artifacts. - // two sources of artifacts: launch plan and create execute request. - // - within the launch plan, the artifact will be in the Parameter map, and can come in the Literal, - // or as an ArtifactQuery. + context.Context, *models.Execution, []*core.ArtifactID, error) { err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration()) if err != nil { @@ -723,7 +789,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( } if request.Spec.LaunchPlan.ResourceType == core.ResourceType_TASK { logger.Debugf(ctx, "Launching single task execution with [%+v]", request.Spec.LaunchPlan) - return m.launchSingleTaskExecution(ctx, request, requestedAt) + // When tasks can have defaults this will need to handle Artifacts as well. + ctx, model, err := m.launchSingleTaskExecution(ctx, request, requestedAt) + // Since single task doesn't leverage artifact yet, inject a nil artifact list. + return ctx, model, nil, err } launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Spec.LaunchPlan) @@ -736,18 +805,51 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) return nil, nil, nil, err } + + // Resolve artifacts. + // two sources of artifacts: launch plan and create execute request. + // - within the create request, artifacts will be pinned in the Literals. + // - within the launch plan, the artifact will be in the Parameter map, and can come in the Literal, + // or as an ArtifactQuery. + // All three components that comprise the inputs may contain artifacts. + var foundArtifacts []*core.ArtifactID + resolvedRequestInputs, as, err := m.ResolveLiteralMapArtifacts(ctx, request.Inputs) + if err != nil { + logger.Errorf(ctx, "Error looking up request.Inputs for artifacts: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + resolvedFixedInputs, as, err := m.ResolveLiteralMapArtifacts(ctx, launchPlan.Spec.FixedInputs) + if err != nil { + logger.Errorf(ctx, "Error looking up launch plan fixed inputs for artifacts: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + resolvedExpectedInputs, as, err := m.ResolveParameterMapArtifacts(ctx, launchPlan.Closure.ExpectedInputs) + if err != nil { + logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err) + return nil, nil, nil, err + } + foundArtifacts = append(foundArtifacts, as...) + if len(foundArtifacts) > 0 { + logger.Debugf(ctx, "Resolved request.Inputs from [%+v] to [%+v]", request.Inputs, resolvedRequestInputs) + logger.Debugf(ctx, "Resolved launch plan fixed inputs from [%+v] to [%+v]", launchPlan.Spec.FixedInputs, resolvedFixedInputs) + logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs) + logger.Debugf(ctx, "Found artifacts: %v", foundArtifacts) + } + // Artifacts retrieved will need to be stored somewhere to ensure that we can re-emit events if necessary // in the future, and also to make sure that relaunch and recover can use it if necessary. - executionInputs, resolvedArtifactMap, err := validation.CheckAndFetchInputsForExecution( - request.Inputs, - launchPlan.Spec.FixedInputs, - launchPlan.Closure.ExpectedInputs, + executionInputs, err := validation.CheckAndFetchInputsForExecution( + resolvedRequestInputs, + resolvedFixedInputs, + resolvedExpectedInputs, ) if err != nil { - logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ + logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with resolvedRequestInputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", - request.Inputs, launchPlan.Spec.FixedInputs, launchPlan.Closure.ExpectedInputs, err) + resolvedRequestInputs, resolvedFixedInputs, resolvedExpectedInputs, err) return nil, nil, nil, err } @@ -781,7 +883,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) - requestSpec.Metadata.ArtifactIds = resolvedArtifactMap + requestSpec.Metadata.ArtifactIds = foundArtifacts // Get the node and parent execution (if any) that launched this execution var parentNodeExecutionID uint @@ -804,7 +906,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, nil, err } - userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, request.Inputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) + userInputsURI, err := common.OffloadLiteralMap(ctx, m.storageClient, resolvedRequestInputs, workflowExecutionID.Project, workflowExecutionID.Domain, workflowExecutionID.Name, shared.UserInputs) if err != nil { return nil, nil, nil, err } @@ -923,7 +1025,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( workflowExecutionID, err) return nil, nil, nil, err } - return ctx, executionModel, resolvedArtifactMap, nil + return ctx, executionModel, foundArtifacts, nil } // Inserts an execution model into the database store and emits platform metrics. @@ -947,7 +1049,8 @@ func (m *ExecutionManager) createExecutionModel( return &workflowExecutionIdentifier, nil } -func (m *ExecutionManager) handleArtifactEvents(artifactIDs map[string]*core.ArtifactID, wfExecID core.WorkflowExecutionIdentifier) { +func (m *ExecutionManager) handleArtifactEvents(artifactIDs []*core.ArtifactID, wfExecID core.WorkflowExecutionIdentifier) { + // todo: Proper events need to be created for this. if artifactIDs != nil { fmt.Printf("WF exec used %v", wfExecID.String()) for _, artifactID := range artifactIDs { @@ -967,7 +1070,7 @@ func (m *ExecutionManager) CreateExecution( } var executionModel *models.Execution var err error - var artifactIDs map[string]*core.ArtifactID + var artifactIDs []*core.ArtifactID ctx, executionModel, artifactIDs, err = m.launchExecutionAndPrepareModel(ctx, request, requestedAt) if err != nil { return nil, err @@ -1659,7 +1762,8 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher, - eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface { + eventWriter eventWriter.WorkflowExecutionEventWriter, artifactClient *artifact.ArtifactRegistryClient) interfaces.ExecutionInterface { + queueAllocator := executions.NewQueueAllocator(config, db) systemMetrics := newExecutionSystemMetrics(systemScope) @@ -1692,6 +1796,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu cloudEventPublisher: cloudEventPublisher, dbEventWriter: eventWriter, pluginRegistry: pluginRegistry, + artifactClient: artifactClient, } } diff --git a/pkg/manager/impl/validation/execution_validator.go b/pkg/manager/impl/validation/execution_validator.go index 6920f44e4..a1c2a1514 100644 --- a/pkg/manager/impl/validation/execution_validator.go +++ b/pkg/manager/impl/validation/execution_validator.go @@ -79,11 +79,10 @@ func ValidateExecutionRequest(ctx context.Context, request admin.ExecutionCreate // CheckAndFetchInputsForExecution will merge inputs and also resolve any artifacts that are required. // A map will be returned for all artifacts used. func CheckAndFetchInputsForExecution( - userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, map[string]*core.ArtifactID, error) { + userInputs *core.LiteralMap, fixedInputs *core.LiteralMap, expectedInputs *core.ParameterMap) (*core.LiteralMap, error) { executionInputMap := map[string]*core.Literal{} expectedInputMap := map[string]*core.Parameter{} - resolvedArtifactMap := map[string]*core.ArtifactID{} if expectedInputs != nil && len(expectedInputs.GetParameters()) > 0 { expectedInputMap = expectedInputs.GetParameters() @@ -92,7 +91,7 @@ func CheckAndFetchInputsForExecution( if userInputs != nil && len(userInputs.GetLiterals()) > 0 { for name, value := range userInputs.GetLiterals() { if _, ok := expectedInputMap[name]; !ok { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input %s", name) } executionInputMap[name] = value } @@ -101,7 +100,7 @@ func CheckAndFetchInputsForExecution( for name, expectedInput := range expectedInputMap { if _, ok := executionInputMap[name]; !ok { if expectedInput.GetRequired() { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s missing", shared.ExpectedInputs, name) } // Look up from Artifact service if necessary if expectedInput.GetArtifactQuery() != nil { @@ -114,7 +113,7 @@ func CheckAndFetchInputsForExecution( } else { inputType := validators.LiteralTypeForLiteral(executionInputMap[name]) if !validators.AreTypesCastable(inputType, expectedInput.GetVar().GetType()) { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid %s input wrong type. Expected %s, but got %s", name, expectedInput.GetVar().GetType(), inputType) } } } @@ -122,24 +121,15 @@ func CheckAndFetchInputsForExecution( if fixedInputs != nil && len(fixedInputs.GetLiterals()) > 0 { for name, fixedInput := range fixedInputs.GetLiterals() { if _, ok := executionInputMap[name]; ok { - return nil, resolvedArtifactMap, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "%s %s cannot be overridden", shared.FixedInputs, name) } executionInputMap[name] = fixedInput } } - // Resolve any artifacts that are required. - for name, input := range executionInputMap { - if input.GetArtifactId() != nil { - resolvedArtifactMap[name] = input.GetArtifactId() - // Replace the reference with the actual literal - // executionInputMap[name] = artifactService.GetArtifact(input.GetArtifactId()) - } - } - return &core.LiteralMap{ Literals: executionInputMap, - }, resolvedArtifactMap, nil + }, nil } func CheckValidExecutionID(executionID, fieldName string) error { diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 4e3ee7f8a..d206e182c 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -14,10 +14,11 @@ import ( eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/implementations" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" - "github.com/flyteorg/flyteadmin/pkg/manager/impl/resources" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" + artifactClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteadmin/pkg/async/notifications" "github.com/flyteorg/flyteadmin/pkg/async/schedule" "github.com/flyteorg/flyteadmin/pkg/data" @@ -142,9 +143,14 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi executionEventWriter.Run() }() + var artifactsClient *artifact.ArtifactRegistryClient + if configuration.ApplicationConfiguration().GetArtifactsConfig() != nil { + c := artifactClient.InitializeArtifactClient(ctx, configuration.ApplicationConfiguration().GetArtifactsConfig()) + artifactsClient = &c + } executionManager := manager.NewExecutionManager(repo, pluginRegistry, configuration, dataStorageClient, adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"), - publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter) + publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter, artifactsClient) versionManager := manager.NewVersionManager() scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager) diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go index 3b8b0a270..b708ba837 100644 --- a/pkg/runtime/application_config_provider.go +++ b/pkg/runtime/application_config_provider.go @@ -1,6 +1,7 @@ package runtime import ( + artifactsClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteadmin/pkg/common" "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flytestdlib/config" @@ -15,6 +16,7 @@ const notifications = "notifications" const domains = "domains" const externalEvents = "externalEvents" const cloudEvents = "cloudEvents" +const artifact = "artifacts" const metricPort = 10254 const KB = 1024 @@ -84,6 +86,8 @@ var cloudEventsConfig = config.MustRegisterSection(cloudEvents, &interfaces.Clou Type: common.Local, }) +var artifactsConfig = config.MustRegisterSection(artifact, &artifactsClient.Config{}) + // Implementation of an interfaces.ApplicationConfiguration type ApplicationConfigurationProvider struct{} @@ -119,6 +123,10 @@ func (p *ApplicationConfigurationProvider) GetCloudEventsConfig() *interfaces.Cl return cloudEventsConfig.GetConfig().(*interfaces.CloudEventsConfig) } +func (p *ApplicationConfigurationProvider) GetArtifactsConfig() *artifactsClient.Config { + return artifactsConfig.GetConfig().(*artifactsClient.Config) +} + func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration { return &ApplicationConfigurationProvider{} } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index aeb0ab34e..8b59d1404 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,7 @@ package interfaces import ( + artifactsClient "github.com/flyteorg/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" @@ -592,4 +593,5 @@ type ApplicationConfiguration interface { GetDomainsConfig() *DomainsConfig GetExternalEventsConfig() *ExternalEventsConfig GetCloudEventsConfig() *CloudEventsConfig + GetArtifactsConfig() *artifactsClient.Config }