Skip to content

Commit

Permalink
Update e2e test
Browse files Browse the repository at this point in the history
Revert "Fix vitessio#16565"

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 13, 2024
1 parent f1826cc commit 58605b3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 19 deletions.
57 changes: 39 additions & 18 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,18 +1228,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
confirmVReplicationThrottling(t, tab, workflow, sourceThrottlerAppName.String())
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
Expand Down Expand Up @@ -1274,12 +1263,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
confirmVReplicationThrottling(t, tab, workflow, targetThrottlerAppName.String())
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
Expand Down Expand Up @@ -1709,3 +1693,40 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) {
execQuery(t, dbConn, "rollback")
}

// confirmVReplicationThrottling confirms that the throttling related metrics reflect that
// the workflow is being throttled as expected, via the expected app name, and that this
// is impacting the lag as expected.
func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, workflow, appname string) {
//time.Sleep(1 * time.Second) // To accrue some lag
const zv = int64(0)

jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int()
require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal)

val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
require.NotEqual(t, "", val)
throttledCountTotal, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val)

jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"commerce.0.commerce2customer.1": 135}
vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, tab.Keyspace, workflow)).Int()
require.NoError(t, err)
require.Greater(t, vreplLagSeconds, zv, "JSON value: %s", jsVal)

val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"})
require.NoError(t, err)
require.NotEqual(t, "", val)
vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val)
}
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func primaryVindexesDiffer(ms *vtctldatapb.MaterializeSettings, source, target *
// If neither source nor target have any vindexes, treat the answer to
// the question as trivially false.
if len(sColumnVindexes) == 0 && len(tColumnVindexes) == 0 {
return false
return true
}

sPrimaryVindex := sColumnVindexes[0]
Expand Down

0 comments on commit 58605b3

Please sign in to comment.