Skip to content

Commit

Permalink
Adding in additional header to determine a more stable isolation-group (
Browse files Browse the repository at this point in the history
#1252)

* Adding in additional header as a config option to determine a more stable isolation-group
  • Loading branch information
davidporter-id-au authored Jun 30, 2023
1 parent 407a703 commit 58f9746
Show file tree
Hide file tree
Showing 9 changed files with 753 additions and 6 deletions.
2 changes: 2 additions & 0 deletions evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func callOptions() []interface{} {
gomock.Any(), // feature version
gomock.Any(), // client name
gomock.Any(), // feature flags
gomock.Any(), // isolation group
}
}

Expand Down Expand Up @@ -176,6 +177,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
DisableActivityWorker: true,
Logger: zaptest.NewLogger(s.T()),
IsolationGroup: "zone-1",
})
// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
Expand Down
5 changes: 5 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"fmt"
"time"
"go.uber.org/cadence/internal/common/isolationgroup"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -358,6 +359,7 @@ type (
ClientOptions struct {
MetricsScope tally.Scope
Identity string
IsolationGroup string
DataConverter DataConverter
Tracer opentracing.Tracer
ContextPropagators []ContextPropagator
Expand Down Expand Up @@ -578,6 +580,9 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
if options != nil && options.Authorization != nil {
service = auth.NewWorkflowServiceWrapper(service, options.Authorization)
}
if options != nil && options.IsolationGroup != "" {
service = isolationgroup.NewWorkflowServiceWrapper(service, options.IsolationGroup)
}
service = metrics.NewWorkflowServiceWrapper(service, metricScope)
return &workflowClient{
workflowService: service,
Expand Down
295 changes: 295 additions & 0 deletions internal/common/isolationgroup/service_wrapper.go

Large diffs are not rendered by default.

405 changes: 405 additions & 0 deletions internal/common/isolationgroup/service_wrapper_test.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"sync"
"time"

"go.uber.org/cadence/internal/common/isolationgroup"

"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -992,6 +994,9 @@ func newAggregatedWorker(
if options.Authorization != nil {
service = auth.NewWorkflowServiceWrapper(service, options.Authorization)
}
if options.IsolationGroup != "" {
service = isolationgroup.NewWorkflowServiceWrapper(service, options.IsolationGroup)
}
service = metrics.NewWorkflowServiceWrapper(service, workerParams.MetricsScope)
processTestTags(&wOptions, &workerParams)

Expand Down
12 changes: 6 additions & 6 deletions internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ func (s *InterfacesTestSuite) TestInterface() {

// mocks
s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(domainDesc, nil).AnyTimes()
s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptions()...).Return(&m.PollForActivityTaskResponse{}, nil).AnyTimes()
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions()...).Return(nil).AnyTimes()
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions()...).Return(&m.PollForDecisionTaskResponse{}, nil).AnyTimes()
s.service.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, nil).AnyTimes()
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), callOptions()...).Return(&m.StartWorkflowExecutionResponse{}, nil).AnyTimes()
s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(&m.PollForActivityTaskResponse{}, nil).AnyTimes()
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(nil).AnyTimes()
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(&m.PollForDecisionTaskResponse{}, nil).AnyTimes()
s.service.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(nil, nil).AnyTimes()
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(&m.StartWorkflowExecutionResponse{}, nil).AnyTimes()

registry := newRegistry()
// Launch worker.
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *InterfacesTestSuite) TestInterface() {
ExecutionStartToCloseTimeout: 10 * time.Second,
DecisionTaskStartToCloseTimeout: 10 * time.Second,
}
workflowClient := NewClient(s.service, domain, nil)
workflowClient := NewClient(s.service, domain, &ClientOptions{IsolationGroup: "zone-2"})
_, err := workflowClient.StartWorkflow(context.Background(), workflowOptions, "workflowType")
s.NoError(err)
}
12 changes: 12 additions & 0 deletions internal/test_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ func callOptions() []interface{} {
gomock.Any(), // feature flags
}
}

// this is the mock for yarpcCallOptions, as gomock requires the num of arguments to be the same.
// see getYarpcCallOptions for the default case.
func callOptionsWithIsolationGroupHeader() []interface{} {
return []interface{}{
gomock.Any(), // library version
gomock.Any(), // feature version
gomock.Any(), // client name
gomock.Any(), // feature flags
gomock.Any(), // isolation group header
}
}
3 changes: 3 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type (
// default: default identity that include hostname, groupName and process ID.
Identity string

// Optional: Defines the 'zone' or the failure group that the worker belongs to
IsolationGroup string

// Optional: Metrics to be reported. Metrics emitted by the cadence client are not prometheus compatible by
// default. To ensure metrics are compatible with prometheus make sure to create tally scope with sanitizer
// options set.
Expand Down
20 changes: 20 additions & 0 deletions internal/workflow_replayer_utils.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
Expand Down

0 comments on commit 58f9746

Please sign in to comment.