Skip to content

Commit

Permalink
Resolve failures in Race Detector Test
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed Sep 12, 2023
1 parent 60ded84 commit 7e967b0
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 11 deletions.
2 changes: 1 addition & 1 deletion mongo/integration/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
require.NoError(mt, err, "failed to update idValue")
}()

nextCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
nextCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

type splitEvent struct {
Expand Down
16 changes: 16 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type clientEntity struct {

eventsCountLock sync.RWMutex
serverDescriptionChangedEventsCountLock sync.RWMutex
eventProcessMu sync.RWMutex

entityMap *EntityMap

Expand Down Expand Up @@ -471,6 +472,9 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
}

func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

if !c.getRecordEvents() {
return
}
Expand All @@ -487,6 +491,9 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes
}

func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartbeatFailedEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

if !c.getRecordEvents() {
return
}
Expand All @@ -499,6 +506,9 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb
}

func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeartbeatStartedEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

if !c.getRecordEvents() {
return
}
Expand All @@ -511,6 +521,9 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart
}

func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHeartbeatSucceededEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

if !c.getRecordEvents() {
return
}
Expand All @@ -523,6 +536,9 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea
}

func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.TopologyDescriptionChangedEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

if !c.getRecordEvents() {
return
}
Expand Down
6 changes: 3 additions & 3 deletions mongo/integration/unified/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
func newTestContext(
ctx context.Context,
entityMap *EntityMap,
expectedLogMessageCount int,
expectedLogMessageCount uint64,
hasOperationalFailPoint bool,
) context.Context {
ctx = context.WithValue(ctx, operationalFailPointKey, hasOperationalFailPoint)
Expand Down Expand Up @@ -84,6 +84,6 @@ func entities(ctx context.Context) *EntityMap {
return ctx.Value(entitiesKey).(*EntityMap)
}

func expectedLogMessageCount(ctx context.Context) int {
return ctx.Value(expectedLogMessageCountKey).(int)
func expectedLogMessageCount(ctx context.Context) uint64 {
return ctx.Value(expectedLogMessageCountKey).(uint64)
}
17 changes: 12 additions & 5 deletions mongo/integration/unified/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
package unified

import (
"sync"
"sync/atomic"

"go.mongodb.org/mongo-driver/internal/logger"
)

Expand All @@ -20,12 +23,13 @@ type orderedLogMessage struct {
// Logger is the Sink used to captured log messages for logger verification in
// the unified spec tests.
type Logger struct {
lastOrder int
bufSize uint64
lastOrder uint64
logQueue chan orderedLogMessage
bufSize int
mu sync.RWMutex
}

func newLogger(olm *observeLogMessages, bufSize int) *Logger {
func newLogger(olm *observeLogMessages, bufSize uint64) *Logger {
if olm == nil {
return nil
}
Expand All @@ -40,11 +44,14 @@ func newLogger(olm *observeLogMessages, bufSize int) *Logger {
// Info implements the logger.Sink interface's "Info" method for printing log
// messages.
func (log *Logger) Info(level int, msg string, args ...interface{}) {
log.mu.Lock()
defer log.mu.Unlock()

if log.logQueue == nil {
return
}

defer func() { log.lastOrder++ }()
defer func() { atomic.AddUint64(&log.lastOrder, 1) }()

// If the order is greater than the buffer size, we must return. This
// would indicate that the logQueue channel has been closed.
Expand All @@ -64,7 +71,7 @@ func (log *Logger) Info(level int, msg string, args ...interface{}) {
// Send the log message to the "orderedLogMessage" channel for
// validation.
log.logQueue <- orderedLogMessage{
order: log.lastOrder + 1,
order: int(log.lastOrder + 1),
logMessage: logMessage,
}

Expand Down
4 changes: 2 additions & 2 deletions mongo/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (tc *TestCase) Run(ls LoggerSkipper) error {
}

// Count the number of expected log messages over all clients.
expectedLogCount := 0
var expectedLogCount uint64
for _, clientLog := range tc.ExpectLogMessages {
expectedLogCount += len(clientLog.LogMessages)
expectedLogCount += uint64(len(clientLog.LogMessages))
}

testCtx := newTestContext(context.Background(), tc.entities, expectedLogCount, tc.setsFailPoint())
Expand Down

0 comments on commit 7e967b0

Please sign in to comment.