Skip to content

Commit

Permalink
Handle export data status panics (#1874)
Browse files Browse the repository at this point in the history
1. Handling nil pointer and the index out-of-range issues while running export data status before, during, and after export data.
2. Handled panics in `get data-migration report`, when it was run before export was started.
3. Fixed a bug in the `export data status` for partitions case from PG where we club the statuses of all leaf table to root tables's entry, which was not showing the progress of partitions and was just showing the NOT-STARTED for the entire duration of export data and was showing the DONE one it is completed.
4. Fixed the output of the export data status for the Debezium export case of PG partitions with table-list
5. Added expected files to be validated in pg-partitions automation test for table-list case of export-data-status command.
  • Loading branch information
priyanshi-yb authored Nov 13, 2024
1 parent c6a3b8a commit 5320379
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 52 deletions.
5 changes: 5 additions & 0 deletions migtests/scripts/run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ main() {
expected_file="${TEST_DIR}/export_data_status-report.json"
actual_file="${EXPORT_DIR}/reports/export-data-status-report.json"

if [ "${EXPORT_TABLE_LIST}" != "" ]
then
expected_file="${TEST_DIR}/export-data-status-with-table-list-report.json"
fi

step "Verify export-data-status report"
verify_report ${expected_file} ${actual_file}

Expand Down
37 changes: 37 additions & 0 deletions migtests/tests/pg/partitions/export-data-status-report.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[
{
"table_name": "customers",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "sales",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "emp",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "p1.sales_region",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "range_columns_partition_test",
"status": "DONE",
"exported_count": 6
},
{
"table_name": "sales_region",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "test_partitions_sequences",
"status": "DONE",
"exported_count": 1000
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[
{
"table_name": "customers (cust_other, cust_part11, cust_part12, cust_part21, cust_part22)",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "sales (sales_2019_q4, sales_2020_q1, sales_2020_q2)",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "emp (emp_0, emp_1, emp_2)",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "p1.sales_region (p2.boston, p2.london, p2.sydney)",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "range_columns_partition_test (range_columns_partition_test_p0, range_columns_partition_test_p1)",
"status": "DONE",
"exported_count": 6
},
{
"table_name": "sales_region (boston, london, sydney)",
"status": "DONE",
"exported_count": 1000
},
{
"table_name": "test_partitions_sequences (test_partitions_sequences_b, test_partitions_sequences_l, test_partitions_sequences_s)",
"status": "DONE",
"exported_count": 1000
}
]
31 changes: 10 additions & 21 deletions yb-voyager/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ func getLeafPartitionsFromRootTable() map[string][]string {
if err != nil {
utils.ErrExit("get migration status record: %v", err)
}
if !msr.IsExportTableListSet || msr.SourceDBConf.DBType != POSTGRESQL {
if msr.SourceDBConf.DBType != POSTGRESQL {
return leafPartitions
}
tables := msr.TableListExportedFromSource
for leaf, root := range msr.SourceRenameTablesMap {
//Using the SQLName here to avoid creating the NameTuples manually for leafTable case as in a case partition names changes on target
//NameRegistry won't be able to figure out the map of source->target tuples.
leafTable := sqlname.NewSourceNameFromQualifiedName(getQuotedFromUnquoted(leaf))
rootTable := sqlname.NewSourceNameFromQualifiedName(getQuotedFromUnquoted(root))
leaf = leafTable.Qualified.MinQuoted
Expand All @@ -251,6 +253,7 @@ func getLeafPartitionsFromRootTable() map[string][]string {
if !lo.Contains(tables, root) {
continue
}
//Adding a Qualified.MinQuoted to key and values which is similar to NameTuple.ForOutput();
leafPartitions[root] = append(leafPartitions[root], leaf)
}

Expand All @@ -268,6 +271,10 @@ func displayExportedRowCountSnapshot(snapshotViaDebezium bool) {
fmt.Printf("snapshot export report\n")
uitable := uitable.New()

msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
utils.ErrExit("error getting migration status record: %v", err)
}
leafPartitions := getLeafPartitionsFromRootTable()
if !snapshotViaDebezium {
exportedRowCount := getExportedRowCountSnapshot(exportDir)
Expand All @@ -287,8 +294,9 @@ func displayExportedRowCountSnapshot(snapshotViaDebezium bool) {
utils.ErrExit("lookup table %s in name registry : %v", key, err)
}
displayTableName := table.CurrentName.Unqualified.MinQuoted
//Using the ForOutput() as a key for leafPartitions map as we are populating the map in that way.
partitions := leafPartitions[table.ForOutput()]
if source.DBType == POSTGRESQL && partitions != nil {
if source.DBType == POSTGRESQL && partitions != nil && msr.IsExportTableListSet {
partitions := strings.Join(partitions, ", ")
displayTableName = fmt.Sprintf("%s (%s)", table.CurrentName.Unqualified.MinQuoted, partitions)
}
Expand Down Expand Up @@ -353,21 +361,6 @@ func renameDatafileDescriptor(exportDir string) {
datafileDescriptor.Save()
}

func renameExportSnapshotStatus(exportSnapshotStatusFile *jsonfile.JsonFile[ExportSnapshotStatus]) error {
err := exportSnapshotStatusFile.Update(func(exportSnapshotStatus *ExportSnapshotStatus) {
for i, tableStatus := range exportSnapshotStatus.Tables {
renamedTable, isRenamed := renameTableIfRequired(tableStatus.TableName)
if isRenamed {
exportSnapshotStatus.Tables[i].TableName = renamedTable
}
}
})
if err != nil {
return fmt.Errorf("update export snapshot status: %w", err)
}
return nil
}

func displayImportedRowCountSnapshot(state *ImportDataState, tasks []*ImportFileTask) {
if importerRole == IMPORT_FILE_ROLE {
fmt.Printf("import report\n")
Expand Down Expand Up @@ -898,10 +891,6 @@ func getExportedSnapshotRowsMap(exportSnapshotStatus *ExportSnapshotStatus) (*ut
snapshotStatusMap := utils.NewStructMap[sqlname.NameTuple, []string]()

for _, tableStatus := range exportSnapshotStatus.Tables {
if tableStatus.FileName == "" {
//in case of root table as well in the tablelist during export an entry with empty file name is there
continue
}
nt, err := namereg.NameReg.LookupTableName(tableStatus.TableName)
if err != nil {
return nil, nil, fmt.Errorf("lookup table [%s] from name registry: %v", tableStatus.TableName, err)
Expand Down
5 changes: 0 additions & 5 deletions yb-voyager/cmd/exportData.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,11 +900,6 @@ func exportDataOffline(ctx context.Context, cancel context.CancelFunc, finalTabl
if source.DBType == POSTGRESQL {
//Make leaf partitions data files entry under the name of root table
renameDatafileDescriptor(exportDir)
//Similarly for the export snapshot status file
err = renameExportSnapshotStatus(exportSnapshotStatusFile)
if err != nil {
return fmt.Errorf("rename export snapshot status: %w", err)
}
}
displayExportedRowCountSnapshot(false)

Expand Down
7 changes: 7 additions & 0 deletions yb-voyager/cmd/exportDataStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func initializeExportTableMetadata(tableList []sqlname.NameTuple) {
Status: utils.TableMetadataStatusMap[tablesProgressMetadata[key].Status],
ExportedRowCountSnapshot: int64(0),
}
if source.DBType == POSTGRESQL {
//for Postgresql rename the table leaf table names to root table
renamedTable, isRenamed := renameTableIfRequired(key)
if isRenamed {
exportSnapshotStatus.Tables[key].TableName = renamedTable
}
}
}
err := exportSnapshotStatusFile.Create(exportSnapshotStatus)
if err != nil {
Expand Down
74 changes: 48 additions & 26 deletions yb-voyager/cmd/exportDataStatusCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io/fs"
"path/filepath"
"slices"
"sort"
"strings"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/spf13/cobra"

"github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm"
"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/namereg"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/jsonfile"
Expand Down Expand Up @@ -61,12 +63,22 @@ var exportDataStatusCmd = &cobra.Command{
if err != nil {
utils.ErrExit("Failed to get migration status record: %s", err)
}
if msr == nil {
color.Cyan(exportDataStatusMsg)
return
}
useDebezium = msr.IsSnapshotExportedViaDebezium()


source = *msr.SourceDBConf
sqlname.SourceDBType = source.DBType
leafPartitions := getLeafPartitionsFromRootTable()

var rows []*exportTableMigStatusOutputRow
if useDebezium {
rows, err = runExportDataStatusCmdDbzm(streamChanges)
rows, err = runExportDataStatusCmdDbzm(streamChanges, leafPartitions, msr)
} else {
rows, err = runExportDataStatusCmd()
rows, err = runExportDataStatusCmd(msr, leafPartitions)
}
if err != nil {
utils.ErrExit("error: %s\n", err)
Expand Down Expand Up @@ -105,33 +117,35 @@ var InProgressTableSno int

// Note that the `export data status` is running in a separate process. It won't have access to the in-memory state
// held in the main `export data` process.
func runExportDataStatusCmdDbzm(streamChanges bool) ([]*exportTableMigStatusOutputRow, error) {
func runExportDataStatusCmdDbzm(streamChanges bool, leafPartitions map[string][]string, msr *metadb.MigrationStatusRecord) ([]*exportTableMigStatusOutputRow, error) {
exportStatusFilePath := filepath.Join(exportDir, "data", "export_status.json")
status, err := dbzm.ReadExportStatus(exportStatusFilePath)
if err != nil {
utils.ErrExit("Failed to read export status file %s: %v", exportStatusFilePath, err)
}
if status == nil {
return nil, nil
return nil, fmt.Errorf("export data has not started yet. Try running after export has started")
}
InProgressTableSno = status.InProgressTableSno()
var rows []*exportTableMigStatusOutputRow
var row *exportTableMigStatusOutputRow

for _, tableStatus := range status.Tables {
row = getSnapshotExportStatusRow(&tableStatus)
row = getSnapshotExportStatusRow(&tableStatus, leafPartitions, msr)
rows = append(rows, row)
}
return rows, nil
}

func getSnapshotExportStatusRow(tableStatus *dbzm.TableExportStatus) *exportTableMigStatusOutputRow {
func getSnapshotExportStatusRow(tableStatus *dbzm.TableExportStatus, leafPartitions map[string][]string, msr *metadb.MigrationStatusRecord) *exportTableMigStatusOutputRow {
nt, err := namereg.NameReg.LookupTableName(fmt.Sprintf("%s.%s", tableStatus.SchemaName, tableStatus.TableName))
if err != nil {
utils.ErrExit("lookup %s in name registry: %v", tableStatus.TableName, err)
}
//Using the ForOutput() as a key for leafPartitions map as we are populating the map in that way.
displayTableName := getDisplayName(nt, leafPartitions[nt.ForOutput()], msr.IsExportTableListSet)
row := &exportTableMigStatusOutputRow{
TableName: nt.ForMinOutput(),
TableName: displayTableName,
Status: "DONE",
ExportedCount: tableStatus.ExportedRowCountSnapshot,
}
Expand All @@ -145,21 +159,27 @@ func getSnapshotExportStatusRow(tableStatus *dbzm.TableExportStatus) *exportTabl
return row
}

func runExportDataStatusCmd() ([]*exportTableMigStatusOutputRow, error) {
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
return nil, fmt.Errorf("error while getting migration status record: %v", err)
func getDisplayName(nt sqlname.NameTuple, partitions []string, isTableListSet bool) string {
displayTableName := nt.ForMinOutput()
//Changing the display of the partition tables in case table-list is set because there can be case where user has passed a subset of leaft tables in the list
if source.DBType == POSTGRESQL && partitions != nil && isTableListSet {
slices.Sort(partitions)
partitions := strings.Join(partitions, ", ")
displayTableName = fmt.Sprintf("%s (%s)", displayTableName, partitions)
}

return displayTableName
}

func runExportDataStatusCmd(msr *metadb.MigrationStatusRecord, leafPartitions map[string][]string) ([]*exportTableMigStatusOutputRow, error) {
tableList := msr.TableListExportedFromSource
source = *msr.SourceDBConf
sqlname.SourceDBType = source.DBType
var outputRows []*exportTableMigStatusOutputRow
exportSnapshotStatusFilePath := filepath.Join(exportDir, "metainfo", "export_snapshot_status.json")
exportSnapshotStatusFile = jsonfile.NewJsonFile[ExportSnapshotStatus](exportSnapshotStatusFilePath)
exportStatusSnapshot, err := exportSnapshotStatusFile.Read()
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, nil
return nil, fmt.Errorf("export data has not started yet. Try running after export has started")
}
utils.ErrExit("Failed to read export status file %s: %v", exportSnapshotStatusFilePath, err)
}
Expand All @@ -169,20 +189,17 @@ func runExportDataStatusCmd() ([]*exportTableMigStatusOutputRow, error) {
return nil, fmt.Errorf("error while getting exported snapshot rows map: %v", err)
}

leafPartitions := getLeafPartitionsFromRootTable()

for _, tableName := range tableList {
finalFullTableName, err := namereg.NameReg.LookupTableName(tableName)
if err != nil {
return nil, fmt.Errorf("lookup %s in name registry: %v", tableName, err)
}
displayTableName := finalFullTableName.ForMinOutput()
partitions := leafPartitions[finalFullTableName.ForOutput()]
if source.DBType == POSTGRESQL && partitions != nil {
partitions := strings.Join(partitions, ", ")
displayTableName = fmt.Sprintf("%s (%s)", displayTableName, partitions)
//Using the ForOutput() as a key for leafPartitions map as we are populating the map in that way.
displayTableName := getDisplayName(finalFullTableName, leafPartitions[finalFullTableName.ForOutput()], msr.IsExportTableListSet)
snapshotStatus, ok := exportedSnapshotStatus.Get(finalFullTableName)
if !ok {
return nil, fmt.Errorf("snapshot status for table %s is not populated in %q file", finalFullTableName.ForMinOutput(), exportSnapshotStatusFilePath)
}
snapshotStatus, _ := exportedSnapshotStatus.Get(finalFullTableName)
finalStatus := snapshotStatus[0]
if len(snapshotStatus) > 1 { // status for root partition wrt leaf partitions
exportingLeaf := 0
Expand All @@ -199,13 +216,18 @@ func runExportDataStatusCmd() ([]*exportTableMigStatusOutputRow, error) {
}
if exportingLeaf > 0 {
finalStatus = "EXPORTING"
} else if doneLeaf == len(snapshotStatus) {
finalStatus = "DONE"
} else if not_started == len(snapshotStatus) {
finalStatus = "NOT_STARTED"
//In case of partition tables in PG, we are clubbing the status of all leafs and then returning the status
//For root table we are sending NOT_STARTED and if only all leaf partitions will have NOT_STARTED else EXPORTING/DONE
finalStatus = "NOT-STARTED"
} else {
finalStatus = "DONE"
}
}
exportedCount, _ := exportedSnapshotRow.Get(finalFullTableName)
exportedCount, ok := exportedSnapshotRow.Get(finalFullTableName)
if !ok {
return nil, fmt.Errorf("snapshot row count for table %s is not populated in %q file", finalFullTableName.ForMinOutput(), exportSnapshotStatusFilePath)
}
row := &exportTableMigStatusOutputRow{
TableName: displayTableName,
Status: finalStatus,
Expand Down
8 changes: 8 additions & 0 deletions yb-voyager/cmd/getDataMigrationReportCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.
package cmd

import (
"errors"
"fmt"
"io/fs"
"path/filepath"

"github.com/fatih/color"
Expand Down Expand Up @@ -137,6 +139,9 @@ func getDataMigrationReportCmdFn(msr *metadb.MigrationStatusRecord) {
if err != nil {
utils.ErrExit("Failed to read export status file %s: %v", exportStatusFilePath, err)
}
if dbzmStatus == nil {
utils.ErrExit("Export data has not started yet. Try running after export has started.")
}
dbzmNameTupToRowCount := utils.NewStructMap[sqlname.NameTuple, int64]()

exportSnapshotStatusFilePath := filepath.Join(exportDir, "metainfo", "export_snapshot_status.json")
Expand All @@ -150,6 +155,9 @@ func getDataMigrationReportCmdFn(msr *metadb.MigrationStatusRecord) {
if source.DBType == POSTGRESQL {
exportSnapshotStatus, err = exportSnapshotStatusFile.Read()
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
utils.ErrExit("Export data has not started yet. Try running after export has started.")
}
utils.ErrExit("Failed to read export status file %s: %v", exportSnapshotStatusFilePath, err)
}
exportedPGSnapshotRowsMap, _, err = getExportedSnapshotRowsMap(exportSnapshotStatus)
Expand Down

0 comments on commit 5320379

Please sign in to comment.