Skip to content

Commit

Permalink
source-google-pubsub: fix race condition in sequencing runtime and Pu…
Browse files Browse the repository at this point in the history
…bSub acknowledgements

There was a possible race between adding the runtime acknowledgement signal to
the queue and the document to be emitted to a separate queue. This actually
needs to be done under an exclusive lock, which means that there really isn't
any need for the separate queue at all.
  • Loading branch information
williamhbaker committed Aug 8, 2024
1 parent 9d17606 commit 14f92e7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 36 deletions.
3 changes: 0 additions & 3 deletions source-google-pubsub/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,6 @@ func (driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) error
group.Go(func() error {
return e.runtimeAckWorker(groupCtx)
})
group.Go(func() error {
return e.emitWorker(groupCtx)
})

for idx, binding := range open.Capture.Bindings {
idx := idx
Expand Down
47 changes: 17 additions & 30 deletions source-google-pubsub/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -35,9 +36,9 @@ type emitMessage struct {
// kind of strategy is at-least-once, since the connector could crash between
// emitting the document and sending the PubSub acknowledgement.
type emitter struct {
stream *boilerplate.PullOutput
pendingAcks chan runtimeAck
emitMessages chan emitMessage
mu sync.Mutex
stream *boilerplate.PullOutput
pendingAcks chan runtimeAck
}

func newEmitter(stream *boilerplate.PullOutput) *emitter {
Expand All @@ -50,12 +51,18 @@ func newEmitter(stream *boilerplate.PullOutput) *emitter {
// practically may only limit how many documents the runtime batches
// together into a single checkpoint, since the connector will pause
// when the channel is full.
pendingAcks: make(chan runtimeAck, 50_000),
emitMessages: make(chan emitMessage),
pendingAcks: make(chan runtimeAck, 50_000),
}
}

func (e *emitter) emit(ctx context.Context, m emitMessage) (runtimeAck, error) {
// Runtime acknowledgement signals must be sequenced in the same order as
// output documents, so this operation is done under a lock to prevent races
// between sequencing the acknowledgement signal and outputting the
// document.
e.mu.Lock()
defer e.mu.Unlock()

ack := make(runtimeAck)

// Add the runtime acknowledgement signal channel to the `pendingAcks`
Expand All @@ -67,14 +74,11 @@ func (e *emitter) emit(ctx context.Context, m emitMessage) (runtimeAck, error) {
case e.pendingAcks <- ack:
}

// Now send the message for serialization and output the runtime. This must
// be done after the acknowledgement signal is queued to prevent races with
// receiving runtime acknowledgements before there are any signal channels
// to close.
select {
case <-ctx.Done():
return nil, ctx.Err()
case e.emitMessages <- m:
// Now output the message.
if doc, err := makeDoc(m); err != nil {
return nil, fmt.Errorf("making document: %w", err)
} else if err := e.stream.DocumentsAndCheckpoint(emptyCheckpoint, true, m.binding, doc); err != nil {
return nil, fmt.Errorf("emitting document: %w", err)
}

return ack, nil
Expand Down Expand Up @@ -111,23 +115,6 @@ func (e *emitter) runtimeAckWorker(ctx context.Context) error {
}
}

// emitWorker reads messages from the emitMessages channel and serializes
// those into documents which are then output to the runtime.
//
// It loops until ctx is done, at which point it returns ctx.Err(). It returns
// early with a wrapped error if building a document from a message fails
// ("making document") or if writing the document to the output stream fails
// ("emitting document").
func (e *emitter) emitWorker(ctx context.Context) error {
for {
select {
case <-ctx.Done():
// Context is done: stop the worker and report why.
return ctx.Err()
case m := <-e.emitMessages:
// Convert the received message into a document, then output it on the
// stream for the message's binding along with emptyCheckpoint.
if doc, err := makeDoc(m); err != nil {
return fmt.Errorf("making document: %w", err)
} else if err := e.stream.DocumentsAndCheckpoint(emptyCheckpoint, true, m.binding, doc); err != nil {
return fmt.Errorf("emitting document: %w", err)
}
}
}
}

func makeDoc(m emitMessage) (json.RawMessage, error) {
msg := m.m

Expand Down
3 changes: 0 additions & 3 deletions source-google-pubsub/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ func TestEmitter(t *testing.T) {
group.Go(func() error {
return emitter.runtimeAckWorker(groupCtx)
})
group.Go(func() error {
return emitter.emitWorker(groupCtx)
})

// Emit some messages concurrently with many goroutines.
numMessages := 50_000
Expand Down

0 comments on commit 14f92e7

Please sign in to comment.