From 83f6ebb9db4136dfa8817084302f1fb91de74541 Mon Sep 17 00:00:00 2001 From: Juho Makinen Date: Thu, 19 Dec 2024 11:33:03 +1100 Subject: [PATCH] fix: wait for the index to be synced before reporting a shard ready. See https://github.com/lni/dragonboat/issues/357#issuecomment-2155171795 for more info --- internal/raft/cluster.go | 39 +++++++++++++++++++++++------------ internal/raft/cluster_test.go | 1 + 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/internal/raft/cluster.go b/internal/raft/cluster.go index 307a4c6e8..a641307a7 100644 --- a/internal/raft/cluster.go +++ b/internal/raft/cluster.go @@ -2,7 +2,6 @@ package raft import ( "context" - "errors" "fmt" "time" @@ -14,11 +13,12 @@ import ( ) type RaftConfig struct { - InitialMembers []string `help:"Initial members" required:""` - ReplicaID uint64 `help:"Node ID" required:""` - DataDir string `help:"Data directory" required:""` - RaftAddress string `help:"Address to advertise to other nodes" required:""` - ListenAddress string `help:"Address to listen for incoming traffic. If empty, RaftAddress will be used."` + InitialMembers []string `help:"Initial members" required:""` + ReplicaID uint64 `help:"Node ID" required:""` + DataDir string `help:"Data directory" required:""` + RaftAddress string `help:"Address to advertise to other nodes" required:""` + ListenAddress string `help:"Address to listen for incoming traffic. If empty, RaftAddress will be used."` + ShardReadyTimeout time.Duration `help:"Timeout for shard to be ready" default:"5s"` // Raft configuration RTT time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"` ElectionRTT uint64 `help:"Election RTT as a multiple of RTT" default:"10"` @@ -196,15 +196,28 @@ func (c *Cluster) AddMember(ctx context.Context, shardID uint64, replicaID uint6 } func (c *Cluster) waitReady(ctx context.Context, shardID uint64) error { - retry := backoff.Backoff{} + retry := backoff.Backoff{ + Min: 5 * time.Millisecond, + Max: c.config.ShardReadyTimeout, + Factor: 2, + Jitter: true, + } for { - _, err := c.nh.SyncGetShardMembership(ctx, shardID) - if err == nil { - return nil + rs, err := c.nh.ReadIndex(shardID, c.config.ShardReadyTimeout) + if err != nil || rs == nil { + return fmt.Errorf("failed to read index: %w", err) } - if !errors.Is(err, dragonboat.ErrShardNotReady) { - return fmt.Errorf("failed to get shard membership: %w", err) + res := <-rs.ResultC() + rs.Release() + if !res.Completed() { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled") + case <-time.After(retry.Duration()): + } + continue } - time.Sleep(retry.Duration()) + break } + return nil } diff --git a/internal/raft/cluster_test.go b/internal/raft/cluster_test.go index 8d17435e0..ad350e9c4 100644 --- a/internal/raft/cluster_test.go +++ b/internal/raft/cluster_test.go @@ -128,6 +128,7 @@ func testCluster(t *testing.T, members []string, id uint64, address string) *raf SnapshotEntries: 10, CompactionOverhead: 10, RTT: 10 * time.Millisecond, + ShardReadyTimeout: 1 * time.Second, }) }