Bugfix/corrects replication in clusters of different sizes #5567

Open

wants to merge 5 commits into base: master
Changes from 3 commits
39 changes: 38 additions & 1 deletion service/frontend/adminHandler.go
@@ -967,7 +967,17 @@ func (adh *adminHandlerImpl) GetReplicationMessages(
return nil, adh.error(errClusterNameNotSet, scope)
}

resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, request)
// Ensures that the request can be fulfilled by the current cluster.
// If this cluster receives a replication request for shard IDs higher
// than it can handle, those tokens are dropped.
filteredRequest := filterReplicationShards(adh.config.NumHistoryShards, request)
if len(filteredRequest.Tokens) != len(request.Tokens) {
adh.GetLogger().Warn("Warning! Received replication request from a cluster with a greater number of shards."+
Member

Including the number of shards in the current and requesting clusters in the log would be useful.

Member Author

There is no obvious way (that I'm aware of) to get the requesting cluster's shard count, else I would.
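For what it's worth, the local side's shard count is available in the handler even if the requester's isn't. A minimal sketch of what that could look like, assuming fmt is imported and reusing only fields and calls already present in this diff (adh.config.NumHistoryShards, adh.GetLogger().Warn, tag.ClusterName):

	// Sketch only: logs the local shard count; the requesting cluster's count
	// is not available in the request, as noted above.
	adh.GetLogger().Warn(fmt.Sprintf(
		"Received replication request from cluster %q for shards above this cluster's NumHistoryShards (%d); those tokens will be dropped.",
		request.ClusterName, adh.config.NumHistoryShards),
		tag.ClusterName(request.ClusterName))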

This is definitely a useful log message to have, but unfortunately it also means it gets logged very frequently. For example, if you are going from 1k shards to 8k (e.g., MySQL to Cassandra), you will get 7k of these logged on every replication cycle.

Could we change it to either be debug or only log once per cluster, etc.? I've attached a screenshot of what the logs look like for a very small reproduction case (1 vs 4 shards) I put together in Docker Compose using this PR's patch on top of v1.2.6.

[screenshot: the warning log line repeated for each out-of-range shard]
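One possible shape for the once-per-cluster option, as a hedged sketch only: it assumes a new loggedShardWarnings sync.Map field on adminHandlerImpl, which does not exist in this PR.

	// Hypothetical field on adminHandlerImpl: loggedShardWarnings sync.Map
	// LoadOrStore reports whether the cluster was already recorded, so the
	// warning is emitted at most once per requesting cluster.
	if _, seen := adh.loggedShardWarnings.LoadOrStore(request.ClusterName, struct{}{}); !seen {
		adh.GetLogger().Warn("Received replication request from a cluster with a greater number of shards; "+
			"tokens above this cluster's NumHistoryShards will be dropped.",
			tag.ClusterName(request.ClusterName))
	}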

Member Author

Sorry for not being on the ball with this one.

That's way too much noise, agreed. When you're running that, however, is the active side on the larger cluster? That should be the scenario that's broken and will not work.

Thanks for digging into it! In this case, the active side is the lower-shard-count one (with an unfortunate name due to historical reasons). We actually didn't have any domains set up to replicate - these logs were happening out of the box.

I'm happy to share my code if that would help with testing. It's a fairly concise docker-compose with 3 clusters (one with fewer shards) set up to replicate.

Member Author
@davidporter-id-au davidporter-id-au Jan 19, 2024

Yeah, if you could post the config, that'd be helpful for me to understand what's going on. I'm blocked from testing it more internally due to some other PR we need to fix, but otherwise I'll be able to take more of a look hopefully later next week.

"Some workflows will not be replicated they are active in this larger cluster"+
"and are intended to be replicated here", tag.ClusterName(request.ClusterName))
}

resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, filteredRequest)
if err != nil {
return nil, adh.error(err, scope)
}
@@ -1810,3 +1820,30 @@ func convertFilterListToMap(filters []*types.DynamicConfigFilter) (map[dc.Filter
}
return newFilters, nil
}

// Ensures that the shards in the request are within the range this server
// supports; any tokens outside that range are filtered out. This necessarily
// means that replication requests for shards that don't exist here are dropped.
Member

If the replication fundamentally can't be done completely in such setups, should we even attempt to do the partial replication?

Member Author
@davidporter-id-au davidporter-id-au Dec 30, 2023

It can - as far as I can tell - successfully be done when scaling a cluster up, in terms of the number of shards. Just not down. This strikes me as useful.

I don't think there's any value in doing partial replication, it's just hard to prevent it in such scenarios. The poller information doesn't presently convey the shard count, and such an API change seems like overkill.

//
// Replication is therefore the min(clusterA.NumberHistoryShards, clusterB.NumberHistoryShards).
// Shards above this limit will just not be replicated. This is a
// compromise because I've not found any way to translate shards from a
// higher shard count to a lower one.
//
// Simple division (such as divide by 2) doesn't work because each
// replication response returns offsets for the shard (like Kafka), so that
// consumers can manage their own offset locations. Any attempts to do
// reductions or translations would require keeping track of the offsets
// of the translated shards.
func filterReplicationShards(maxShards int, req *types.GetReplicationMessagesRequest) *types.GetReplicationMessagesRequest {
var out []*types.ReplicationToken
for _, v := range req.Tokens {
if int(v.ShardID) < maxShards {
out = append(out, v)
}
}
return &types.GetReplicationMessagesRequest{
Tokens: out,
ClusterName: req.ClusterName,
}
}
68 changes: 68 additions & 0 deletions service/frontend/adminHandler_test.go
@@ -1038,3 +1038,71 @@ func Test_UpdateDomainIsolationGroups(t *testing.T) {
})
}
}

Member

Leaving this comment here as a reminder to update documentation https://github.com/uber/Cadence-Docs/pull/181/files

func TestShardFiltering(t *testing.T) {

tests := map[string]struct {
input *types.GetReplicationMessagesRequest
expectedOut *types.GetReplicationMessagesRequest
expectedErr error
}{
"Normal replication - no changes whatsoever expected - replication from 4 shards to 4 shards clusters": {
input: &types.GetReplicationMessagesRequest{
Tokens: []*types.ReplicationToken{
{ShardID: 0, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
{ShardID: 1, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
{ShardID: 2, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
},
ClusterName: "cluster2-with-4-shards",
},
expectedOut: &types.GetReplicationMessagesRequest{
Tokens: []*types.ReplicationToken{
{ShardID: 0, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
{ShardID: 1, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
{ShardID: 2, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
},
ClusterName: "cluster2-with-4-shards",
},
},
"filtering replication - the new cluster's asking for shards that aren't present": {
input: &types.GetReplicationMessagesRequest{
Tokens: []*types.ReplicationToken{
{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
{ShardID: 4, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
{ShardID: 5, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
{ShardID: 6, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
},
ClusterName: "cluster2-with-8-shards",
},
expectedOut: &types.GetReplicationMessagesRequest{
Tokens: []*types.ReplicationToken{
{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
},
ClusterName: "cluster2-with-8-shards",
},
},
}

for name, td := range tests {
t.Run(name, func(t *testing.T) {
goMock := gomock.NewController(t)
mockhistoryclient := history.NewMockClient(goMock)

mockhistoryclient.EXPECT().GetReplicationMessages(gomock.Any(), td.expectedOut).Return(&types.GetReplicationMessagesResponse{}, nil)
Member

Does this mock line validate that GetReplicationMessages is called with td.expectedOut, or does it need Times(1)?

Member

I think no explicit Times is equivalent to MinTimes(1)... but that's probably worth verifying.
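As far as I know, a gomock expectation without an explicit Times is treated as exactly one call, so the test already fails if GetReplicationMessages is called with anything other than td.expectedOut, or not called at all once the controller finishes (recent gomock versions register Finish via t.Cleanup) - but that's worth verifying against the gomock version in use. Making the intent explicit is cheap either way; a sketch of the same expectation with Times(1):

	// Explicitly assert exactly one call with the filtered request.
	mockhistoryclient.EXPECT().
		GetReplicationMessages(gomock.Any(), td.expectedOut).
		Return(&types.GetReplicationMessagesResponse{}, nil).
		Times(1)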


handler := adminHandlerImpl{
config: &Config{NumHistoryShards: 4},
Resource: &resource.Test{
Logger: testlogger.New(t),
MetricsClient: metrics.NewNoopMetricsClient(),
HistoryClient: mockhistoryclient,
},
}

_, err := handler.GetReplicationMessages(context.Background(), td.input)
assert.Equal(t, td.expectedErr, err)
})
}
}