Skip to content

Commit

Permalink
Filter readOnly ingesters when sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
danielblando committed Jan 16, 2025
1 parent 7ed0c41 commit 4e75055
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 51 deletions.
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ri
return args.Get(0).(ring.ReadRing)
}

// ShuffleShardWithOperation is the mock implementation of the ring method of the
// same name. It forwards identifier, size and op to r.Called and returns the first
// value of the result, asserted as a ring.ReadRing. The type assertion is unchecked,
// so it panics when that value is not a ring.ReadRing.
func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op ring.Operation) ring.ReadRing {
	subring := r.Called(identifier, size, op).Get(0)
	return subring.(ring.ReadRing)
}

func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) {
args := r.Called(instanceID)
return args.Get(0).(ring.InstanceState), args.Error(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

// Obtain a subring if required.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
subRing = d.ingestersRing.ShuffleShardWithOperation(userID, limits.IngestionTenantShardSize, ring.WriteShard)
}

keys := append(seriesKeys, metadataKeys...)
Expand Down
17 changes: 13 additions & 4 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ type CompareResult int
const (
Equal CompareResult = iota // Both rings contain same exact instances.
EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ).
EqualButReadOnly // Both rings contain the same instances but Write ring can change due to ReadOnly update
Different // Rings have different set of instances, or their information don't match.
)

Expand All @@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

equalStatesAndTimestamps := true
equalReadOnly := true

for name, ing := range d.Ingesters {
oing, ok := o.Ingesters[name]
Expand Down Expand Up @@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

if ing.State != oing.State {
equalStatesAndTimestamps = false
if ing.State == READONLY || oing.State == READONLY {
equalReadOnly = false
} else {
equalStatesAndTimestamps = false
}
}
}

if equalStatesAndTimestamps {
return Equal
if !equalReadOnly {
return EqualButReadOnly
}
if !equalStatesAndTimestamps {
return EqualButStatesAndTimestamps
}
return EqualButStatesAndTimestamps
return Equal
}

func GetOrCreateRingDesc(d interface{}) *Desc {
Expand Down
20 changes: 20 additions & 0 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
expected: Equal,
},
"same number of instances, from active to readOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
expected: EqualButReadOnly,
},
"same number of instances, from readOnly to active": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
expected: EqualButReadOnly,
},
"same number of instances, prioritize readOnly than timestamp changes": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}},
expected: EqualButReadOnly,
},
"same single instance, different timestamp": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}},
Expand Down Expand Up @@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
expected: Different,
},
"same number of instances, prioritize diff than ReadOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}},
expected: Different,
},
}

for testName, testData := range tests {
Expand Down
127 changes: 83 additions & 44 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// ShuffleShardWithOperation returns a subring for the provided identifier (eg. a tenant ID)
// and size (number of instances) filtered for a given operation.
ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing

// ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm.
// It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one
// shard size at a time, at most 1 instance can be changed.
Expand Down Expand Up @@ -112,6 +116,8 @@ var (
return s == READONLY
})

// WriteShard is the operation the distributor passes to ShuffleShardWithOperation
// when selecting a tenant subring for writes. Its state list is ACTIVE, PENDING,
// LEAVING and JOINING; READONLY is intentionally absent, so read-only instances
// are filtered out of the write shard. The callback always returns false.
// NOTE(review): presumably the callback is NewOp's "should extend replica set"
// predicate (as for the Read operation), meaning the replica set is never
// extended for this operation; confirm against NewOp.
WriteShard = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool { return false })

// Read operation that extends the replica set if an instance is not ACTIVE, PENDING, LEAVING, JOINING OR READONLY
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool {
// To match Write with extended replica set we have to also increase the
Expand Down Expand Up @@ -222,6 +228,7 @@ type subringCacheKey struct {
shardSize int

zoneStableSharding bool
operation Operation
}

// New creates a new Ring. Being a service, Ring needs to be started to do anything.
Expand Down Expand Up @@ -333,12 +340,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
}

rc := prevRing.RingCompare(ringDesc)
if rc == Equal || rc == EqualButStatesAndTimestamps {
if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly {
// No need to update tokens or zones. Only states and timestamps
// have changed. (If Equal, nothing has changed, but that doesn't happen
// when watching the ring for updates).
r.mtx.Lock()
r.ringDesc = ringDesc
if rc == EqualButReadOnly && r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
r.updateRingMetrics(rc)
r.mtx.Unlock()
return
Expand Down Expand Up @@ -732,11 +743,15 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
// set of instances, with a reduced number of overlapping instances between two identifiers.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, false)
return r.shuffleShardWithCache(identifier, size, false, Reporting)
}

// ShuffleShardWithOperation returns a subring for the provided identifier (eg. a
// tenant ID) and size (number of instances), filtered for the given operation.
// It delegates to shuffleShardWithCache with zone-stable sharding turned off.
func (r *Ring) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing {
	// This entry point always uses the regular (non zone-stable) algorithm.
	const zoneStableSharding = false

	return r.shuffleShardWithCache(identifier, size, zoneStableSharding, op)
}

func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, true)
return r.shuffleShardWithCache(identifier, size, true, Reporting)
}

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
Expand All @@ -752,26 +767,26 @@ func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPer
return r
}

return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
return r.shuffleShard(identifier, size, lookbackPeriod, now, false, Reporting)
}

func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool, op Operation) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
if size <= 0 || (op == Reporting && r.InstancesCount() <= size) {
return r
}

if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding, op); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)
result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding, op)

r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
r.setCachedShuffledSubring(identifier, size, zoneStableSharding, op, result)
return result
}

