diff --git a/pkg/relt/publisher.go b/pkg/relt/publisher.go index eae4147..ea387a8 100644 --- a/pkg/relt/publisher.go +++ b/pkg/relt/publisher.go @@ -163,7 +163,7 @@ func (p publisher) join(cluster []raft.Server) error { // The created peer will bind to a random address to use // the Raft protocol, and the directory to be used by the // protocol will be placed at the /tmp folder. -func newPublisher(rabbitmq *core) (*publisher, error) { +func newPublisher(rabbitmq *core, o *raft.Observer) (*publisher, error) { raftConf := raft.DefaultConfig() bind, err := GenerateRandomIP() if err != nil { @@ -210,6 +210,7 @@ func newPublisher(rabbitmq *core) (*publisher, error) { context: ctx, done: done, } + ra.RegisterObserver(o) rabbitmq.ctx.spawn(p.start) return p, nil } diff --git a/pkg/relt/rabbitmq.go b/pkg/relt/rabbitmq.go index ed9bb7f..761ed8b 100644 --- a/pkg/relt/rabbitmq.go +++ b/pkg/relt/rabbitmq.go @@ -240,10 +240,20 @@ func newCore(relt Relt) (*core, error) { ask: make(chan bool), } + leaderChan := make(chan raft.Observation) + observer := raft.NewObserver(leaderChan, false, func(o *raft.Observation) bool { + switch leader := o.Data.(type) { + case raft.LeaderObservation: + return len(leader.Leader) > 0 + default: + return false + } + }) + var peers []*publisher var servers []raft.Server for i := 0; i < relt.configuration.Replication; i++ { - peer, err := newPublisher(c) + peer, err := newPublisher(c, observer) if err != nil { return nil, err } @@ -278,5 +288,6 @@ func newCore(relt Relt) (*core, error) { c.publishers = peers c.ctx.spawn(c.start) + <-leaderChan return c, nil }