diff --git a/internal/raft/cluster.go b/internal/raft/cluster.go
index 307a4c6e8..a641307a7 100644
--- a/internal/raft/cluster.go
+++ b/internal/raft/cluster.go
@@ -2,7 +2,6 @@ package raft
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"time"
 
@@ -14,11 +13,12 @@ import (
 )
 
 type RaftConfig struct {
-	InitialMembers []string `help:"Initial members" required:""`
-	ReplicaID      uint64   `help:"Node ID" required:""`
-	DataDir        string   `help:"Data directory" required:""`
-	RaftAddress    string   `help:"Address to advertise to other nodes" required:""`
-	ListenAddress  string   `help:"Address to listen for incoming traffic. If empty, RaftAddress will be used."`
+	InitialMembers    []string      `help:"Initial members" required:""`
+	ReplicaID         uint64        `help:"Node ID" required:""`
+	DataDir           string        `help:"Data directory" required:""`
+	RaftAddress       string        `help:"Address to advertise to other nodes" required:""`
+	ListenAddress     string        `help:"Address to listen for incoming traffic. If empty, RaftAddress will be used."`
+	ShardReadyTimeout time.Duration `help:"Timeout for shard to be ready" default:"5s"`
 	// Raft configuration
 	RTT         time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"`
 	ElectionRTT uint64        `help:"Election RTT as a multiple of RTT" default:"10"`
@@ -196,15 +196,40 @@ func (c *Cluster) AddMember(ctx context.Context, shardID uint64, replicaID uint6
 }
 
+// waitReady blocks until a ReadIndex request against shardID completes, which
+// is used as the signal that the shard is ready to serve requests.
+//
+// Each attempt is issued with a timeout of c.config.ShardReadyTimeout. Attempts
+// that do not complete are retried with jittered exponential backoff until ctx
+// is done, in which case the returned error wraps ctx.Err().
 func (c *Cluster) waitReady(ctx context.Context, shardID uint64) error {
-	retry := backoff.Backoff{}
+	retry := backoff.Backoff{
+		Min:    5 * time.Millisecond,
+		Max:    c.config.ShardReadyTimeout,
+		Factor: 2,
+		Jitter: true,
+	}
 	for {
-		_, err := c.nh.SyncGetShardMembership(ctx, shardID)
-		if err == nil {
-			return nil
+		// NOTE(review): confirm ReadIndex never returns
+		// dragonboat.ErrShardNotReady synchronously; if it can, that error
+		// should be retried here rather than returned to the caller.
+		rs, err := c.nh.ReadIndex(shardID, c.config.ShardReadyTimeout)
+		if err != nil {
+			return fmt.Errorf("failed to read index: %w", err)
+		}
+		if rs == nil {
+			// Guard separately so a nil err is never wrapped with %w.
+			return fmt.Errorf("failed to read index: no request state returned")
 		}
-		if !errors.Is(err, dragonboat.ErrShardNotReady) {
-			return fmt.Errorf("failed to get shard membership: %w", err)
+		res := <-rs.ResultC()
+		rs.Release()
+		if res.Completed() {
+			return nil
 		}
-		time.Sleep(retry.Duration())
+		// Not ready yet: back off, but stop waiting as soon as ctx is done.
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("context cancelled: %w", ctx.Err())
+		case <-time.After(retry.Duration()):
+		}
 	}
 }
diff --git a/internal/raft/cluster_test.go b/internal/raft/cluster_test.go
index 8d17435e0..ad350e9c4 100644
--- a/internal/raft/cluster_test.go
+++ b/internal/raft/cluster_test.go
@@ -128,6 +128,7 @@ func testCluster(t *testing.T, members []string, id uint64, address string) *raf
 		SnapshotEntries:    10,
 		CompactionOverhead: 10,
 		RTT:                10 * time.Millisecond,
+		ShardReadyTimeout:  1 * time.Second,
 	})
 }
 