Skip to content

Commit

Permalink
Add support for on_ddl flag
Browse files Browse the repository at this point in the history
Signed-off-by: Jacques Grove <aquarapid@gmail.com>
  • Loading branch information
aquarapid committed Oct 25, 2019
1 parent 3a699c3 commit 2adeaf2
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions vreplgen/vreplgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package main

import (
"bytes"
"flag"
"fmt"
"os"
"strings"
Expand All @@ -31,24 +32,35 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

var onDdl string

func init() {
flag.StringVar(&onDdl, "on_ddl", "ignore", "Set on_ddl value for replication stream - ignore, stop, exec, exec_ignore")
flag.Parse()
}

func main() {
argOffset := 0
if (len(os.Args) > 1 && strings.HasPrefix(os.Args[1], "-")) {
argOffset = 2
}

if (len(os.Args) < (7+argOffset)) {
fmt.Println("Usage: /vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] <tablet_id> <src_keyspace> <src_shard> <dest_keyspace> <dest_table1> 'filter1' [<dest_table2> 'filter2']...")
os.Exit(1)
}

vtctl := os.Getenv("VTCTLCLIENT")
if (vtctl == "") {
vtctl = "vtctlclient -server localhost:15999"
}
// TODO: DDL ignore or not
if (len(os.Args) < 7) {
fmt.Println("Usage: /vreplgen <tablet_id> <src_keyspace> <src_shard> <dest_keyspace> <dest_table1> 'filter1' [<dest_table2> 'filter2']...")
os.Exit(1)
}
tabletID := os.Args[1]
sourceKeyspace := os.Args[2]
sourceShard := os.Args[3]
destKeyspace := os.Args[4]
tabletID := os.Args[1+argOffset]
sourceKeyspace := os.Args[2+argOffset]
sourceShard := os.Args[3+argOffset]
destKeyspace := os.Args[4+argOffset]
destDbName := "vt_" + destKeyspace
listSize := (len(os.Args) - 5)/2
rules := make([]*binlogdatapb.Rule, listSize)
for i := 5; i < len(os.Args); i = i+2 {
var rules []*binlogdatapb.Rule
for i := 5+argOffset; i < len(os.Args); i = i+2 {
destTable := os.Args[i]
destFilter := os.Args[i+1]
rule := new(binlogdatapb.Rule)
Expand All @@ -59,11 +71,24 @@ func main() {
filter := &binlogdatapb.Filter{
Rules: rules,
}

var onDdlAction binlogdatapb.OnDDLAction
switch onDdl {
case "ignore":
onDdlAction = binlogdatapb.OnDDLAction_IGNORE
case "stop":
onDdlAction = binlogdatapb.OnDDLAction_STOP
case "exec":
onDdlAction = binlogdatapb.OnDDLAction_EXEC
case "exec_ignore":
onDdlAction = binlogdatapb.OnDDLAction_EXEC_IGNORE
}

bls := &binlogdatapb.BinlogSource{
Keyspace: sourceKeyspace,
Shard: sourceShard,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
OnDdl: onDdlAction,
}
val := sqltypes.NewVarBinary(fmt.Sprintf("%v", bls))
var sqlEscaped bytes.Buffer
Expand Down

0 comments on commit 2adeaf2

Please sign in to comment.