How To Properly Add and Sync A New Node in an Existing Raft Network #357
Data synchronization is asynchronous. You will know that it's complete when you can successfully perform a ReadIndex on the shard from the new node. I use a function like this to block until the shard is ready after calling
// readIndex blocks until it can read from a shard, indicating that the local replica is up to date.
// It returns ctx.Err() if the context is cancelled before the replica catches up.
func (a *Agent) readIndex(ctx context.Context, shardID uint64) error {
	for {
		rs, err := a.host.ReadIndex(shardID, time.Second)
		if err != nil || rs == nil {
			a.log.Infof(`[%05x] Error reading shard index: %s: %v`, shardID, a.HostID(), err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-a.clock.After(time.Second):
			}
			continue
		}
		res := <-rs.ResultC()
		rs.Release()
		if res.Completed() {
			return nil // the read index was served, so the local replica has caught up
		}
		a.log.Infof(`[%05x] Waiting for other nodes`, shardID)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-a.clock.After(time.Second):
		}
	}
}
Dragonboat v4 has a new WaitReady replica config option that effectively does the same thing.
Thank you so much for the response! So is this the general flow? Can you please confirm?
Am I correct in the steps above? Thanks in advance!
I haven't explored the transition from non-voting to member, but that sounds about right.
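To confirm each stage of a non-voting-to-member transition, one option is to read the shard membership with SyncGetShardMembership and check where the replica ID appears. The sketch assumes the Dragonboat v4 membership value reports voting members, non-voting replicas, witnesses and removed IDs in separate fields (Nodes, NonVotings, Witnesses, Removed; check the names against your version), so a replica added with SyncRequestAddNonVoting would be expected in the non-voting set and, after promotion, among the voting nodes. replicaRole is a hypothetical helper that takes plain copies of those maps.

```go
package main

// replicaRole reports where replicaID appears in a shard membership. The
// maps are hypothetical copies of the membership's Nodes, NonVotings,
// Witnesses and Removed fields.
func replicaRole(replicaID uint64, nodes, nonVotings, witnesses map[uint64]string, removed map[uint64]struct{}) string {
	if _, ok := nodes[replicaID]; ok {
		return "voting"
	}
	if _, ok := nonVotings[replicaID]; ok {
		return "non-voting"
	}
	if _, ok := witnesses[replicaID]; ok {
		return "witness"
	}
	if _, ok := removed[replicaID]; ok {
		return "removed"
	}
	return "absent"
}
```

Logging only the voting set after SyncRequestAddNonVoting would make it look as if the membership had not changed.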
Hey @kevburnsjr, here is how I tested it on a Raft cluster with 3 initial members:
err = nh.StartOnDiskReplica(initialMembers, !isInitialMember, func(clusterId, nodeId uint64) statemachine.IOnDiskStateMachine {
log.Infof("creating disk state machine for cluster=%d and node=%d", clusterId, nodeId)
if clusterId != s.config.ClusterID {
log.Fatalf("not compatible cluster version %d != %d", clusterId, s.config.ClusterID)
}
return s
}, nhc)
if err != nil {
return errors.Wrapf(err, "failed to start on-disk raft cluster")
}
if err := s.readIndex(context.Background(), s.config.ClusterID); err != nil {
return errors.Wrap(err, "failed to sync new node to the current state of the cluster")
}
func (s *Service) readIndex(ctx context.Context, shardID uint64) error {
for {
rs, err := s.nh.ReadIndex(shardID, time.Second)
fmt.Println("ReadIndex - rs: ", rs)
if err != nil || rs == nil {
log.Infof("[%05x] Error reading shard index: %v", shardID, err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
continue
}
res := <-rs.ResultC()
fmt.Println("readIndex - res: ", res)
rs.Release()
if !res.Completed() {
log.Infof("[%05x] Waiting for other nodes", shardID)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
}
continue
}
break
}
return nil
}
Here is the response from the new node:
readIndex - res: {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:12Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs: &{0 0 0 0 367 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7440 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:14Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res: {0 {0 []} [] {0 0} false false}
ReadIndex - rs: &{0 0 0 0 378 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
time="2024-06-09T16:02:16Z" level=info msg="[00000] Waiting for other nodes"
readIndex - res: {0 {0 []} [] {0 0} false false}
ReadIndex - rs: &{0 0 0 0 388 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res: {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:18Z" level=info msg="[00000] Waiting for other nodes"
ReadIndex - rs: &{0 0 0 0 399 {0 0} 0 {0} {0} <nil> <nil> 0x4000bc7380 0x40010aa000 0x40003e6870 false <nil>}
readIndex - res: {0 {0 []} [] {0 0} false false}
time="2024-06-09T16:02:21Z" level=info msg="[00000] Waiting for other nodes"
This loop kept going forever with the same message. Is there anything I might be missing? Do you have a repository where I can see how this process is handled from start to finish? Thanks in advance 🙌
It's hard to tell what might be wrong without a full example. The
Once the replica is added to the shard, you can start it on the designated NodeHost. After that, everything else is transparent to your application: Raft log entries will be appended to the new replica, and a snapshot will be sent if necessary. In general, no extra step is required. Some users add the replica as non-voting first and upgrade it to a full member later; this is discussed extensively in the Raft thesis, so please read the thesis for more details. To make sure the recently added replica is working as expected, you can issue a ReadIndex on that node. If it completes successfully, all previous log entries have been appended to and applied on the new replica. This is a pretty strong guarantee, as it also implies that the new replica is recognized by the leader replica, which in turn requires consensus among the replicas in the shard.
@lni @kevburnsjr Thank you so much for your explanations and examples; I managed to resolve most of my issues. I really appreciate it!
@lni @kevburnsjr
panic: not initial recovery but snapshot shrunk
2024-06-11 16:29:59.163950 I | rsm: [00000:70460] opened disk SM, index 0
2024-06-11 16:29:59.164460 C | rsm: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)
panic: [00000:70460], ss.OnDiskIndex (106) > s.onDiskInitIndex (0)
goroutine 423 [running]:
github.com/lni/goutils/logutil/capnslog.(*PackageLogger).Panicf(0x1400128c060, {0x1039581df?, 0x10?}, {0x14001412630?, 0x14000591070?, 0x14000054dd0?})
/Users/candostyavuz/go/pkg/mod/github.com/lni/goutils@v1.4.0/logutil/capnslog/pkg_logger.go:88 +0xb4
github.com/lni/dragonboat/v4/logger.(*capnsLog).Panicf(0x14000591050?, {0x1039581df?, 0x10?}, {0x14001412630?, 0x103cefc01?, 0x140003e0098?})
/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/capnslogger.go:74 +0x28
github.com/lni/dragonboat/v4/logger.(*dragonboatLogger).Panicf(0x140003e00b0?, {0x1039581df, 0x30}, {0x14001412630, 0x3, 0x3})
/Users/candostyavuz/go/pkg/mod/github.com/lni/dragonboat/v4@v4.0.0-20231222133740-1d6e2d76cd57/logger/logger.go:135 +0x54
github.com/lni/dragonboat/v4/internal/rsm.(*StateMachine).checkPartialSnapshotApplyOnDiskSM(0x14000572400, {{0x140001f4d20, 0xd1}, 0x0, 0x6a, 0x2, {0x65, 0x14001412390, 0x0, 0x0, ...}, ...}, ...)
or sometimes:
panic: invalid commitTo index 105, lastIndex() 0
I am attempting to add a new node to an existing Raft cluster to increase the number of nodes in the network. My process uses Dragonboat functions such as SyncGetShardMembership, SyncRequestAddNonVoting, and SyncRequestAddReplica. However, I'm having trouble correctly syncing and propagating the new node's information across the existing cluster. I can't find any information about this very important topic in the docs, examples, or discussions, so I would be really grateful if you could provide a flow for dynamically adding and removing nodes in a running Raft network.
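For the removal half of that flow, Dragonboat provides SyncRequestDeleteReplica, and its documentation notes that a deleted replica ID cannot be added back to the same shard, so a replacement node needs a fresh ID. Below is a minimal sketch of picking one. It assumes the maps mirror the Nodes, NonVotings, Witnesses and Removed fields of the membership returned by SyncGetShardMembership (field names and types are assumptions to check against your version). nextReplicaID is a hypothetical helper, and it starts at 1 on the assumption that 0 is not a valid replica ID.

```go
package main

// nextReplicaID returns an ID one greater than any ID that appears in the
// shard's membership, including removed replicas, so it does not collide
// with a current or previously deleted replica. The parameters are
// hypothetical copies of the membership maps.
func nextReplicaID(nodes, nonVotings, witnesses map[uint64]string, removed map[uint64]struct{}) uint64 {
	var highest uint64
	for _, m := range []map[uint64]string{nodes, nonVotings, witnesses} {
		for id := range m {
			if id > highest {
				highest = id
			}
		}
	}
	for id := range removed {
		if id > highest {
			highest = id
		}
	}
	return highest + 1 // 1 for an empty membership
}
```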
SyncRequestAddReplica and then start the node with the related Dragonboat start method, does it handle state sync for the new node automatically? If not, what should I do to ensure the new node is successfully synchronized with the network and connects to the existing leader?
Current Approach and Problems
SyncGetShardMembership to fetch the current members of the cluster.
SyncRequestAddNonVoting. The nodeID and raftAddress for the new node are derived from our internal configuration and passed in accordingly. In this scenario, the membership config has not been updated when SyncGetShardMembership is called and logged again afterwards.
SyncRequestAddReplica directly, the membership is updated and the new node info is added, but only on one node, probably the one that processed the operation. How should I propagate this updated member list to the other nodes? Isn't SyncRequestAddReplica handling this consensus automatically?
In both scenarios, when starting the new node with the join flag set to true and an empty initial members list using the StartOnDiskReplica function, the new node has no information about who the leader is, and it can't fetch the state of the existing network. It feels like there's no automatic synchronization mechanism when we register and start the new node.
Here are some functions I am using to implement the logic described above:
Leader logic which runs in go routine with time ticker:
To be more specific, the new node reports a "can't find leader in config, that's not normal" error after being registered and started with the steps described above.
I would appreciate any guidance or insights into how to correctly manage node additions in a Raft cluster using Dragonboat while ensuring consistency and stability throughout the cluster. Thanks in advance 🙌
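One possible cause of a leaderless joining replica (not confirmed in this thread) is start arguments that don't match the replica's role. Dragonboat's documentation for the start methods says a replica joining an existing shard should be started with join set to true and an empty initialMembers map, after it has been added through SyncRequestAddReplica or SyncRequestAddNonVoting on an existing member. The replicas that bootstrap the shard instead pass the full initial member map with join set to false. Below is a minimal sketch of choosing those arguments. startParams is a hypothetical helper, and addresses are plain strings so the sketch compiles without the Dragonboat module.

```go
package main

import "errors"

// startParams returns the initialMembers and join arguments to pass to a
// start method such as StartOnDiskReplica. Hypothetical helper: addresses
// are plain strings standing in for Dragonboat's target type.
func startParams(isInitialMember bool, bootstrap map[uint64]string) (map[uint64]string, bool, error) {
	if !isInitialMember {
		// Joining replica: it must already have been added to the shard by an
		// existing member, and it learns the membership from the leader.
		return map[uint64]string{}, true, nil
	}
	if len(bootstrap) == 0 {
		return nil, false, errors.New("initial member started without a bootstrap member list")
	}
	return bootstrap, false, nil
}
```

With the StartOnDiskReplica call shown earlier, which passes !isInitialMember as join, this means the joining node should also receive an empty map rather than the initial member list.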