Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
remove code that used to handle creation of artifact events
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
  • Loading branch information
wild-endeavor committed Jul 25, 2023
1 parent 30adee6 commit 59b147b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 194 deletions.
126 changes: 0 additions & 126 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,128 +1219,6 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime(
watch.Observe(*executionModel.ExecutionCreatedAt, terminalEventTime)
}

//// getAliases creates a list of aliases for a given output in a workflow execution. It should be called once per
//// output for a given workflow execution.
//func (m *ExecutionManager) getAliases(workflowID core.Identifier, execID core.WorkflowExecutionIdentifier, typedInterface core.TypedInterface, outputName string) ([]*artifact.Alias, error) {
//
// if v, ok := typedInterface.Outputs.Variables[outputName]; ok {
// defaultAlias := &artifact.Alias{
// Name: fmt.Sprintf("%s/%s", workflowID.Name, outputName),
// Value: execID.Name,
// }
//
// if v.Artifact != nil && len(v.Artifact.Spec.Aliases) > 0 {
// aliases := make([]*artifact.Alias, 0, len(v.Artifact.Spec.Aliases)+1)
// aliases = append(aliases, defaultAlias)
// for _, a := range v.Artifact.Spec.Aliases {
// aliases = append(aliases, &artifact.Alias{
// Name: a.Name,
// Value: a.Value,
// })
// }
// return aliases, nil
// }
//
// // If nothing specified by the user, just return the default alias.
// return []*artifact.Alias{defaultAlias}, nil
// }
// return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "output [%s] not found in workflow interface [%v] for workflow [%v]", outputName, typedInterface, workflowID)
//}
//
//func (m *ExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.WorkflowExecutionEventRequest) {
// // Basic error checking
// if request.Event.ExecutionId == nil {
// logger.Warningf(ctx, "nil execution id in event request [%+v]", request)
// return
// }
//
// // TODO: Make this one call to the DB instead of two.
// executionModel, err := m.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
// Project: request.Event.ExecutionId.Project,
// Domain: request.Event.ExecutionId.Domain,
// Name: request.Event.ExecutionId.Name,
// })
// ex, err := transformers.FromExecutionModel(ctx, executionModel, nil)
// if ex.Closure.WorkflowId == nil {
// logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex)
// return
// }
// workflowModel, err := m.db.WorkflowRepo().Get(ctx, repositoryInterfaces.Identifier{
// Project: ex.Closure.WorkflowId.Project,
// Domain: ex.Closure.WorkflowId.Domain,
// Name: ex.Closure.WorkflowId.Name,
// Version: ex.Closure.WorkflowId.Version,
// })
// var workflowInterface core.TypedInterface
// if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 {
// err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface)
// if err != nil {
// logger.Errorf(ctx,
// "Artifact eventing - failed to unmarshal TypedInterface for workflow [%+v] with err: %v",
// workflowModel.ID, err)
// return
// }
// }
//
// var outputs *core.LiteralMap
// if request.Event.GetOutputData() != nil {
// fmt.Printf("remove this - Got output data")
// outputs = request.Event.GetOutputData()
// } else if len(request.Event.GetOutputUri()) > 0 {
// fmt.Printf("remove this - Got output URI")
// // GetInputs actually fetches the data, even though this is an output
// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
// m.storageClient, request.Event.GetOutputUri())
// if err != nil {
// // TODO: metric this
// logger.Warningf(ctx, "Error fetching output literal map %v", request.Event)
// }
// } else {
// logger.Debugf(ctx, "Neither output data nor uri found for %v", request.Event)
// return
// }
// if outputs == nil {
// logger.Debugf(ctx, "Output data was nil for %v", request.Event)
// return
// }
//
// nodeExecutionID := core.NodeExecutionIdentifier{
// NodeId: "end-node",
// ExecutionId: request.Event.ExecutionId,
// }
//
// for k, v := range outputs.Literals {
// // Use input type because workflow outputs are inputs to the end node.
// artifactKeySuffix := common.FlyteURLKeyFromNodeExecutionIDAndOutput(nodeExecutionID, common.ArtifactTypeI, k)
//
// aliases, err := m.getAliases(*ex.Closure.WorkflowId, *request.Event.ExecutionId, workflowInterface, k)
// if err != nil {
// logger.Errorf(ctx, "Failed getting alias for [%s] in workflow [%v], err: %v", k, ex.Closure.WorkflowId, err)
// }
// as := artifact.ArtifactSpec{
// Value: v,
// Source: &artifact.ArtifactSpec_Execution{
// Execution: request.Event.ExecutionId,
// },
// Aliases: aliases,
// }
// ak := core.ArtifactKey{
// Project: request.Event.ExecutionId.Project,
// Domain: request.Event.ExecutionId.Domain,
// Suffix: artifactKeySuffix,
// }
//
// a := artifact.CreateArtifactRequest{
// ArtifactKey: &ak,
// Spec: &as,
// }
// e := event.ArtifactCreateEvent{
// CreateRequest: &a,
// }
// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e))
// }
//}

