Skip to content

Commit

Permalink
[FIXED] MaxMsgsPerSubject limit not applied when updating from no val…
Browse files Browse the repository at this point in the history
…ue (#6244)

If a stream, either in-memory or file-based, would be updated from
having no `MaxMsgsPer` to having it set the messages would not be
removed.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 12, 2024
2 parents 88ab06b + e659de1 commit cee58f7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
5 changes: 4 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,9 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
if cfg.Storage != FileStorage {
return fmt.Errorf("fileStore requires file storage type in config")
}
if cfg.MaxMsgsPer < -1 {
cfg.MaxMsgsPer = -1
}

fs.mu.Lock()
new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg}
Expand Down Expand Up @@ -611,7 +614,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk = nil
}

if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) {
fs.enforceMsgPerSubjectLimit(true)
}
fs.mu.Unlock()
Expand Down
7 changes: 5 additions & 2 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
ms.ageChk = nil
}
// Make sure to update MaxMsgsPer
if cfg.MaxMsgsPer < -1 {
cfg.MaxMsgsPer = -1
}
maxp := ms.maxp
ms.maxp = cfg.MaxMsgsPer
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
// If the value is smaller, or was unset before, we need to enforce that.
if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) {
lm := uint64(ms.maxp)
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
if ss.Msgs > lm {
Expand Down
44 changes: 44 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,47 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
},
)
}

func TestStoreMaxMsgsPerUpdateBug(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}
}
testAllStoreAllPermutations(
t, false, config(),
func(t *testing.T, fs StreamStore) {
for i := 0; i < 5; i++ {
_, _, err := fs.StoreMsg("foo", nil, nil)
require_NoError(t, err)
}

ss := fs.State()
require_Equal(t, ss.Msgs, 5)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 5)

// Update max messages per-subject from 0 (infinite) to 1.
// Since the per-subject limit was not specified before, messages should be removed upon config update.
cfg := config()
if _, ok := fs.(*fileStore); ok {
cfg.Storage = FileStorage
} else {
cfg.Storage = MemoryStorage
}
cfg.MaxMsgsPer = 1
err := fs.UpdateConfig(&cfg)
require_NoError(t, err)

// Only one message should remain.
ss = fs.State()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.FirstSeq, 5)
require_Equal(t, ss.LastSeq, 5)

// Update max messages per-subject from 0 (infinite) to an invalid value (< -1).
cfg.MaxMsgsPer = -2
err = fs.UpdateConfig(&cfg)
require_NoError(t, err)
require_Equal(t, cfg.MaxMsgsPer, -1)
},
)
}

0 comments on commit cee58f7

Please sign in to comment.