diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md
index 5d413c25cae..d9f655ecbc2 100644
--- a/changelog/19.0/19.0.0/summary.md
+++ b/changelog/19.0/19.0.0/summary.md
@@ -12,6 +12,7 @@
### Deprecations and Deletions
- The `MYSQL_FLAVOR` environment variable is now removed from all Docker Images.
+- VTTablet metrics for TxThrottler's topology watchers have been deprecated. They will be deleted in the next release.
### Docker
diff --git a/go/vt/discovery/replicationlag.go b/go/vt/discovery/replicationlag.go
index e7afa5ca844..9592440196a 100644
--- a/go/vt/discovery/replicationlag.go
+++ b/go/vt/discovery/replicationlag.go
@@ -111,13 +111,13 @@ func SetMinNumTablets(numTablets int) {
minNumTablets.Set(numTablets)
}
-// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high
+// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high
// replication lag, i.e. higher than the configured discovery_low_replication_lag flag.
func IsReplicationLagHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Get().Seconds()
}
-// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high
+// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high
// replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag.
func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Get().Seconds()
@@ -153,7 +153,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal
return filterStatsByLag(tabletHealthList)
}
res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList)
- // run the filter again if exactly one tablet is removed,
+ // Run the filter again if exactly one tablet is removed,
// and we have spare tablets.
if len(res) > minNumTablets.Get() && len(res) == len(tabletHealthList)-1 {
res = filterStatsByLagWithLegacyAlgorithm(res)
@@ -164,12 +164,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal
func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]tabletLagSnapshot, 0, len(tabletHealthList))
- // filter non-serving tablets and those with very high replication lag
+ // Filter out non-serving tablets and those with very high replication lag.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) {
continue
}
- // Pull the current replication lag for a stable sort later.
+ // Save the current replication lag for a stable sort later.
list = append(list, tabletLagSnapshot{
ts: ts,
replag: ts.Stats.ReplicationLagSeconds})
@@ -178,7 +178,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
// Sort by replication lag.
sort.Sort(tabletLagSnapshotList(list))
- // Pick those with low replication lag, but at least minNumTablets tablets regardless.
+ // Pick tablets with low replication lag, but at least minNumTablets tablets regardless.
res := make([]*TabletHealth, 0, len(list))
for i := 0; i < len(list); i++ {
if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets.Get() {
@@ -190,7 +190,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]*TabletHealth, 0, len(tabletHealthList))
- // filter non-serving tablets
+ // Filter out non-serving tablets.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil {
continue
@@ -200,7 +200,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(list) <= 1 {
return list
}
- // if all have low replication lag (<=30s), return all tablets.
+ // If all tablets have low replication lag (<=30s), return all of them.
allLowLag := true
for _, ts := range list {
if IsReplicationLagHigh(ts) {
@@ -211,12 +211,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if allLowLag {
return list
}
- // filter those affecting "mean" lag significantly
- // calculate mean for all tablets
+ // We want to filter out tablets that are affecting "mean" lag significantly.
+ // We first calculate the mean across all tablets.
res := make([]*TabletHealth, 0, len(list))
m, _ := mean(list, -1)
for i, ts := range list {
- // calculate mean by excluding ith tablet
+ // Now we calculate the mean by excluding ith tablet
mi, _ := mean(list, i)
if float64(mi) > float64(m)*0.7 {
res = append(res, ts)
@@ -225,9 +225,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(res) >= minNumTablets.Get() {
return res
}
- // return at least minNumTablets tablets to avoid over loading,
- // if there is enough tablets with replication lag < highReplicationLagMinServing.
- // Pull the current replication lag for a stable sort.
+
+ // We want to return at least minNumTablets tablets to avoid overloading,
+ // as long as there are enough tablets with replication lag < highReplicationLagMinServing.
+
+ // Save the current replication lag for a stable sort.
snapshots := make([]tabletLagSnapshot, 0, len(list))
for _, ts := range list {
if !IsReplicationLagVeryHigh(ts) {
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index d1bd2d3acf8..b3298f55700 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -53,15 +53,15 @@ var (
"Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet)
)
-// tabletInfo is used internally by the TopologyWatcher class
+// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
}
-// TopologyWatcher polls tablet from a configurable set of tablets
-// periodically. When tablets are added / removed, it calls
-// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately.
+// TopologyWatcher polls the topology periodically for changes to
+// the set of tablets. When tablets are added / removed / modified,
+// it calls the AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
@@ -79,20 +79,21 @@ type TopologyWatcher struct {
// mu protects all variables below
mu sync.Mutex
- // tablets contains a map of alias -> tabletInfo for all known tablets
+ // tablets contains a map of alias -> tabletInfo for all known tablets.
tablets map[string]*tabletInfo
- // topoChecksum stores a crc32 of the tablets map and is exported as a metric
+ // topoChecksum stores a crc32 of the tablets map and is exported as a metric.
topoChecksum uint32
- // lastRefresh records the timestamp of the last topo refresh
+ // lastRefresh records the timestamp of the last refresh of the topology.
lastRefresh time.Time
- // firstLoadDone is true when first load of the topology data is done.
+ // firstLoadDone is true when the initial load of the topology data is complete.
firstLoadDone bool
- // firstLoadChan is closed when the initial loading of topology data is done.
+ // firstLoadChan is closed when the initial load of topology data is complete.
firstLoadChan chan struct{}
}
// NewTopologyWatcher returns a TopologyWatcher that monitors all
-// the tablets in a cell, and starts refreshing.
+// the tablets that it is configured to watch, and reloads them periodically if needed.
+// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
@@ -114,14 +115,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}
// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
-// the tablets in a cell, and starts refreshing.
+// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}
-// Start starts the topology watcher
+// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
go func(t *TopologyWatcher) {
@@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() {
}(tw)
}
-// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder.
+// Stop stops the watcher. It does not clean up the tablets added to HealthCheck.
func (tw *TopologyWatcher) Stop() {
tw.cancelFunc()
// wait for watch goroutine to finish.
@@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)
- // first get the list of relevant tabletAliases
+ // First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
@@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() {
}
// Accumulate a list of all known alias strings to use later
- // when sorting
+ // when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tw.mu.Lock()
@@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() {
tabletAliasStrs = append(tabletAliasStrs, aliasStr)
if !tw.refreshKnownTablets {
- // we already have a tabletInfo for this and the flag tells us to not refresh
+ // We already have a tabletInfo for this and the flag tells us to not refresh.
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
@@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() {
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
- <-tw.sem // Done; enable next request to run
+ <-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
@@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
- // trust the alias from topo and add it if it doesn't exist
+ // Trust the alias from topo and add it if it doesn't exist.
if val, ok := tw.tablets[alias]; ok {
// check if the host and port have changed. If yes, replace tablet.
oldKey := TabletToMapKey(val.tablet)
@@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
- // This is a new tablet record, let's add it to the healthcheck
+ // This is a new tablet record, let's add it to the HealthCheck.
tw.healthcheck.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}
@@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() {
close(tw.firstLoadChan)
}
- // iterate through the tablets in a stable order and compute a
- // checksum of the tablet map
+ // Iterate through the tablets in a stable order and compute a
+ // checksum of the tablet map.
sort.Strings(tabletAliasStrs)
var buf bytes.Buffer
for _, alias := range tabletAliasStrs {
@@ -269,7 +270,7 @@ func (tw *TopologyWatcher) loadTablets() {
}
-// RefreshLag returns the time since the last refresh
+// RefreshLag returns the time since the last refresh.
func (tw *TopologyWatcher) RefreshLag() time.Duration {
tw.mu.Lock()
defer tw.mu.Unlock()
@@ -277,7 +278,7 @@ func (tw *TopologyWatcher) RefreshLag() time.Duration {
return time.Since(tw.lastRefresh)
}
-// TopoChecksum returns the checksum of the current state of the topo
+// TopoChecksum returns the checksum of the current state of the topo.
func (tw *TopologyWatcher) TopoChecksum() uint32 {
tw.mu.Lock()
defer tw.mu.Unlock()
@@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
}
// TabletFilter is an interface that can be given to a TopologyWatcher
-// to be applied as an additional filter on the list of tablets returned by its getTablets function
+// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
@@ -300,18 +301,18 @@ type FilterByShard struct {
}
// filterShard describes a filter for a given shard or keyrange inside
-// a keyspace
+// a keyspace.
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
}
-// NewFilterByShard creates a new FilterByShard on top of an existing
-// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard
+// NewFilterByShard creates a new FilterByShard for use by a
+// TopologyWatcher. Each filter is a keyspace|shard entry, where shard
// can either be a shard name, or a keyrange. All tablets that match
-// at least one keyspace|shard tuple will be forwarded to the
-// underlying LegacyTabletRecorder.
+// at least one keyspace|shard tuple will be forwarded by the
+// TopologyWatcher to its consumer.
func NewFilterByShard(filters []string) (*FilterByShard, error) {
m := make(map[string][]*filterShard)
for _, filter := range filters {
@@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}, nil
}
-// IsIncluded returns true iff the tablet's keyspace and shard should be
-// forwarded to the underlying LegacyTabletRecorder.
+// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
@@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
return false
}
-// FilterByKeyspace is a filter that filters tablets by
-// keyspace
+// FilterByKeyspace is a filter that filters tablets by keyspace.
type FilterByKeyspace struct {
keyspaces map[string]bool
}
// NewFilterByKeyspace creates a new FilterByKeyspace.
// Each filter is a keyspace entry. All tablets that match
-// a keyspace will be forwarded to the underlying LegacyTabletRecorder.
+// a keyspace will be forwarded to the TopologyWatcher's consumer.
func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
@@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}
}
-// IsIncluded returns true if the tablet's keyspace should be
-// forwarded to the underlying LegacyTabletRecorder.
+// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go
deleted file mode 100644
index 163c4c44d4d..00000000000
--- a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Code generated by MockGen. DO NOT EDIT.
-// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: TopologyWatcherInterface)
-
-// Package txthrottler is a generated GoMock package.
-package txthrottler
-
-import (
- reflect "reflect"
-
- gomock "go.uber.org/mock/gomock"
-)
-
-// MockTopologyWatcherInterface is a mock of TopologyWatcherInterface interface.
-type MockTopologyWatcherInterface struct {
- ctrl *gomock.Controller
- recorder *MockTopologyWatcherInterfaceMockRecorder
-}
-
-// MockTopologyWatcherInterfaceMockRecorder is the mock recorder for MockTopologyWatcherInterface.
-type MockTopologyWatcherInterfaceMockRecorder struct {
- mock *MockTopologyWatcherInterface
-}
-
-// NewMockTopologyWatcherInterface creates a new mock instance.
-func NewMockTopologyWatcherInterface(ctrl *gomock.Controller) *MockTopologyWatcherInterface {
- mock := &MockTopologyWatcherInterface{ctrl: ctrl}
- mock.recorder = &MockTopologyWatcherInterfaceMockRecorder{mock}
- return mock
-}
-
-// EXPECT returns an object that allows the caller to indicate expected use.
-func (m *MockTopologyWatcherInterface) EXPECT() *MockTopologyWatcherInterfaceMockRecorder {
- return m.recorder
-}
-
-// Start mocks base method.
-func (m *MockTopologyWatcherInterface) Start() {
- m.ctrl.T.Helper()
- m.ctrl.Call(m, "Start")
-}
-
-// Start indicates an expected call of Start.
-func (mr *MockTopologyWatcherInterfaceMockRecorder) Start() *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Start))
-}
-
-// Stop mocks base method.
-func (m *MockTopologyWatcherInterface) Stop() {
- m.ctrl.T.Helper()
- m.ctrl.Call(m, "Stop")
-}
-
-// Stop indicates an expected call of Stop.
-func (mr *MockTopologyWatcherInterfaceMockRecorder) Stop() *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Stop))
-}
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
index 18dede5f30a..f78c65a4587 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
@@ -38,25 +38,20 @@ import (
)
// These vars store the functions used to create the topo server, healthcheck,
-// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
+// and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
-type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)
var (
- healthCheckFactory healthCheckFactoryFunc
- topologyWatcherFactory topologyWatcherFactoryFunc
- throttlerFactory throttlerFactoryFunc
+ healthCheckFactory healthCheckFactoryFunc
+ throttlerFactory throttlerFactoryFunc
)
func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
- topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
- return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
- }
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
@@ -149,7 +144,8 @@ type txThrottler struct {
topoServer *topo.Server
// stats
- throttlerRunning *stats.Gauge
+ throttlerRunning *stats.Gauge
+ // TODO(deepthi): deprecated, should be deleted in v20
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
@@ -170,10 +166,9 @@ type txThrottlerStateImpl struct {
// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
- throttleMu sync.Mutex
- throttler ThrottlerInterface
- stopHealthCheck context.CancelFunc
- topologyWatchers map[string]TopologyWatcherInterface
+ throttleMu sync.Mutex
+ throttler ThrottlerInterface
+ stopHealthCheck context.CancelFunc
healthCheck discovery.HealthCheck
healthCheckChan chan *discovery.TabletHealth
@@ -204,7 +199,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"),
- topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"),
+ topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "DEPRECATED: transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded",
@@ -322,31 +317,12 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()
- ts.topologyWatchers = make(
- map[string]TopologyWatcherInterface, len(ts.healthCheckCells))
- for _, cell := range ts.healthCheckCells {
- ts.topologyWatchers[cell] = topologyWatcherFactory(
- topoServer,
- ts.healthCheck,
- cell,
- target.Keyspace,
- target.Shard,
- discovery.DefaultTopologyWatcherRefreshInterval,
- discovery.DefaultTopoReadConcurrency,
- )
- ts.txThrottler.topoWatchers.Add(cell, 1)
- }
}
func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
- for cell, watcher := range ts.topologyWatchers {
- watcher.Stop()
- ts.txThrottler.topoWatchers.Reset(cell)
- }
- ts.topologyWatchers = nil
ts.stopHealthCheck()
ts.healthCheck.Close()
}
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
index ea57d37ad8e..268a37437d9 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
@@ -19,7 +19,6 @@ package txthrottler
// Commands to generate the mocks for this test.
//go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck
//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface
-//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface
import (
"context"
@@ -74,16 +73,6 @@ func TestEnabledThrottler(t *testing.T) {
return mockHealthCheck
}
- topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
- assert.Equal(t, ts, topoServer)
- assert.Contains(t, []string{"cell1", "cell2"}, cell)
- assert.Equal(t, "keyspace", keyspace)
- assert.Equal(t, "shard", shard)
- result := NewMockTopologyWatcherInterface(mockCtrl)
- result.EXPECT().Stop()
- return result
- }
-
mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
@@ -131,7 +120,6 @@ func TestEnabledThrottler(t *testing.T) {
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
- assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())
assert.False(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"])
@@ -162,7 +150,6 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
- assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}
func TestFetchKnownCells(t *testing.T) {