Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Aug 22, 2024
1 parent 00ac76d commit 3195f98
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
61 changes: 61 additions & 0 deletions go/vt/events/eventer/eventer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eventer

import (
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/log"
topoevents "vitess.io/vitess/go/vt/topotools/events"
)

var (
eventers map[string]Eventer
eventerName string = "log"
)

func RegisterFlags(fs *pflag.FlagSet) {
fs.StringVar(&eventerName, "eventer", eventerName, "the eventer to be used to broadcast internal events")
}

type Source string

var (
Vtctld Source = "vtctld"
Vtorc Source = "vtorc"
)

type Eventer interface {
EmergencyReparentShard(src Source, ev *topoevents.Reparent, err error) error
PlannedReparentShard(src Source, ev *topoevents.Reparent, err error) error
}

func New() Eventer {
if ev, ok := eventers[eventerName]; ok {
return ev
}
return nil
}

type LogEventer struct{}

func NewLogEventer() Eventer { return &LogEventer{} }

func (le *LogEventer) EmergencyReparentShard(src Source, ev *topoevents.Reparent, err error) error {
log.Infof("Received EmergencyReparentShardEvent: source=%s, err=%v, event=%v", src, err, ev)

return nil
}

func (le *LogEventer) PlannedReparentShard(src Source, ev *topoevents.Reparent, err error) error {
log.Infof("Received PlannedReparentShardEvent: source=%s, err=%v, event=%v", src, err, ev)

return nil
}

func RegisterEventer(name string, e Eventer) {
eventers[name] = e
}

func init() {
eventers = make(map[string]Eventer, 0)
RegisterEventer("log", NewLogEventer())
}
5 changes: 4 additions & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/events/eventer"
hk "vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -84,6 +85,7 @@ type VtctldServer struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
ws *workflow.Server
evr eventer.Eventer
}

// NewVtctldServer returns a new VtctldServer for the given topo server.
Expand All @@ -94,6 +96,7 @@ func NewVtctldServer(ts *topo.Server) *VtctldServer {
ts: ts,
tmc: tmc,
ws: workflow.NewServer(ts, tmc),
evr: eventer.New(),
}
}

Expand Down Expand Up @@ -2235,7 +2238,7 @@ func (s *VtctldServer) PlannedReparentShard(ctx context.Context, req *vtctldatap
logstream = append(logstream, e)
})

ev, err := reparentutil.NewPlannedReparenter(s.ts, s.tmc, logger).ReparentShard(ctx,
ev, err := reparentutil.NewPlannedReparenter(s.ts, s.tmc, logger, s.evr).ReparentShard(ctx,
req.Keyspace,
req.Shard,
reparentutil.PlannedReparentOptions{
Expand Down
17 changes: 10 additions & 7 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand All @@ -41,9 +42,10 @@ import (

// PlannedReparenter performs PlannedReparentShard operations.
type PlannedReparenter struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
logger logutil.Logger
ts *topo.Server
tmc tmclient.TabletManagerClient
logger logutil.Logger
eventer eventer.Eventer
}

// PlannedReparentOptions provides optional parameters to PlannedReparentShard
Expand All @@ -67,11 +69,12 @@ type PlannedReparentOptions struct {
// TabletManagerClient, and logger.
//
// Providing a nil logger instance is allowed.
func NewPlannedReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, logger logutil.Logger) *PlannedReparenter {
func NewPlannedReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, logger logutil.Logger, eventer eventer.Eventer) *PlannedReparenter {
pr := PlannedReparenter{
ts: ts,
tmc: tmc,
logger: logger,
ts: ts,
tmc: tmc,
logger: logger,
eventer: eventer,
}

if pr.logger == nil {
Expand Down

0 comments on commit 3195f98

Please sign in to comment.