Skip to content

Commit

Permalink
Merge branch 'slack-15.0' into bp-pr13856-slack-15.0
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored May 28, 2024
2 parents b3262d7 + bcd0837 commit 801d401
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 3 deletions.
25 changes: 22 additions & 3 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package repltracker

import (
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -47,6 +49,8 @@ var (
heartbeatLagNsHistogram = stats.NewGenericHistogram("HeartbeatLagNsHistogram",
"Histogram of lag values in nanoseconds", []int64{0, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12},
[]string{"0", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s", ">1000s"}, "Count", "Total")

errFallback = errors.New("failed to obtain replication lag from poller after attempting to use it as fall-back for heartbeat")
)

// ReplTracker tracks replication lag.
Expand Down Expand Up @@ -133,14 +137,29 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

fallbackToPoller := false
var heartbeatLag, mysqlLag time.Duration
var heartbeatErr, mysqlErr error

switch {
case rt.isPrimary || rt.mode == tabletenv.Disable:
return 0, nil
case rt.mode == tabletenv.Heartbeat:
return rt.hr.Status()
// This should allow us to migrate safely to using vttablet heartbeat. If using heartbeat fails (e.g. because
// the shard's primary does not yet have them and therefore, either the heartbeat table is missing or it's
// empty), fall back to the poller. Otherwise, use what the heartbeat says.
if heartbeatLag, heartbeatErr = rt.hr.Status(); heartbeatErr == nil {
return heartbeatLag, heartbeatErr
}
fallbackToPoller = true
}
// rt.mode == tabletenv.Poller
return rt.poller.Status()
// rt.mode == tabletenv.Poller or fallback after heartbeat error
mysqlLag, mysqlErr = rt.poller.Status()
if fallbackToPoller && mysqlErr != nil {
return 0, fmt.Errorf("%w: %s", errFallback, mysqlErr)
}

return mysqlLag, mysqlErr
}

// EnableHeartbeat enables or disables writes of heartbeat. This functionality
Expand Down
73 changes: 73 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,76 @@ func TestReplTracker(t *testing.T) {
_, err = rt.Status()
assert.Equal(t, "err", err.Error())
}

func TestStatusHeartbeatFallBack(t *testing.T) {
t.Parallel()

heartbeatErr := errors.New("some error reading heartbeat")
mysqlErr := errors.New("some mysql error")
testCases := []struct {
name string
heartbeatLag tabletenv.Seconds
heartbeatError error
mysqldLag uint
mysqldErr error
expectedError error
expectedLag time.Duration
}{
{
name: "Heartbeat successful",
heartbeatLag: tabletenv.Seconds(5.0),
heartbeatError: nil,
expectedLag: 5 * time.Second,
},
{
name: "Heartbeat failed, mysqld lag successful",
heartbeatError: heartbeatErr,
mysqldLag: 8,
expectedLag: 8 * time.Second,
},
{
name: "Heartbeat & mysqld lag failed",
heartbeatError: heartbeatErr,
mysqldErr: mysqlErr,
expectedError: errFallback,
},
}

for _, testCase := range testCases {
theCase := testCase

t.Run(theCase.name, func(t *testing.T) {
t.Parallel()
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
config.ReplicationTracker.HeartbeatIntervalSeconds = theCase.heartbeatLag
env := tabletenv.NewEnv(config, "ReplTrackerTest")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.ReplicationLagSeconds = theCase.mysqldLag
mysqld.Replicating = true
mysqld.ReplicationStatusError = theCase.mysqldErr
target := &querypb.Target{}

rt := NewReplTracker(env, alias)

rt.hr.lastKnownLag = time.Duration(theCase.heartbeatLag) * time.Second
rt.hr.lastKnownError = theCase.heartbeatError
rt.InitDBConfig(target, mysqld)

lag, err := rt.Status()

if theCase.expectedError == nil {
assert.NoError(t, err)
assert.Equal(t, theCase.expectedLag, lag)
} else {
assert.ErrorIs(t, err, theCase.expectedError)
}

})
}

}

0 comments on commit 801d401

Please sign in to comment.