func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool, op Operation) *Ring {
lookbackUntil := now.Add(-lookbackPeriod).Unix()

r.mtx.RLock()
Expand All @@ -783,14 +798,16 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
zonesWithExtraInstance int
)

ro := r.getRingForOperation(op)

if r.cfg.ZoneAwarenessEnabled {
if zoneStableSharding {
numInstancesPerZone = size / len(r.ringZones)
zonesWithExtraInstance = size % len(r.ringZones)
numInstancesPerZone = size / len(ro.ringZones)
zonesWithExtraInstance = size % len(ro.ringZones)
} else {
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(ro.ringZones))
}
actualZones = r.ringZones
actualZones = ro.ringZones
} else {
numInstancesPerZone = size
actualZones = []string{""}
Expand All @@ -802,12 +819,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
for _, zone := range actualZones {
var tokens []uint32

if r.cfg.ZoneAwarenessEnabled {
tokens = r.ringTokensByZone[zone]
if ro.cfg.ZoneAwarenessEnabled {
tokens = ro.ringTokensByZone[zone]
} else {
// When zone-awareness is disabled, we just iterate over 1 single fake zone
// and use all tokens in the ring.
tokens = r.ringTokens
tokens = ro.ringTokens
}

// Initialise the random generator used to select instances in the ring.
Expand Down Expand Up @@ -835,7 +852,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
// Wrap p around in the ring.
p %= len(tokens)

info, ok := r.ringInstanceByToken[tokens[p]]
info, ok := ro.ringInstanceByToken[tokens[p]]
if !ok {
// This should never happen unless a bug in the ring code.
panic(ErrInconsistentTokensInfo)
Expand All @@ -847,7 +864,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}

instanceID := info.InstanceID
instance := r.ringDesc.Ingesters[instanceID]
instance := ro.ringDesc.Ingesters[instanceID]
shard[instanceID] = instance

// If the lookback is enabled and this instance has been registered within the lookback period
Expand All @@ -869,27 +886,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
}
}

// Build a read-only ring for the shard.
shardDesc := &Desc{Ingesters: shard}
shardTokensByZone := shardDesc.getTokensByZone()

return &Ring{
cfg: r.cfg,
strategy: r.strategy,
ringDesc: shardDesc,
ringTokens: shardDesc.GetTokens(),
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
KVClient: r.KVClient,

// We reference the original map as is in order to avoid copying. It's safe to do
// because this map is immutable by design and it's a superset of the actual instances
// with the subring.
ringInstanceByToken: r.ringInstanceByToken,

// For caching to work, remember these values.
lastTopologyChange: r.lastTopologyChange,
}
return r.copyWithNewDesc(shard)
}

// GetInstanceState returns the current state of an instance or an error if the
Expand Down Expand Up @@ -926,7 +923,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
return ok
}

func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation) *Ring {
if r.cfg.SubringCacheDisabled {
return nil
}
Expand All @@ -935,7 +932,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
defer r.mtx.RUnlock()

// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}]
if cached == nil {
return nil
}
Expand All @@ -954,7 +951,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
return cached
}

func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation, subring *Ring) {
if subring == nil || r.cfg.SubringCacheDisabled {
return
}
Expand All @@ -966,7 +963,49 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS
// (which can happen between releasing the read lock and getting read-write lock).
// Note that shuffledSubringCache can be only nil when set by test.
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] = subring
}
}

// getRingForOperation returns the ring to use when sharding for op.
//
// The receiver is returned unchanged when op is Reporting (the default operation
// used by the plain shuffle-shard entry points) or when the ring has no descriptor
// or no instances. Otherwise a new ring is built that contains only the instances
// whose state op.IsInstanceInStateHealthy accepts.
//
// The ring read lock must already be held by the caller.
func (r *Ring) getRingForOperation(op Operation) *Ring {
	// Nothing to filter: default operation, or an empty ring.
	if op == Reporting || r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
		return r
	}

	healthy := make(map[string]InstanceDesc)
	for instanceID, desc := range r.ringDesc.Ingesters {
		if !op.IsInstanceInStateHealthy(desc.State) {
			// Instance state is not accepted for this operation; leave it out.
			continue
		}
		healthy[instanceID] = desc
	}

	return r.copyWithNewDesc(healthy)
}

// copyWithNewDesc returns a new Ring whose descriptor holds exactly the given
// instances. Tokens, tokens-by-zone and the zone list are recomputed from desc,
// while cfg, strategy, KVClient, ringInstanceByToken and lastTopologyChange are
// taken from the receiver. All other fields of the new Ring keep their zero value.
func (r *Ring) copyWithNewDesc(desc map[string]InstanceDesc) *Ring {
	newDesc := &Desc{Ingesters: desc}

	// Derive the token views from the new descriptor.
	tokensByZone := newDesc.getTokensByZone()
	tokens := newDesc.GetTokens()
	zones := getZones(tokensByZone)

	return &Ring{
		cfg:      r.cfg,
		strategy: r.strategy,
		KVClient: r.KVClient,

		ringDesc:         newDesc,
		ringTokens:       tokens,
		ringTokensByZone: tokensByZone,
		ringZones:        zones,

		// The token-to-instance map is shared with the receiver rather than copied.
		// That is safe because the map is immutable by design and is a superset of
		// the instances present in the new ring.
		ringInstanceByToken: r.ringInstanceByToken,

		// Carried over so that subring caching keeps working.
		lastTopologyChange: r.lastTopologyChange,
	}
}

Expand Down
Loading

0 comments on commit 4e75055

Please sign in to comment.