diff --git a/vreplgen/vreplgen.go b/vreplgen/vreplgen.go index 6266ecd..7bfef78 100644 --- a/vreplgen/vreplgen.go +++ b/vreplgen/vreplgen.go @@ -23,6 +23,7 @@ package main import ( "bytes" + "flag" "fmt" "os" "strings" @@ -31,24 +32,35 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +var onDdl string + +func init() { + flag.StringVar(&onDdl, "on_ddl", "ignore", "Set on_ddl value for replication stream - ignore, stop, exec, exec_ignore") + flag.Parse() +} + func main() { + argOffset := 0 + if (len(os.Args) > 1 && strings.HasPrefix(os.Args[1], "-")) { + argOffset = 2 + } + + if (len(os.Args) < (7+argOffset)) { + fmt.Println("Usage: /vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] 'filter1' [ 'filter2']...") + os.Exit(1) + } + vtctl := os.Getenv("VTCTLCLIENT") if (vtctl == "") { vtctl = "vtctlclient -server localhost:15999" } - // TODO: DDL ignore or not - if (len(os.Args) < 7) { - fmt.Println("Usage: /vreplgen 'filter1' [ 'filter2']...") - os.Exit(1) - } - tabletID := os.Args[1] - sourceKeyspace := os.Args[2] - sourceShard := os.Args[3] - destKeyspace := os.Args[4] + tabletID := os.Args[1+argOffset] + sourceKeyspace := os.Args[2+argOffset] + sourceShard := os.Args[3+argOffset] + destKeyspace := os.Args[4+argOffset] destDbName := "vt_" + destKeyspace - listSize := (len(os.Args) - 5)/2 - rules := make([]*binlogdatapb.Rule, listSize) - for i := 5; i < len(os.Args); i = i+2 { + var rules []*binlogdatapb.Rule + for i := 5+argOffset; i < len(os.Args); i = i+2 { destTable := os.Args[i] destFilter := os.Args[i+1] rule := new(binlogdatapb.Rule) @@ -59,11 +71,24 @@ func main() { filter := &binlogdatapb.Filter{ Rules: rules, } + + var onDdlAction binlogdatapb.OnDDLAction + switch onDdl { + case "ignore": + onDdlAction = binlogdatapb.OnDDLAction_IGNORE + case "stop": + onDdlAction = binlogdatapb.OnDDLAction_STOP + case "exec": + onDdlAction = binlogdatapb.OnDDLAction_EXEC + case "exec_ignore": + onDdlAction = binlogdatapb.OnDDLAction_EXEC_IGNORE + } + bls := &binlogdatapb.BinlogSource{ Keyspace: sourceKeyspace, Shard: sourceShard, Filter: filter, - OnDdl: binlogdatapb.OnDDLAction_IGNORE, + OnDdl: onDdlAction, } val := sqltypes.NewVarBinary(fmt.Sprintf("%v", bls)) var sqlEscaped bytes.Buffer