Skip to content

Commit

Permalink
Only set SDKPriorityUpdateHandling if workflow update is being used (#…
Browse files Browse the repository at this point in the history
…1398)

Only set update flag on update
  • Loading branch information
Quinn-With-Two-Ns authored Feb 27, 2024
1 parent 54b2336 commit 34ef0a7
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 8 deletions.
1 change: 0 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,6 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted(
// replay sees the _final_ value of applied flags, not intermediate values
// as the value varies by WFT)
weh.sdkFlags.tryUse(SDKFlagProtocolMessageCommand, !weh.isReplay)
weh.sdkFlags.tryUse(SDKPriorityUpdateHandling, !weh.isReplay)

// Invoke the workflow.
weh.workflowDefinition.Execute(weh, attributes.Header, attributes.Input)
Expand Down
12 changes: 11 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ type (
message string
}

unknownSdkFlagError struct {
message string
}

preparedTask struct {
events []*historypb.HistoryEvent
markers []*historypb.HistoryEvent
Expand Down Expand Up @@ -239,6 +243,10 @@ func (h historyMismatchError) Error() string {
return h.message
}

func (s unknownSdkFlagError) Error() string {
return s.message
}

// Get workflow start event.
func (eh *history) GetWorkflowStartedEvent() (*historypb.HistoryEvent, error) {
events := eh.workflowTask.task.History.Events
Expand Down Expand Up @@ -278,7 +286,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
return finishedTask{}, errors.New("could not recognize SDK flag")
return finishedTask{}, unknownSdkFlagError{
message: fmt.Sprintf("unknown SDK flag: %d", flag),
}
}
flags = append(flags, f)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err
}
} else if _, mismatch := err.(historyMismatchError); mismatch {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
} else if _, unknown := err.(unknownSdkFlagError); unknown {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

builtRequest := &workflowservice.RespondWorkflowTaskFailedRequest{
Expand Down
1 change: 0 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common
state.yield("yield before executing to setup state")
state.unblocked()

// TODO: @shreyassrivatsan - add workflow trace span here
r.workflowResult, r.error = d.workflow.Execute(d.rootCtx, input)
rpp := getWorkflowResultPointerPointer(ctx)
*rpp = r
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ type UpdateWorkflowWithOptionsRequest struct {

// WorkflowUpdateHandle is a handle to a workflow execution update process. The
// update may or may not have completed so an instance of this type functions
// simlar to a Future with respect to the outcome of the update. If the update
// similar to a Future with respect to the outcome of the update. If the update
// is rejected or returns an error, the Get function on this type will return
// that error through the output valuePtr.
// NOTE: Experimental
Expand Down
152 changes: 151 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.uber.org/goleak"
Expand Down Expand Up @@ -1460,7 +1461,7 @@ func (ts *IntegrationTestSuite) TestUpdateValidatorRejected() {
run, err := ts.client.ExecuteWorkflow(ctx,
wfOptions, ts.workflows.UpdateWithValidatorWorkflow)
ts.Nil(err)
_, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), "__stack_trace")
_, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), client.QueryTypeStackTrace)
ts.NoError(err)
// Send a bad update request that will get rejected
handler, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update", "")
Expand All @@ -1472,6 +1473,37 @@ func (ts *IntegrationTestSuite) TestUpdateValidatorRejected() {
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestUpdateWorkflowCancelled() {
ctx := context.Background()
wfOptions := ts.startWorkflowOptions("test-update-workflow-cancelled")
run, err := ts.client.ExecuteWorkflow(ctx,
wfOptions, ts.workflows.UpdateCancelableWorkflow)
ts.Nil(err)

// Send a few updates to the workflow
handles := make([]client.WorkflowUpdateHandle, 0, 5)
for i := 0; i < 5; i++ {
handler, err := ts.client.UpdateWorkflowWithOptions(ctx, &client.UpdateWorkflowWithOptionsRequest{
UpdateID: fmt.Sprintf("test-update-%d", i),
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
WaitPolicy: &updatepb.WaitPolicy{
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
},
})
ts.NoError(err)
handles = append(handles, handler)
}
// All updates should complete with a cancellation error
ts.NoError(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID()))
for _, handle := range handles {
err = handle.Get(ctx, nil)
ts.NotNil(err.(*temporal.CanceledError))
}
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestBasicSession() {
var expected []string
err := ts.executeWorkflow("test-basic-session", ts.workflows.BasicSession, &expected)
Expand Down Expand Up @@ -2620,6 +2652,74 @@ func (ts *IntegrationTestSuite) TestWaitOnUpdate() {
ts.Equal(1, result)
}

func (ts *IntegrationTestSuite) TestUpdateHandlerRegisteredLate() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-handler-registered-late")
run, err := ts.client.ExecuteWorkflow(ctx,
options,
ts.workflows.UpdateHandlerRegisteredLate)
ts.NoError(err)
// Wait for the workflow to be blocked
ts.waitForQueryTrue(run, "state", 0)
// Send an update before the handler is registered
updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.Error(updateHandle.Get(ctx, nil))
// Unblock the workflow so it can register the handler
ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil)
// Send an update after the handler is registered
updateHandle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.NoError(updateHandle.Get(ctx, nil))
// Unblock the workflow so it can complete
ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil)
// Get the result
var result int
ts.NoError(run.Get(ctx, &result))
ts.Equal(1, result)
}

