Skip to content

Commit

Permalink
Enhancement: Optimized the deferred statements retry logic(for depend…
Browse files Browse the repository at this point in the history
…ent ddls) to voluntarily check further iterations of retry is required (#1514) (#1516)

- compare the before and after counts of deferredSqlStmts at each iteration

* Bug fix: failed.sql getting truncated if there are some failed DDLs from post snapshot import phase
  • Loading branch information
sanyamsinghal authored May 15, 2024
1 parent 9325368 commit 6d0f85f
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 27 deletions.
2 changes: 1 addition & 1 deletion migtests/tests/pg/dependent-ddls/env.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export SOURCE_DB_TYPE="postgresql"
export SOURCE_DB_NAME=${SOURCE_DB_NAME:-"deffered_statements"}
export SOURCE_DB_NAME=${SOURCE_DB_NAME:-"deferred_statements"}
export SOURCE_DB_SCHEMA="public"
4 changes: 2 additions & 2 deletions migtests/tests/pg/dependent-ddls/schema.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- this test covers the cases when ddl are depenedent on each other and the order of execution during import schema might be wrong causing does not exist error
-- this has fixed as a part of enhancement in deffered list approach
-- this test covers the cases when ddl are dependent on each other and the order of execution during import schema might be wrong causing does not exist error
-- this has fixed as a part of enhancement in deferred list approach

CREATE FUNCTION public.lower(text[]) RETURNS text[]
LANGUAGE sql IMMUTABLE STRICT
Expand Down
2 changes: 1 addition & 1 deletion yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ func executeSqlStmtWithRetries(conn **pgx.Conn, sqlInfo sqlInfo, objType string)
continue
} else if missingRequiredSchemaObject(err) {
log.Infof("deffering execution of SQL: %s", sqlInfo.formattedStmt)
defferedSqlStmts = append(defferedSqlStmts, sqlInfo)
deferredSqlStmts = append(deferredSqlStmts, sqlInfo)
} else if isAlreadyExists(err.Error()) {
// pg_dump generates `CREATE SCHEMA public;` in the schemas.sql. Because the `public`
// schema already exists on the target YB db, the create schema statement fails with
Expand Down
14 changes: 11 additions & 3 deletions yb-voyager/cmd/importSchema.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func importSchema() {
importSchemaInternal(exportDir, []string{"TABLE"}, skipFn)
}

importDefferedStatements()
importDeferredStatements()
log.Info("Schema import is complete.")

dumpStatements(failedSqlStmts, filepath.Join(exportDir, "schema", "failed.sql"))
Expand Down Expand Up @@ -208,7 +208,9 @@ func isYBDatabaseIsColocated(conn *pgx.Conn) bool {

func dumpStatements(stmts []string, filePath string) {
if len(stmts) == 0 {
if utils.FileOrFolderExists(filePath) {
if flagPostSnapshotImport {
// nothing
} else if utils.FileOrFolderExists(filePath) {
err := os.Remove(filePath)
if err != nil {
utils.ErrExit("remove file: %v", err)
Expand All @@ -218,7 +220,13 @@ func dumpStatements(stmts []string, filePath string) {
return
}

file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
var fileMode int
if flagPostSnapshotImport {
fileMode = os.O_WRONLY | os.O_CREATE | os.O_APPEND
} else {
fileMode = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
}
file, err := os.OpenFile(filePath, fileMode, 0644)
if err != nil {
utils.ErrExit("open file: %v", err)
}
Expand Down
52 changes: 32 additions & 20 deletions yb-voyager/cmd/importSchemaYugabyteDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd

import (
"context"
"fmt"
"path/filepath"
"strings"

Expand All @@ -25,7 +26,7 @@ import (
"golang.org/x/exp/slices"
)

var defferedSqlStmts []sqlInfo
var deferredSqlStmts []sqlInfo
var failedSqlStmts []string

func importSchemaInternal(exportDir string, importObjectList []string,
Expand All @@ -42,44 +43,55 @@ func importSchemaInternal(exportDir string, importObjectList []string,
}

/*
Try re-executing each DDL from deffered list.
Try re-executing each DDL from deferred list.
If fails, silently avoid the error.
Else remove from defferedSQLStmts list
Else remove from deferredSQLStmts list
At the end, add the unsuccessful ones to a failedSqlStmts list and report to the user
*/
func importDefferedStatements() {
if len(defferedSqlStmts) == 0 {
func importDeferredStatements() {
if len(deferredSqlStmts) == 0 {
return
}
log.Infof("Number of statements in defferedSQLStmts list: %d\n", len(defferedSqlStmts))
log.Infof("Number of statements in deferredSQLStmts list: %d\n", len(deferredSqlStmts))

utils.PrintAndLog("\nExecuting the remaining SQL statements...\n\n")
maxIterations := len(defferedSqlStmts)
maxIterations := len(deferredSqlStmts)
conn := newTargetConn()
defer func() { conn.Close(context.Background()) }()

var err error
// max loop iterations to remove all errors
for i := 1; i <= maxIterations && len(defferedSqlStmts) > 0; i++ {
for j := 0; j < len(defferedSqlStmts); {
_, err = conn.Exec(context.Background(), defferedSqlStmts[j].formattedStmt)
for i := 1; i <= maxIterations && len(deferredSqlStmts) > 0; i++ {
beforeDeferredSqlCount := len(deferredSqlStmts)
var failedSqlStmtInIthIteration []string
for j := 0; j < len(deferredSqlStmts); j++ {
_, err = conn.Exec(context.Background(), deferredSqlStmts[j].formattedStmt)
if err == nil {
utils.PrintAndLog("%s\n", utils.GetSqlStmtToPrint(defferedSqlStmts[j].stmt))
utils.PrintAndLog("%s\n", utils.GetSqlStmtToPrint(deferredSqlStmts[j].stmt))
// removing successfully executed SQL
defferedSqlStmts = append(defferedSqlStmts[:j], defferedSqlStmts[j+1:]...)
break // no increment in j
deferredSqlStmts = append(deferredSqlStmts[:j], deferredSqlStmts[j+1:]...)
break
} else {
log.Infof("failed retry of deffered stmt: %s\n%v", utils.GetSqlStmtToPrint(defferedSqlStmts[j].stmt), err)
// fails to execute in final attempt
if i == maxIterations {
errString := "/*\n" + err.Error() + "\n*/\n"
failedSqlStmts = append(failedSqlStmts, errString+defferedSqlStmts[j].formattedStmt)
log.Infof("failed retry of deferred stmt: %s\n%v", utils.GetSqlStmtToPrint(deferredSqlStmts[j].stmt), err)
errString := fmt.Sprintf("/*\n%s\n*/\n", err.Error())
failedSqlStmtInIthIteration = append(failedSqlStmtInIthIteration, errString+deferredSqlStmts[j].formattedStmt)
err = conn.Close(context.Background())
if err != nil {
log.Warnf("error while closing the connection due to failed deferred stmt: %v", err)
}
conn.Close(context.Background())
conn = newTargetConn()
j++
}
}

afterDeferredSqlCount := len(deferredSqlStmts)
if afterDeferredSqlCount == 0 {
log.Infof("all of the deferred statements executed successfully in the %d iteration", i)
} else if beforeDeferredSqlCount == afterDeferredSqlCount {
// no need for further iterations since the deferred list will remain same
log.Infof("none of the deferred statements executed successfully in the %d iteration", i)
failedSqlStmts = failedSqlStmtInIthIteration
break
}
}
}

Expand Down

0 comments on commit 6d0f85f

Please sign in to comment.