Skip to content

Commit

Permalink
Don't treat missing PartitionConfig data as an error
Browse files Browse the repository at this point in the history
We intentionally set the isolation group to "" when we leak tasks, which causes forwarded tasks to report an error resolving the isolation group.

Additionally, update the matching simulation to use the isolation-group-based load balancer and provide a default for the QpsTrackerInterval; an interval of 0s results in a panic on service startup.

Additionally specify an Identity on requests in the matching simulation so that poller history is tracked.
  • Loading branch information
natemort committed Dec 20, 2024
1 parent b22774a commit 61f718f
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 8 deletions.
9 changes: 2 additions & 7 deletions common/partition/default-partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package partition

import (
"context"
"errors"
"fmt"
"slices"

Expand All @@ -48,10 +47,6 @@ var (
IsolationLeakCauseExpired = metrics.IsolationLeakCause("expired")
)

// ErrInvalidPartitionConfig is returned when the required partitioning configuration
// is missing due to misconfiguration
var ErrInvalidPartitionConfig = errors.New("invalid partition config")

// DefaultWorkflowPartitionConfig is the default dataset expected to be passed around in the
// execution records for workflows which is used for partitioning. It contains the IsolationGroup
// where the workflow was started, and is expected to be pinned, and a workflow ID for a fallback means
Expand Down Expand Up @@ -80,11 +75,11 @@ func NewDefaultPartitioner(

func (r *defaultPartitioner) GetIsolationGroupByDomainID(ctx context.Context, scope metrics.Scope, pollerInfo PollerInfo, wfPartitionData PartitionConfig) (string, error) {
if wfPartitionData == nil {
return "", ErrInvalidPartitionConfig
return "", nil
}
wfPartition := mapPartitionConfigToDefaultPartitionConfig(wfPartitionData)
if wfPartition.WorkflowStartIsolationGroup == "" || wfPartition.WFID == "" {
return "", ErrInvalidPartitionConfig
return "", nil
}

isolationGroups, err := r.isolationGroupState.IsolationGroupsByDomainID(ctx, pollerInfo.DomainID)
Expand Down
10 changes: 9 additions & 1 deletion host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestMatchingSimulation(t *testing.T) {
dynamicconfig.MatchingPartitionUpscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionUpscaleSustainedDuration,
dynamicconfig.MatchingPartitionDownscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionDownscaleSustainedDuration,
dynamicconfig.MatchingAdaptiveScalerUpdateInterval: clusterConfig.MatchingConfig.SimulationConfig.AdaptiveScalerUpdateInterval,
dynamicconfig.MatchingQPSTrackerInterval: clusterConfig.MatchingConfig.SimulationConfig.QPSTrackerInterval,
dynamicconfig.MatchingQPSTrackerInterval: getQpsTrackerInterval(clusterConfig.MatchingConfig.SimulationConfig.QPSTrackerInterval),
dynamicconfig.TaskIsolationDuration: clusterConfig.MatchingConfig.SimulationConfig.TaskIsolationDuration,
}

Expand Down Expand Up @@ -426,6 +426,7 @@ func (s *MatchingSimulationSuite) poll(
Name: tasklist,
Kind: types.TaskListKindNormal.Ptr(),
},
Identity: pollerID,
},
IsolationGroup: pollerConfig.getIsolationGroup(),
})
Expand Down Expand Up @@ -672,6 +673,13 @@ func getPartitionTaskListName(root string, partition int) string {
return fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, root, partition)
}

// getQpsTrackerInterval returns the QPS tracker interval to use for the
// matching simulation, substituting a default when the simulation config
// leaves it unset.
//
// A zero interval makes the service panic on startup, so it is replaced with
// 10 seconds. Negative intervals are treated the same way, since they are no
// more usable as a tracking period than zero (presumably the interval feeds a
// ticker, which rejects any non-positive period — confirm against the tracker).
func getQpsTrackerInterval(duration time.Duration) time.Duration {
	const defaultInterval = 10 * time.Second
	if duration <= 0 {
		return defaultInterval
	}
	return duration
}

func randomlyPickKey(weights map[string]int) string {
// Calculate the total weight
totalWeight := 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 30
taskspersecond: 500
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 30
taskspersecond: 500
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 30
taskspersecond: 500
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 10
taskspersecond: 180
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 3
taskspersecond: 50
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matchingconfig:
localpollwaittime: 10ms
localtaskwaittime: 10ms
taskisolationduration: 1s
tasklistloadbalancerstrategy: isolation
tasks:
- numtaskgenerators: 10
taskspersecond: 180
Expand Down

0 comments on commit 61f718f

Please sign in to comment.