Skip to content

Commit

Permalink
return only when a raft leader is elected
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina1 committed Jun 23, 2020
1 parent 5287aa4 commit 03739a0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
3 changes: 2 additions & 1 deletion pkg/relt/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (p publisher) join(cluster []raft.Server) error {
// The created peer will bind to a random address to use
// the Raft protocol, and the directory to be used by the
// protocol will be placed at the /tmp folder.
func newPublisher(rabbitmq *core) (*publisher, error) {
func newPublisher(rabbitmq *core, o *raft.Observer) (*publisher, error) {
raftConf := raft.DefaultConfig()
bind, err := GenerateRandomIP()
if err != nil {
Expand Down Expand Up @@ -210,6 +210,7 @@ func newPublisher(rabbitmq *core) (*publisher, error) {
context: ctx,
done: done,
}
ra.RegisterObserver(o)
rabbitmq.ctx.spawn(p.start)
return p, nil
}
13 changes: 12 additions & 1 deletion pkg/relt/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,20 @@ func newCore(relt Relt) (*core, error) {
ask: make(chan bool),
}

leaderChan := make(chan raft.Observation)
observer := raft.NewObserver(leaderChan, false, func(o *raft.Observation) bool {
switch leader := o.Data.(type) {
case raft.LeaderObservation:
return len(leader.Leader) > 0
default:
return false
}
})

var peers []*publisher
var servers []raft.Server
for i := 0; i < relt.configuration.Replication; i++ {
peer, err := newPublisher(c)
peer, err := newPublisher(c, observer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -278,5 +288,6 @@ func newCore(relt Relt) (*core, error) {

c.publishers = peers
c.ctx.spawn(c.start)
<-leaderChan
return c, nil
}

0 comments on commit 03739a0

Please sign in to comment.