Skip to content

Commit

Permalink
[FIXED] Improve per-subject state performance & keep ss.Last up-to-…
Browse files Browse the repository at this point in the history
…date (#6235)

Improve performance by only setting `ss.Last` during recalculation, so
only when it's needed.

Also fixes a bug where `ss.Last` was not kept up-to-date if `ss.Msgs >
1`, by introducing a `ss.lastNeedsUpdate` just like
`ss.firstNeedsUpdate`.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 11, 2024
2 parents a3a6551 + 3304ee7 commit 2565d45
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 112 deletions.
165 changes: 102 additions & 63 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2313,8 +2313,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
Expand Down Expand Up @@ -2443,8 +2443,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
}
if sseq <= ss.First {
update(ss)
Expand Down Expand Up @@ -2743,8 +2743,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
Expand Down Expand Up @@ -3047,8 +3047,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3335,8 +3335,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -4009,8 +4009,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
Expand Down Expand Up @@ -4141,8 +4141,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
if ss == nil {
Expand Down Expand Up @@ -7498,9 +7498,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
// for per-subject info would be able to find it still.
smb.dmap.Insert(mseq)
}
}

Expand Down Expand Up @@ -7946,73 +7943,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalulate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
} else if startSlot < 0 {
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
}

var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
ss.First = seq
return
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5031,7 +5031,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateFirstForSubj("foo", 1, ss)
mb.recalculateForSubj("foo", ss)
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
Expand Down
89 changes: 49 additions & 40 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
Expand Down Expand Up @@ -428,8 +428,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var totalSkipped uint64
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
Expand Down Expand Up @@ -583,8 +583,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subjs, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
Expand Down Expand Up @@ -721,8 +721,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
var totalSkipped uint64
// We will track start and end sequences as we go.
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
Expand Down Expand Up @@ -839,8 +839,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if !ms.removeMsg(ss.First, false) {
break
Expand Down Expand Up @@ -1313,8 +1313,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if !ok {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if ss.First < fseq {
fseq = ss.First
Expand Down Expand Up @@ -1408,38 +1408,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
return
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate {
tseq := ss.First + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
return
}
break
}
}
}
if ss.lastNeedsUpdate {
tseq := ss.Last - 1
if tseq > ms.state.LastSeq {
tseq = ms.state.LastSeq
}
for ; tseq >= ss.First; tseq-- {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.Last = tseq
ss.lastNeedsUpdate = false
if ss.Msgs == 1 {
ss.First = tseq
ss.firstNeedsUpdate = false
}
return
}
}
}
}
Expand Down
Loading

0 comments on commit 2565d45

Please sign in to comment.