From a77b4af52e95575502f9a19f7ce32d477072fa42 Mon Sep 17 00:00:00 2001 From: Matt Layher Date: Fri, 13 Sep 2024 11:43:42 -0400 Subject: [PATCH] vdiff: factor Summarize out from vtctldclient Factor out buildSingleSummary as vdiff.Summarize, which generates a summary of the VDiff operation based on the VDiffShow RPC response. Fixes #16780. Signed-off-by: Matt Layher --- .../command/vreplication/vdiff/vdiff.go | 255 +--------------- .../command/vreplication/vdiff/vdiff_test.go | 110 ------- go/vt/vttablet/tabletmanager/vdiff/summary.go | 277 ++++++++++++++++++ .../tabletmanager/vdiff/summary_test.go | 134 +++++++++ 4 files changed, 414 insertions(+), 362 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vdiff/summary.go create mode 100644 go/vt/vttablet/tabletmanager/vdiff/summary_test.go diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go index 1a0443a45c2..c7a60847682 100644 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go @@ -17,13 +17,11 @@ limitations under the License. package vdiff import ( - "encoding/json" "fmt" "html/template" "io" "math" "reflect" - "sort" "strings" "time" @@ -397,34 +395,6 @@ func commandResume(cmd *cobra.Command, args []string) error { return nil } -// tableSummary aggregates the current state of the table diff from all shards. -type tableSummary struct { - TableName string - State vdiff.VDiffState - RowsCompared int64 - MatchingRows int64 - MismatchedRows int64 - ExtraRowsSource int64 - ExtraRowsTarget int64 - LastUpdated string `json:"LastUpdated,omitempty"` -} - -// summary aggregates the current state of the vdiff from all shards. -type summary struct { - Workflow, Keyspace string - State vdiff.VDiffState - UUID string - RowsCompared int64 - HasMismatch bool - Shards string - StartedAt string `json:"StartedAt,omitempty"` - CompletedAt string `json:"CompletedAt,omitempty"` - TableSummaryMap map[string]tableSummary `json:"TableSummary,omitempty"` - Reports map[string]map[string]vdiff.DiffReport `json:"Reports,omitempty"` - Errors map[string]string `json:"Errors,omitempty"` - Progress *vdiff.ProgressReport `json:"Progress,omitempty"` -} - const summaryTextTemplate = ` VDiff Summary for {{.Keyspace}}.{{.Workflow}} ({{.UUID}}) State: {{.State}} @@ -438,7 +408,7 @@ HasMismatch: {{.HasMismatch}} StartedAt: {{.StartedAt}} {{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}} {{if .CompletedAt}}CompletedAt: {{.CompletedAt}}{{end}} -{{range $table := .TableSummaryMap}} +{{range $table := .TableSummaryMap}} Table {{$table.TableName}}: State: {{$table.State}} ProcessedRows: {{$table.RowsCompared}} @@ -447,7 +417,7 @@ Table {{$table.TableName}}: {{if $table.ExtraRowsSource}} ExtraRowsSource: {{$table.ExtraRowsSource}}{{end}} {{if $table.ExtraRowsTarget}} ExtraRowsTarget: {{$table.ExtraRowsTarget}}{{end}} {{end}} - + Use "--format=json" for more detailed output. ` @@ -577,7 +547,7 @@ func buildRecentListings(resp *vtctldatapb.VDiffShowResponse) ([]*listing, error func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (vdiff.VDiffState, error) { state := vdiff.UnknownState var output string - summary, err := buildSingleSummary(keyspace, workflowName, uuid, resp, verbose) + summary, err := vdiff.Summarize(keyspace, workflowName, uuid, resp, verbose) if err != nil { return state, err } @@ -614,225 +584,6 @@ func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uui return state, nil } -func buildSingleSummary(keyspace, workflow, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (*summary, error) { - summary := &summary{ - Workflow: workflow, - Keyspace: keyspace, - UUID: uuid, - State: vdiff.UnknownState, - RowsCompared: 0, - StartedAt: "", - CompletedAt: "", - HasMismatch: false, - Shards: "", - Reports: make(map[string]map[string]vdiff.DiffReport), - Errors: make(map[string]string), - Progress: nil, - } - - var tableSummaryMap map[string]tableSummary - var reports map[string]map[string]vdiff.DiffReport - // Keep a tally of the states across all tables in all shards. - tableStateCounts := map[vdiff.VDiffState]int{ - vdiff.UnknownState: 0, - vdiff.PendingState: 0, - vdiff.StartedState: 0, - vdiff.StoppedState: 0, - vdiff.ErrorState: 0, - vdiff.CompletedState: 0, - } - // Keep a tally of the summary states across all shards. - shardStateCounts := map[vdiff.VDiffState]int{ - vdiff.UnknownState: 0, - vdiff.PendingState: 0, - vdiff.StartedState: 0, - vdiff.StoppedState: 0, - vdiff.ErrorState: 0, - vdiff.CompletedState: 0, - } - // Keep a tally of the approximate total rows to process as we'll use this for our progress - // report. - totalRowsToCompare := int64(0) - var shards []string - for shard, resp := range resp.TabletResponses { - first := true - if resp != nil && resp.Output != nil { - shards = append(shards, shard) - qr := sqltypes.Proto3ToResult(resp.Output) - if tableSummaryMap == nil { - tableSummaryMap = make(map[string]tableSummary, 0) - reports = make(map[string]map[string]vdiff.DiffReport, 0) - } - for _, row := range qr.Named().Rows { - // Update the global VDiff summary based on the per shard level summary. - // Since these values will be the same for all subsequent rows we only use - // the first row. - if first { - first = false - // Our timestamps are strings in `2022-06-26 20:43:25` format so we sort - // them lexicographically. - // We should use the earliest started_at across all shards. - if sa := row.AsString("started_at", ""); summary.StartedAt == "" || sa < summary.StartedAt { - summary.StartedAt = sa - } - // And we should use the latest completed_at across all shards. - if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt { - summary.CompletedAt = ca - } - // If we had an error on the shard, then let's add that to the summary. - if le := row.AsString("last_error", ""); le != "" { - summary.Errors[shard] = le - } - // Keep track of how many shards are marked as a specific state. We check - // this combined with the shard.table states to determine the VDiff summary - // state. - shardStateCounts[vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++ - } - - // Global VDiff summary updates that take into account the per table details - // per shard. - { - summary.RowsCompared += row.AsInt64("rows_compared", 0) - totalRowsToCompare += row.AsInt64("table_rows", 0) - - // If we had a mismatch on any table on any shard then the global VDiff - // summary does too. - if mm, _ := row.ToBool("has_mismatch"); mm { - summary.HasMismatch = true - } - } - - // Table summary information that must be accounted for across all shards. - { - table := row.AsString("table_name", "") - if table == "" { // This occurs when the table diff has not started on 1 or more shards - continue - } - // Create the global VDiff table summary object if it doesn't exist. - if _, ok := tableSummaryMap[table]; !ok { - tableSummaryMap[table] = tableSummary{ - TableName: table, - State: vdiff.UnknownState, - } - - } - ts := tableSummaryMap[table] - // This is the shard level VDiff table state. - sts := vdiff.VDiffState(strings.ToLower(row.AsString("table_state", ""))) - tableStateCounts[sts]++ - - // The error state must be sticky, and we should not override any other - // known state with completed. - switch sts { - case vdiff.CompletedState: - if ts.State == vdiff.UnknownState { - ts.State = sts - } - case vdiff.ErrorState: - ts.State = sts - default: - if ts.State != vdiff.ErrorState { - ts.State = sts - } - } - - diffReport := row.AsString("report", "") - dr := vdiff.DiffReport{} - if diffReport != "" { - err := json.Unmarshal([]byte(diffReport), &dr) - if err != nil { - return nil, err - } - ts.RowsCompared += dr.ProcessedRows - ts.MismatchedRows += dr.MismatchedRows - ts.MatchingRows += dr.MatchingRows - ts.ExtraRowsTarget += dr.ExtraRowsTarget - ts.ExtraRowsSource += dr.ExtraRowsSource - } - if _, ok := reports[table]; !ok { - reports[table] = make(map[string]vdiff.DiffReport) - } - - reports[table][shard] = dr - tableSummaryMap[table] = ts - } - } - } - } - - // The global VDiff summary should progress from pending->started->completed with - // stopped for any shard and error for any table being sticky for the global summary. - // We should only consider the VDiff to be complete if it's completed for every table - // on every shard. - if shardStateCounts[vdiff.StoppedState] > 0 { - summary.State = vdiff.StoppedState - } else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 { - summary.State = vdiff.ErrorState - } else if tableStateCounts[vdiff.StartedState] > 0 { - summary.State = vdiff.StartedState - } else if tableStateCounts[vdiff.PendingState] > 0 { - summary.State = vdiff.PendingState - } else if tableStateCounts[vdiff.CompletedState] == (len(tableSummaryMap) * len(shards)) { - // When doing shard consolidations/merges, we cannot rely solely on the - // vdiff_table state as there are N sources that we process rows from sequentially - // with each one writing to the shared _vt.vdiff_table record for the target shard. - // So we only mark the vdiff for the shard as completed when we've finished - // processing rows from all of the sources -- which is recorded by marking the - // vdiff done for the shard by setting _vt.vdiff.state = completed. - if shardStateCounts[vdiff.CompletedState] == len(shards) { - summary.State = vdiff.CompletedState - } else { - summary.State = vdiff.StartedState - } - } else { - summary.State = vdiff.UnknownState - } - - // If the vdiff has been started then we can calculate the progress. - if summary.State == vdiff.StartedState { - buildProgressReport(summary, totalRowsToCompare) - } - - sort.Strings(shards) // Sort for predictable output - summary.Shards = strings.Join(shards, ",") - summary.TableSummaryMap = tableSummaryMap - summary.Reports = reports - if !summary.HasMismatch && !verbose { - summary.Reports = nil - summary.TableSummaryMap = nil - } - // If we haven't completed the global VDiff then be sure to reflect that with no - // CompletedAt value. - if summary.State != vdiff.CompletedState { - summary.CompletedAt = "" - } - return summary, nil -} - -func buildProgressReport(summary *summary, rowsToCompare int64) { - report := &vdiff.ProgressReport{} - if summary.RowsCompared >= 1 { - // Round to 2 decimal points. - report.Percentage = math.Round(math.Min((float64(summary.RowsCompared)/float64(rowsToCompare))*100, 100.00)*100) / 100 - } - if math.IsNaN(report.Percentage) { - report.Percentage = 0 - } - pctToGo := math.Abs(report.Percentage - 100.00) - startTime, _ := time.Parse(vdiff.TimestampFormat, summary.StartedAt) - curTime := time.Now().UTC() - runTime := curTime.Unix() - startTime.Unix() - if report.Percentage >= 1 { - // Calculate how long 1% took, on avg, and multiply that by the % left. - eta := time.Unix(((int64(runTime)/int64(report.Percentage))*int64(pctToGo))+curTime.Unix(), 1).UTC() - // Cap the ETA at 1 year out to prevent providing nonsensical ETAs. - if eta.Before(time.Now().UTC().AddDate(1, 0, 0)) { - report.ETA = eta.Format(vdiff.TimestampFormat) - } - } - summary.Progress = report -} - func commandShow(cmd *cobra.Command, args []string) error { format, err := common.GetOutputFormat(cmd) if err != nil { diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go index 8742d22abd0..e27c57f47be 100644 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff_test.go @@ -19,7 +19,6 @@ package vdiff import ( "context" "fmt" - "math" "testing" "time" @@ -690,112 +689,3 @@ func TestGetStructNames(t *testing.T) { want := []string{"A", "B"} require.EqualValues(t, want, got) } - -func TestBuildProgressReport(t *testing.T) { - now := time.Now() - type args struct { - summary *summary - rowsToCompare int64 - } - tests := []struct { - name string - args args - want *vdiff.ProgressReport - }{ - { - name: "no progress", - args: args{ - summary: &summary{RowsCompared: 0}, - rowsToCompare: 100, - }, - want: &vdiff.ProgressReport{ - Percentage: 0, - ETA: "", // no ETA - }, - }, - { - name: "one third of the way", - args: args{ - summary: &summary{ - RowsCompared: 33, - StartedAt: now.Add(-10 * time.Second).UTC().Format(vdiff.TimestampFormat), - }, - rowsToCompare: 100, - }, - want: &vdiff.ProgressReport{ - Percentage: 33, - ETA: now.Add(20 * time.Second).UTC().Format(vdiff.TimestampFormat), - }, - }, - { - name: "half way", - args: args{ - summary: &summary{ - RowsCompared: 5000000000, - StartedAt: now.Add(-10 * time.Hour).UTC().Format(vdiff.TimestampFormat), - }, - rowsToCompare: 10000000000, - }, - want: &vdiff.ProgressReport{ - Percentage: 50, - ETA: now.Add(10 * time.Hour).UTC().Format(vdiff.TimestampFormat), - }, - }, - { - name: "full progress", - args: args{ - summary: &summary{ - RowsCompared: 100, - CompletedAt: now.UTC().Format(vdiff.TimestampFormat), - }, - rowsToCompare: 100, - }, - want: &vdiff.ProgressReport{ - Percentage: 100, - ETA: now.UTC().Format(vdiff.TimestampFormat), - }, - }, - { - name: "more than in I_S", - args: args{ - summary: &summary{ - RowsCompared: 100, - CompletedAt: now.UTC().Format(vdiff.TimestampFormat), - }, - rowsToCompare: 50, - }, - want: &vdiff.ProgressReport{ - Percentage: 100, - ETA: now.UTC().Format(vdiff.TimestampFormat), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - buildProgressReport(tt.args.summary, tt.args.rowsToCompare) - // We always check the percentage - require.Equal(t, int(tt.want.Percentage), int(tt.args.summary.Progress.Percentage)) - - // We only check the ETA if there is one. - if tt.want.ETA != "" { - // Let's check that we're within 1 second to avoid flakes. - wantTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) - require.NoError(t, err) - var timeDiff float64 - if tt.want.Percentage == 100 { - completedTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.CompletedAt) - require.NoError(t, err) - timeDiff = math.Abs(completedTime.Sub(wantTime).Seconds()) - } else { - startTime, err := time.Parse(vdiff.TimestampFormat, tt.args.summary.StartedAt) - require.NoError(t, err) - completedTimeUnix := float64(now.UTC().Unix()-startTime.UTC().Unix()) * (100 / tt.want.Percentage) - estimatedTime, err := time.Parse(vdiff.TimestampFormat, tt.want.ETA) - require.NoError(t, err) - timeDiff = math.Abs(estimatedTime.Sub(startTime).Seconds() - completedTimeUnix) - } - require.LessOrEqual(t, timeDiff, 1.0) - } - }) - } -} diff --git a/go/vt/vttablet/tabletmanager/vdiff/summary.go b/go/vt/vttablet/tabletmanager/vdiff/summary.go new file mode 100644 index 00000000000..59536342ec1 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/summary.go @@ -0,0 +1,277 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "encoding/json" + "math" + "sort" + "strings" + "time" + + "vitess.io/vitess/go/sqltypes" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +// TableSummary aggregates the current state of the table diff from all shards. +type TableSummary struct { + TableName string + State VDiffState + RowsCompared int64 + MatchingRows int64 + MismatchedRows int64 + ExtraRowsSource int64 + ExtraRowsTarget int64 + LastUpdated string `json:"LastUpdated,omitempty"` +} + +// Summary aggregates the current state of the vdiff from all shards. +type Summary struct { + Workflow, Keyspace string + State VDiffState + UUID string + RowsCompared int64 + HasMismatch bool + Shards string + StartedAt string `json:"StartedAt,omitempty"` + CompletedAt string `json:"CompletedAt,omitempty"` + TableSummaryMap map[string]TableSummary `json:"TableSummary,omitempty"` + Reports map[string]map[string]DiffReport `json:"Reports,omitempty"` + Errors map[string]string `json:"Errors,omitempty"` + Progress *ProgressReport `json:"Progress,omitempty"` +} + +// Summarize generates a summary of a VDiff operation based on the VDiffShow RPC +// response. +func Summarize(keyspace, workflow, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (*Summary, error) { + summary := &Summary{ + Workflow: workflow, + Keyspace: keyspace, + UUID: uuid, + State: UnknownState, + RowsCompared: 0, + StartedAt: "", + CompletedAt: "", + HasMismatch: false, + Shards: "", + Reports: make(map[string]map[string]DiffReport), + Errors: make(map[string]string), + Progress: nil, + } + + var tableSummaryMap map[string]TableSummary + var reports map[string]map[string]DiffReport + // Keep a tally of the states across all tables in all shards. + tableStateCounts := map[VDiffState]int{ + UnknownState: 0, + PendingState: 0, + StartedState: 0, + StoppedState: 0, + ErrorState: 0, + CompletedState: 0, + } + // Keep a tally of the summary states across all shards. + shardStateCounts := map[VDiffState]int{ + UnknownState: 0, + PendingState: 0, + StartedState: 0, + StoppedState: 0, + ErrorState: 0, + CompletedState: 0, + } + // Keep a tally of the approximate total rows to process as we'll use this for our progress + // report. + totalRowsToCompare := int64(0) + var shards []string + for shard, resp := range resp.TabletResponses { + first := true + if resp != nil && resp.Output != nil { + shards = append(shards, shard) + qr := sqltypes.Proto3ToResult(resp.Output) + if tableSummaryMap == nil { + tableSummaryMap = make(map[string]TableSummary, 0) + reports = make(map[string]map[string]DiffReport, 0) + } + for _, row := range qr.Named().Rows { + // Update the global VDiff summary based on the per shard level summary. + // Since these values will be the same for all subsequent rows we only use + // the first row. + if first { + first = false + // Our timestamps are strings in `2022-06-26 20:43:25` format so we sort + // them lexicographically. + // We should use the earliest started_at across all shards. + if sa := row.AsString("started_at", ""); summary.StartedAt == "" || sa < summary.StartedAt { + summary.StartedAt = sa + } + // And we should use the latest completed_at across all shards. + if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt { + summary.CompletedAt = ca + } + // If we had an error on the shard, then let's add that to the summary. + if le := row.AsString("last_error", ""); le != "" { + summary.Errors[shard] = le + } + // Keep track of how many shards are marked as a specific state. We check + // this combined with the shard.table states to determine the VDiff summary + // state. + shardStateCounts[VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++ + } + + // Global VDiff summary updates that take into account the per table details + // per shard. + { + summary.RowsCompared += row.AsInt64("rows_compared", 0) + totalRowsToCompare += row.AsInt64("table_rows", 0) + + // If we had a mismatch on any table on any shard then the global VDiff + // summary does too. + if mm, _ := row.ToBool("has_mismatch"); mm { + summary.HasMismatch = true + } + } + + // Table summary information that must be accounted for across all shards. + { + table := row.AsString("table_name", "") + if table == "" { // This occurs when the table diff has not started on 1 or more shards + continue + } + // Create the global VDiff table summary object if it doesn't exist. + if _, ok := tableSummaryMap[table]; !ok { + tableSummaryMap[table] = TableSummary{ + TableName: table, + State: UnknownState, + } + + } + ts := tableSummaryMap[table] + // This is the shard level VDiff table state. + sts := VDiffState(strings.ToLower(row.AsString("table_state", ""))) + tableStateCounts[sts]++ + + // The error state must be sticky, and we should not override any other + // known state with completed. + switch sts { + case CompletedState: + if ts.State == UnknownState { + ts.State = sts + } + case ErrorState: + ts.State = sts + default: + if ts.State != ErrorState { + ts.State = sts + } + } + + diffReport := row.AsString("report", "") + dr := DiffReport{} + if diffReport != "" { + err := json.Unmarshal([]byte(diffReport), &dr) + if err != nil { + return nil, err + } + ts.RowsCompared += dr.ProcessedRows + ts.MismatchedRows += dr.MismatchedRows + ts.MatchingRows += dr.MatchingRows + ts.ExtraRowsTarget += dr.ExtraRowsTarget + ts.ExtraRowsSource += dr.ExtraRowsSource + } + if _, ok := reports[table]; !ok { + reports[table] = make(map[string]DiffReport) + } + + reports[table][shard] = dr + tableSummaryMap[table] = ts + } + } + } + } + + // The global VDiff summary should progress from pending->started->completed with + // stopped for any shard and error for any table being sticky for the global summary. + // We should only consider the VDiff to be complete if it's completed for every table + // on every shard. + if shardStateCounts[StoppedState] > 0 { + summary.State = StoppedState + } else if shardStateCounts[ErrorState] > 0 || tableStateCounts[ErrorState] > 0 { + summary.State = ErrorState + } else if tableStateCounts[StartedState] > 0 { + summary.State = StartedState + } else if tableStateCounts[PendingState] > 0 { + summary.State = PendingState + } else if tableStateCounts[CompletedState] == (len(tableSummaryMap) * len(shards)) { + // When doing shard consolidations/merges, we cannot rely solely on the + // vdiff_table state as there are N sources that we process rows from sequentially + // with each one writing to the shared _vt.vdiff_table record for the target shard. + // So we only mark the vdiff for the shard as completed when we've finished + // processing rows from all of the sources -- which is recorded by marking the + // vdiff done for the shard by setting _vt.state = completed. + if shardStateCounts[CompletedState] == len(shards) { + summary.State = CompletedState + } else { + summary.State = StartedState + } + } else { + summary.State = UnknownState + } + + // If the vdiff has been started then we can calculate the progress. + if summary.State == StartedState { + buildProgressReport(summary, totalRowsToCompare) + } + + sort.Strings(shards) // Sort for predictable output + summary.Shards = strings.Join(shards, ",") + summary.TableSummaryMap = tableSummaryMap + summary.Reports = reports + if !summary.HasMismatch && !verbose { + summary.Reports = nil + summary.TableSummaryMap = nil + } + // If we haven't completed the global VDiff then be sure to reflect that with no + // CompletedAt value. + if summary.State != CompletedState { + summary.CompletedAt = "" + } + return summary, nil +} + +func buildProgressReport(summary *Summary, rowsToCompare int64) { + report := &ProgressReport{} + if summary.RowsCompared >= 1 { + // Round to 2 decimal points. + report.Percentage = math.Round(math.Min((float64(summary.RowsCompared)/float64(rowsToCompare))*100, 100.00)*100) / 100 + } + if math.IsNaN(report.Percentage) { + report.Percentage = 0 + } + pctToGo := math.Abs(report.Percentage - 100.00) + startTime, _ := time.Parse(TimestampFormat, summary.StartedAt) + curTime := time.Now().UTC() + runTime := curTime.Unix() - startTime.Unix() + if report.Percentage >= 1 { + // Calculate how long 1% took, on avg, and multiply that by the % left. + eta := time.Unix(((int64(runTime)/int64(report.Percentage))*int64(pctToGo))+curTime.Unix(), 1).UTC() + // Cap the ETA at 1 year out to prevent providing nonsensical ETAs. + if eta.Before(time.Now().UTC().AddDate(1, 0, 0)) { + report.ETA = eta.Format(TimestampFormat) + } + } + summary.Progress = report +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/summary_test.go b/go/vt/vttablet/tabletmanager/vdiff/summary_test.go new file mode 100644 index 00000000000..fcdddcb5c61 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/summary_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBuildProgressReport(t *testing.T) { + now := time.Now() + type args struct { + summary *Summary + rowsToCompare int64 + } + tests := []struct { + name string + args args + want *ProgressReport + }{ + { + name: "no progress", + args: args{ + summary: &Summary{RowsCompared: 0}, + rowsToCompare: 100, + }, + want: &ProgressReport{ + Percentage: 0, + ETA: "", // no ETA + }, + }, + { + name: "one third of the way", + args: args{ + summary: &Summary{ + RowsCompared: 33, + StartedAt: now.Add(-10 * time.Second).UTC().Format(TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &ProgressReport{ + Percentage: 33, + ETA: now.Add(20 * time.Second).UTC().Format(TimestampFormat), + }, + }, + { + name: "half way", + args: args{ + summary: &Summary{ + RowsCompared: 5000000000, + StartedAt: now.Add(-10 * time.Hour).UTC().Format(TimestampFormat), + }, + rowsToCompare: 10000000000, + }, + want: &ProgressReport{ + Percentage: 50, + ETA: now.Add(10 * time.Hour).UTC().Format(TimestampFormat), + }, + }, + { + name: "full progress", + args: args{ + summary: &Summary{ + RowsCompared: 100, + CompletedAt: now.UTC().Format(TimestampFormat), + }, + rowsToCompare: 100, + }, + want: &ProgressReport{ + Percentage: 100, + ETA: now.UTC().Format(TimestampFormat), + }, + }, + { + name: "more than in I_S", + args: args{ + summary: &Summary{ + RowsCompared: 100, + CompletedAt: now.UTC().Format(TimestampFormat), + }, + rowsToCompare: 50, + }, + want: &ProgressReport{ + Percentage: 100, + ETA: now.UTC().Format(TimestampFormat), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buildProgressReport(tt.args.summary, tt.args.rowsToCompare) + // We always check the percentage + require.Equal(t, int(tt.want.Percentage), int(tt.args.summary.Progress.Percentage)) + + // We only check the ETA if there is one. + if tt.want.ETA != "" { + // Let's check that we're within 1 second to avoid flakes. + wantTime, err := time.Parse(TimestampFormat, tt.want.ETA) + require.NoError(t, err) + var timeDiff float64 + if tt.want.Percentage == 100 { + completedTime, err := time.Parse(TimestampFormat, tt.args.summary.CompletedAt) + require.NoError(t, err) + timeDiff = math.Abs(completedTime.Sub(wantTime).Seconds()) + } else { + startTime, err := time.Parse(TimestampFormat, tt.args.summary.StartedAt) + require.NoError(t, err) + completedTimeUnix := float64(now.UTC().Unix()-startTime.UTC().Unix()) * (100 / tt.want.Percentage) + estimatedTime, err := time.Parse(TimestampFormat, tt.want.ETA) + require.NoError(t, err) + timeDiff = math.Abs(estimatedTime.Sub(startTime).Seconds() - completedTimeUnix) + } + require.LessOrEqual(t, timeDiff, 1.0) + } + }) + } +}