diff --git a/mongo/change_stream.go b/mongo/change_stream.go index 1bedcc3f8a..8d0a2031de 100644 --- a/mongo/change_stream.go +++ b/mongo/change_stream.go @@ -531,6 +531,12 @@ func (cs *ChangeStream) ID() int64 { return cs.cursor.ID() } +// RemainingBatchLength returns the number of documents left in the current batch. If this returns zero, the subsequent +// call to Next or TryNext will do a network request to fetch the next batch. +func (cs *ChangeStream) RemainingBatchLength() int { + return len(cs.batch) +} + // SetBatchSize sets the number of documents to fetch from the database with // each iteration of the ChangeStream's "Next" or "TryNext" method. This setting // only affects subsequent document batches fetched from the database. diff --git a/mongo/integration/change_stream_test.go b/mongo/integration/change_stream_test.go index b3d0469c36..0a58454f5e 100644 --- a/mongo/integration/change_stream_test.go +++ b/mongo/integration/change_stream_test.go @@ -99,6 +99,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) { // cause an event to occur so the resume token is updated generateEvents(mt, 1) assert.True(mt, cs.Next(context.Background()), "expected next to return true, got false") + assert.Equal(mt, 0, cs.RemainingBatchLength()) firstToken := cs.ResumeToken() // cause an event on a different collection than the one being watched so the server's PBRT is updated @@ -374,6 +375,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) { // Iterate over one event to get resume token assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false") + assert.Equal(mt, numEvents-1, cs.RemainingBatchLength()) token := cs.ResumeToken() closeStream(cs)