func (ts *IntegrationTestSuite) TestUpdateSDKFlag() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-SDK-flag")
run, err := ts.client.ExecuteWorkflow(ctx,
options,
ts.workflows.UpdateHandlerRegisteredLate)
ts.NoError(err)
// Wait for the workflow to be blocked
ts.waitForQueryTrue(run, "state", 0)
// Unblock the workflow so it can register the handler
ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil)
// Send an update after the handler is registered
updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.NoError(updateHandle.Get(ctx, nil))
// Unblock the workflow so it can complete
ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil)
// Get the result
var result int
ts.NoError(run.Get(ctx, &result))
ts.Equal(1, result)
// Now test the SDK flag
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
flagsSet := make([][]uint32, 0)
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
taskCompleted := event.GetWorkflowTaskCompletedEventAttributes()
if taskCompleted != nil {
flagsSet = append(flagsSet, taskCompleted.GetSdkMetadata().GetLangUsedFlags())
}
}
priorityUpdateHandlingFlag := 4
// The first workflow task should not have the flag set
ts.NotContains(flagsSet[0], priorityUpdateHandlingFlag)
// The second workflow task should have the flag set
ts.NotContains(flagsSet[1], priorityUpdateHandlingFlag)
}

func (ts *IntegrationTestSuite) TestUpdateOrdering() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down Expand Up @@ -2715,6 +2815,56 @@ func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() {
ts.Equal(1, result)
}

func (ts *IntegrationTestSuite) TestUpdateRejected() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-rejected")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateRejectedWithOtherGoRoutine)
ts.NoError(err)
// Send an update we expect to be rejected before the first workflow task
handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.Error(handle.Get(ctx, nil))
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInGoroutine() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-setting-handler-in-goroutine")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSettingHandlerInGoroutine)
ts.NoError(err)
// Send an update handler in a workflow goroutine, this should be accepted
handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.NoError(handle.Get(ctx, nil))
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-update-setting-handler-in-handler")
options.StartDelay = time.Hour
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSettingHandlerInHandler)
ts.NoError(err)
// Expect this to fail because the handler is not set yet
handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "inner update")
ts.NoError(err)
ts.Error(handle.Get(ctx, nil))
// Send an update that should register a new handler for "inner update"
handle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update")
ts.NoError(err)
ts.NoError(handle.Get(ctx, nil))
// Expect this to succeed because the handler is set now
handle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "inner update")
ts.NoError(err)
ts.NoError(handle.Get(ctx, nil))
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
Loading

0 comments on commit 34ef0a7

Please sign in to comment.