func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error) {
err := validation.ValidateCreateWorkflowEventRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes)
Expand Down Expand Up @@ -1426,10 +1304,6 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
if request.Event.GetOutputData() != nil {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
//go func() {
// logger.Debugf(ctx, "Emitting workflow success artifact event flow for [%+v]", request)
// m.handleArtifactEventEmitting(ctx, request)
//}()

err = m.publishNotifications(ctx, request, *executionModel)
if err != nil {
Expand Down
68 changes: 0 additions & 68 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,70 +129,6 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(
return *existingTaskExecution, nil
}

//func (m *TaskExecutionManager) handleArtifactEventEmitting(ctx context.Context, request admin.TaskExecutionEventRequest, taskExecutionID core.TaskExecutionIdentifier) {
//
// taskModel, err := m.db.TaskRepo().Get(ctx, repoInterfaces.Identifier{
// Project: request.Event.TaskId.Project,
// Domain: request.Event.TaskId.Domain,
// Name: request.Event.TaskId.Name,
// Version: request.Event.TaskId.Version,
// })
// if err != nil {
// // TODO: metric this
// logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", request.Event.TaskId, err)
// return
// }
// task, err := transformers.FromTaskModel(taskModel)
// task.Closure.CompiledTask.Template.Interface
//
// var outputs *core.LiteralMap
// if request.Event.GetOutputData() != nil {
// fmt.Printf("remove this - Got output data")
// outputs = request.Event.GetOutputData()
// } else if len(request.Event.GetOutputUri()) > 0 {
// fmt.Printf("remove this - Got output URI")
// // GetInputs actually fetches the data, even though this is an output
// outputs, _, err = util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
// m.storageClient, request.Event.GetOutputUri())
// if err != nil {
// fmt.Printf("Error fetching output literal map %v", request.Event)
// }
// } else {
// fmt.Printf("No output data found for %v\n", request.Event)
// }
// if outputs != nil {
// urls := common.FlyteURLsFromTaskExecutionID(taskExecutionID, false)
// outputURLs := common.AppendLinksForLiteralMap(urls.GetOutputs(), *outputs)
// for k, v := range outputs.Literals {
// as := artifact.ArtifactSpec{
// Value: v,
// // Type, tags and aliases will need to be filled in later
// Source: &artifact.ArtifactSpec_TaskExecution{
// TaskExecution: &taskExecutionID,
// },
// }
// ak := core.ArtifactKey{
// Project: request.Event.ParentNodeExecutionId.ExecutionId.Project,
// Domain: request.Event.ParentNodeExecutionId.ExecutionId.Domain,
// // This will need to be filled in later, will need to pull from task template, or set to
// // something pretty unique, like the task ID.
// Name: "",
// }
//
// a := artifact.CreateArtifactRequest{
// ArtifactKey: &ak,
// Version: request.Event.ParentNodeExecutionId.ExecutionId.Name,
// Uri: outputURLs[k],
// Spec: &as,
// }
// e := event.ArtifactCreateEvent{
// CreateRequest: &a,
// }
// print(fmt.Sprintf("Output %s, becomes artifact request: %v\n", k, e))
// }
// }
//}

func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) (
*admin.TaskExecutionEventResponse, error) {
if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
Expand Down Expand Up @@ -266,10 +202,6 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
if request.Event.GetOutputData() != nil {
m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
//go func() {
// logger.Debugf(ctx, "Emitting task execution artifacts for [%+v] [%+v]", taskExecutionID, request)
// m.handleArtifactEventEmitting(ctx, request, taskExecutionID)
//}()
}

if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil {
Expand Down

0 comments on commit 59b147b

Please sign in to comment.