diff --git a/base/dcp_client.go b/base/dcp_client.go index 3a39257a61..5697130168 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -497,13 +497,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error { if err == nil { err = dc.verifyFailoverLog(vbID, f) if err == nil { - e := streamOpenEvent{ - streamEventCommon: streamEventCommon{ - vbID: vbID, - }, - failoverLogs: f, - } - dc.workerForVbno(vbID).Send(e) + dc.metadata.SetFailoverEntries(vbID, f) } } openStreamError <- err diff --git a/base/dcp_client_stream_event.go b/base/dcp_client_stream_event.go index 87c7ee6bf7..6ab4fa0621 100644 --- a/base/dcp_client_stream_event.go +++ b/base/dcp_client_stream_event.go @@ -15,43 +15,44 @@ import ( sgbucket "github.com/couchbase/sg-bucket" ) +// streamEvent is an interface for events that can be sent on a DCP stream. type streamEvent interface { VbID() uint16 } +// streamEventCommon is a struct that contains common fields for all stream events. type streamEventCommon struct { vbID uint16 streamID uint16 } +// VbID return the vBucket ID for the event. func (sec streamEventCommon) VbID() uint16 { return sec.vbID } +// snapshotEvent represents a DCP snapshot event (opcode 0x56). type snapshotEvent struct { - streamEventCommon startSeq uint64 endSeq uint64 snapshotType gocbcore.SnapshotState + streamEventCommon } +// mutationEvent represents a DCP mutation event (opcode 0x57). type mutationEvent struct { - streamEventCommon + key []byte + value []byte seq uint64 + cas uint64 flags uint32 expiry uint32 - cas uint64 - datatype uint8 collection uint32 - key []byte - value []byte -} - -type streamOpenEvent struct { streamEventCommon - failoverLogs []gocbcore.FailoverEntry + datatype uint8 } +// asFeedEvent converts a mutationEvent to a sgbucket.FeedEvent. func (e mutationEvent) asFeedEvent() sgbucket.FeedEvent { return sgbucket.FeedEvent{ Opcode: sgbucket.FeedOpMutation, @@ -67,16 +68,18 @@ func (e mutationEvent) asFeedEvent() sgbucket.FeedEvent { } } +// deletionEvent represents a DCP deletion event (opcode 0x58). type deletionEvent struct { - streamEventCommon + key []byte + value []byte seq uint64 cas uint64 - datatype uint8 collection uint32 - key []byte - value []byte + datatype uint8 + streamEventCommon } +// asFeedEvent converts a deletionEvent to a sgbucket.FeedEvent. func (e deletionEvent) asFeedEvent() sgbucket.FeedEvent { return sgbucket.FeedEvent{ Opcode: sgbucket.FeedOpDeletion, @@ -90,11 +93,13 @@ func (e deletionEvent) asFeedEvent() sgbucket.FeedEvent { } } +// endStreamEvent represents a DCP end stream event, and the error associated with the stream end (opcode 0x55). type endStreamEvent struct { - streamEventCommon err error + streamEventCommon } +// seqnoAdvancedEvent represents a DCP Seqno advanced event (opcode 0x64). type seqnoAdvancedEvent struct { streamEventCommon seq uint64 diff --git a/base/dcp_client_worker.go b/base/dcp_client_worker.go index d7d4cfbd2a..7f47a49309 100644 --- a/base/dcp_client_worker.go +++ b/base/dcp_client_worker.go @@ -110,8 +110,6 @@ func (w *DCPWorker) Start(wg *sync.WaitGroup) { case event := <-w.eventFeed: vbID := event.VbID() switch e := event.(type) { - case streamOpenEvent: - w.metadata.SetFailoverEntries(e.vbID, e.failoverLogs) case snapshotEvent: // Set pending snapshot - don't persist to meta until we receive first sequence in the snapshot, // to avoid attempting to restart with a new snapshot and old sequence value