diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 681495869d8..8d868e9f49c 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -338,6 +338,7 @@ Flags: --stream_health_buffer_size uint max streaming health entries to buffer per streaming health client (default 20) --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class --table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop") + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 8b5ca0a7cf0..16f261194ca 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -196,6 +196,7 @@ Flags: --stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1) --stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768) --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_grpc_ca string the server ca to use to validate servers when connecting --tablet_grpc_cert string the cert to use to connect diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 46d92c7364e..70799b0f6bc 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -48,6 +48,7 @@ import ( "github.com/spf13/pflag" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -82,6 +83,9 @@ var ( // tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets. tabletFilters []string + // tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets. + tabletFilterTags flagutil.StringMapValue + // refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo. refreshInterval = 1 * time.Minute @@ -164,6 +168,7 @@ func init() { func registerDiscoveryFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.") + fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.") fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.") fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.") } @@ -337,13 +342,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filter TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) } for _, c := range cells { + var filters TabletFilters log.Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) } - filter = fbs + filters = append(filters, fbs) } else if len(KeyspacesToWatch) > 0 { - filter = NewFilterByKeyspace(KeyspacesToWatch) + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) + } + if len(tabletFilterTags) > 0 { + filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0b69ecb6a63..64346d524ad 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -274,6 +274,19 @@ type TabletFilter interface { IsIncluded(tablet *topodata.Tablet) bool } +// TabletFilters contains filters for tablets. +type TabletFilters []TabletFilter + +// IsIncluded returns true if a tablet passes all filters. +func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { + for _, filter := range tf { + if !filter.IsIncluded(tablet) { + return false + } + } + return true +} + // FilterByShard is a filter that filters tablets by // keyspace/shard. type FilterByShard struct { @@ -375,3 +388,32 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } + +// FilterByTabletTags is a filter that filters tablets by tablet tag key/values. +type FilterByTabletTags struct { + tags map[string]string +} + +// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match +// all tablet tags will be forwarded to the TopologyWatcher's consumer. +func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { + return &FilterByTabletTags{ + tags: tabletTags, + } +} + +// IsIncluded returns true if the tablet's tags match what we expect. +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { + if fbtg.tags == nil { + return true + } + if tablet.Tags == nil { + return false + } + for key, val := range fbtg.tags { + if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val { + return false + } + } + return true +} diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 95c6e44ec43..ad45ef92ebe 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) { ctx := utils.LeakCheckContext(t) hc := NewFakeHealthCheck(nil) - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) @@ -578,6 +578,33 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { tw.Stop() } +func TestNewFilterByTabletTags(t *testing.T) { + // no required tags == true + filter := NewFilterByTabletTags(nil) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{})) + + tags := map[string]string{ + "instance_type": "i3.xlarge", + "some_key": "some_value", + } + filter = NewFilterByTabletTags(tags) + + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: nil, + })) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{}, + })) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{ + "instance_type": "i3.xlarge", + }, + })) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: tags, + })) +} + func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { ctx := utils.LeakCheckContext(t)