diff --git a/jetstream/kv.go b/jetstream/kv.go index 91a1bdcef..38acbdc61 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -166,6 +166,10 @@ type ( // with the same options as Watch. WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) + // WatchFiltered will watch for any updates to keys that match the keys + // argument. It can be configured with the same options as Watch. + WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error) + // Keys will return all keys. // Deprecated: Use ListKeys instead to avoid memory issues. Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) @@ -1069,11 +1073,11 @@ func (w *watcher) Stop() error { return w.sub.Unsubscribe() } -// Watch for any updates to keys that match the keys argument which could include wildcards. -// Watch will send a nil entry when it has received all initial values. -func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) { - if !searchKeyValid(keys) { - return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject") +func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error) { + for _, key := range keys { + if !searchKeyValid(key) { + return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject") + } } var o watchOpts for _, opt := range opts { @@ -1085,10 +1089,20 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat } // Could be a pattern so don't check for validity as we normally do. - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(keys) - keys = b.String() + for i, key := range keys { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + keys[i] = b.String() + } + + // if no keys are provided, watch all keys + if len(keys) == 0 { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(AllKeys) + keys = []string{b.String()} + } // We will block below on placing items on the chan. That is by design. w := &watcher{updates: make(chan KeyValueEntry, 256)} @@ -1161,7 +1175,14 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat // update() callback. w.mu.Lock() defer w.mu.Unlock() - sub, err := kv.pushJS.Subscribe(keys, update, subOpts...) + var sub *nats.Subscription + var err error + if len(keys) == 1 { + sub, err = kv.pushJS.Subscribe(keys[0], update, subOpts...) + } else { + subOpts = append(subOpts, nats.ConsumerFilterSubjects(keys...)) + sub, err = kv.pushJS.Subscribe("", update, subOpts...) + } if err != nil { return nil, err } @@ -1185,6 +1206,12 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat return w, nil } +// Watch for any updates to keys that match the keys argument which could include wildcards. +// Watch will send a nil entry when it has received all initial values. +func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) { + return kv.WatchFiltered(ctx, []string{keys}, opts...) +} + // WatchAll will invoke the callback for all updates. func (kv *kvs) WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) { return kv.Watch(ctx, AllKeys, opts...) diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index 50d2ffa69..42c0d28c3 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -246,6 +246,22 @@ func TestKeyValueWatch(t *testing.T) { } } } + expectPurgeF := func(t *testing.T, watcher jetstream.KeyWatcher) func(key string, revision uint64) { + return func(key string, revision uint64) { + t.Helper() + select { + case v := <-watcher.Updates(): + if v.Operation() != jetstream.KeyValuePurge { + t.Fatalf("Expected a delete operation but got %+v", v) + } + if v.Revision() != revision { + t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision()) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + } expectInitDoneF := func(t *testing.T, watcher jetstream.KeyWatcher) func() { return func() { t.Helper() @@ -315,13 +331,27 @@ func TestKeyValueWatch(t *testing.T) { watcher, err = kv.Watch(ctx, "t.*") expectOk(t, err) - defer watcher.Stop() expectInitDone = expectInitDoneF(t, watcher) expectUpdate = expectUpdateF(t, watcher) expectUpdate("t.name", "ik", 8) expectUpdate("t.age", "44", 10) expectInitDone() + watcher.Stop() + + // test watcher with multiple filters + watcher, err = kv.WatchFiltered(ctx, []string{"t.name", "name"}) + expectOk(t, err) + expectInitDone = expectInitDoneF(t, watcher) + expectUpdate = expectUpdateF(t, watcher) + expectPurge := expectPurgeF(t, watcher) + expectUpdate("name", "ik", 3) + expectUpdate("t.name", "ik", 8) + expectInitDone() + err = kv.Purge(ctx, "name") + expectOk(t, err) + expectPurge("name", 11) + defer watcher.Stop() }) t.Run("watcher with history included", func(t *testing.T) { @@ -542,6 +572,48 @@ func TestKeyValueWatch(t *testing.T) { _, err = kv.Watch(ctx, "foo", jetstream.IncludeHistory(), jetstream.UpdatesOnly()) expectErr(t, err, jetstream.ErrInvalidOption) }) + + t.Run("filtered watch with no filters", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + // this should behave like WatchAll + watcher, err := kv.WatchFiltered(ctx, []string{}) + expectOk(t, err) + defer watcher.Stop() + + expectInitDone := expectInitDoneF(t, watcher) + expectUpdate := expectUpdateF(t, watcher) + expectDelete := expectDeleteF(t, watcher) + // Make sure we already got an initial value marker. + expectInitDone() + + _, err = kv.Create(ctx, "name", []byte("derek")) + expectOk(t, err) + expectUpdate("name", "derek", 1) + _, err = kv.Put(ctx, "name", []byte("rip")) + expectOk(t, err) + expectUpdate("name", "rip", 2) + _, err = kv.Put(ctx, "name", []byte("ik")) + expectOk(t, err) + expectUpdate("name", "ik", 3) + _, err = kv.Put(ctx, "age", []byte("22")) + expectOk(t, err) + expectUpdate("age", "22", 4) + _, err = kv.Put(ctx, "age", []byte("33")) + expectOk(t, err) + expectUpdate("age", "33", 5) + expectOk(t, kv.Delete(ctx, "age")) + expectDelete("age", 6) + }) } func TestKeyValueWatchContext(t *testing.T) { diff --git a/kv.go b/kv.go index 3cade1f82..bcb283ff8 100644 --- a/kv.go +++ b/kv.go @@ -65,6 +65,9 @@ type KeyValue interface { Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) // WatchAll will invoke the callback for all updates. WatchAll(opts ...WatchOpt) (KeyWatcher, error) + // WatchFiltered will watch for any updates to keys that match the keys + // argument. It can be configured with the same options as Watch. + WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. // Deprecated: Use ListKeys instead to avoid memory issues. Keys(opts ...WatchOpt) ([]string, error) @@ -964,11 +967,11 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) { return kv.Watch(AllKeys, opts...) } -// Watch will fire the callback when a key that matches the keys pattern is updated. -// keys needs to be a valid NATS subject. -func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { - if !searchKeyValid(keys) { - return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject") +func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) { + for _, key := range keys { + if !searchKeyValid(key) { + return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject") + } } var o watchOpts for _, opt := range opts { @@ -980,10 +983,20 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } // Could be a pattern so don't check for validity as we normally do. - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(keys) - keys = b.String() + for i, key := range keys { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + keys[i] = b.String() + } + + // if no keys are provided, watch all keys + if len(keys) == 0 { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(AllKeys) + keys = []string{b.String()} + } // We will block below on placing items on the chan. That is by design. w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx} @@ -1056,7 +1069,14 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { // update() callback. w.mu.Lock() defer w.mu.Unlock() - sub, err := kv.js.Subscribe(keys, update, subOpts...) + var sub *Subscription + var err error + if len(keys) == 1 { + sub, err = kv.js.Subscribe(keys[0], update, subOpts...) + } else { + subOpts = append(subOpts, ConsumerFilterSubjects(keys...)) + sub, err = kv.js.Subscribe("", update, subOpts...) + } if err != nil { return nil, err } @@ -1083,6 +1103,12 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { return w, nil } +// Watch will fire the callback when a key that matches the keys pattern is updated. +// keys needs to be a valid NATS subject. +func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { + return kv.WatchFiltered([]string{keys}, opts...) +} + // Bucket returns the current bucket name (JetStream stream). func (kv *kvs) Bucket() string { return kv.name diff --git a/test/kv_test.go b/test/kv_test.go index 768895845..94703bc43 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -179,6 +179,22 @@ func TestKeyValueWatch(t *testing.T) { } } } + expectPurgeF := func(t *testing.T, watcher nats.KeyWatcher) func(key string, revision uint64) { + return func(key string, revision uint64) { + t.Helper() + select { + case v := <-watcher.Updates(): + if v.Operation() != nats.KeyValuePurge { + t.Fatalf("Expected a delete operation but got %+v", v) + } + if v.Revision() != revision { + t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision()) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive an update like expected") + } + } + } expectInitDoneF := func(t *testing.T, watcher nats.KeyWatcher) func() { return func() { t.Helper() @@ -237,13 +253,27 @@ func TestKeyValueWatch(t *testing.T) { watcher, err = kv.Watch("t.*") expectOk(t, err) - defer watcher.Stop() expectInitDone = expectInitDoneF(t, watcher) expectUpdate = expectUpdateF(t, watcher) expectUpdate("t.name", "ik", 8) expectUpdate("t.age", "44", 10) expectInitDone() + watcher.Stop() + + // test watcher with multiple filters + watcher, err = kv.WatchFiltered([]string{"t.name", "name"}) + expectOk(t, err) + expectInitDone = expectInitDoneF(t, watcher) + expectUpdate = expectUpdateF(t, watcher) + expectPurge := expectPurgeF(t, watcher) + expectUpdate("name", "ik", 3) + expectUpdate("t.name", "ik", 8) + expectInitDone() + err = kv.Purge("name") + expectOk(t, err) + expectPurge("name", 11) + defer watcher.Stop() }) t.Run("watcher with history included", func(t *testing.T) { @@ -384,6 +414,46 @@ func TestKeyValueWatch(t *testing.T) { _, err = kv.Watch("foo.") expectErr(t, err, nats.ErrInvalidKey) }) + + t.Run("filtered watch with no filters", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + // this should behave like WatchAll + watcher, err := kv.WatchFiltered([]string{}) + expectOk(t, err) + defer watcher.Stop() + + expectInitDone := expectInitDoneF(t, watcher) + expectUpdate := expectUpdateF(t, watcher) + expectDelete := expectDeleteF(t, watcher) + // Make sure we already got an initial value marker. + expectInitDone() + + _, err = kv.Create("name", []byte("derek")) + expectOk(t, err) + expectUpdate("name", "derek", 1) + _, err = kv.Put("name", []byte("rip")) + expectOk(t, err) + expectUpdate("name", "rip", 2) + _, err = kv.Put("name", []byte("ik")) + expectOk(t, err) + expectUpdate("name", "ik", 3) + _, err = kv.Put("age", []byte("22")) + expectOk(t, err) + expectUpdate("age", "22", 4) + _, err = kv.Put("age", []byte("33")) + expectOk(t, err) + expectUpdate("age", "33", 5) + expectOk(t, kv.Delete("age")) + expectDelete("age", 6) + }) } func TestKeyValueWatchContext(t *testing.T) {