From e24905ec39008f927d662b0cfb74358c0e0d1a78 Mon Sep 17 00:00:00 2001 From: Qingyang Hu Date: Wed, 8 Nov 2023 23:58:48 -0500 Subject: [PATCH] WIP --- event/monitoring.go | 5 +++-- .../unified/collection_operation_execution.go | 1 + mongo/integration/unified/event_verification.go | 8 +++++++- mongo/integration/unified/testrunner_operation.go | 10 ++++++++-- mongo/integration/unified/unified_spec_runner.go | 4 ++++ x/mongo/driver/topology/connection.go | 4 ++++ x/mongo/driver/topology/pool.go | 10 ++++++---- x/mongo/driver/topology/server.go | 1 + 8 files changed, 34 insertions(+), 9 deletions(-) diff --git a/event/monitoring.go b/event/monitoring.go index b10d4b1213..a0f69e3d98 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -106,8 +106,9 @@ type PoolEvent struct { Reason string `json:"reason"` // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. - ServiceID *primitive.ObjectID `json:"serviceId"` - Error error `json:"error"` + ServiceID *primitive.ObjectID `json:"serviceId"` + Interruption bool `json:"interruptInUseConnections"` + Error error `json:"error"` } // PoolMonitor is a function that allows the user to gain access to events occurring in the pool diff --git a/mongo/integration/unified/collection_operation_execution.go b/mongo/integration/unified/collection_operation_execution.go index fc5c884791..b3ea1f866c 100644 --- a/mongo/integration/unified/collection_operation_execution.go +++ b/mongo/integration/unified/collection_operation_execution.go @@ -1072,6 +1072,7 @@ func executeInsertOne(ctx context.Context, operation *operation) (*operationResu } res, err := coll.InsertOne(ctx, document, opts) + fmt.Println("insert one", err) raw := emptyCoreDocument if res != nil { t, data, err := bson.MarshalValue(res.InsertedID) diff --git a/mongo/integration/unified/event_verification.go b/mongo/integration/unified/event_verification.go index 74655dc3a3..d14601dd3d 100644 --- a/mongo/integration/unified/event_verification.go +++ b/mongo/integration/unified/event_verification.go @@ -61,7 +61,8 @@ type cmapEvent struct { ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"` PoolClearedEvent *struct { - HasServiceID *bool `bson:"hasServiceId"` + HasServiceID *bool `bson:"hasServiceId"` + InterruptInUseConnections *bool `bson:"interruptInUseConnections"` } `bson:"poolClearedEvent"` } @@ -353,6 +354,7 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro } case evt.PoolClearedEvent != nil: var actual *event.PoolEvent + fmt.Println("PoolClearedEvent", evt.PoolClearedEvent, actual) if actual, pooled, err = getNextPoolEvent(pooled, event.PoolCleared); err != nil { return newEventVerificationError(idx, client, err.Error()) } @@ -361,6 +363,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro return newEventVerificationError(idx, client, "error verifying serviceID: %v", err) } } + if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption { + return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v", + expectInterruption, actual.Interruption) + } default: return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance") } diff --git a/mongo/integration/unified/testrunner_operation.go b/mongo/integration/unified/testrunner_operation.go index 72cf3541fb..02c4c8d716 100644 --- a/mongo/integration/unified/testrunner_operation.go +++ b/mongo/integration/unified/testrunner_operation.go @@ -188,8 +188,14 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD } return nil case "runOnThread": - // TODO - return nil + operationRaw, err := args.LookupErr("operation") + if err != nil { + return fmt.Errorf("'operation' argument not found in runOnThread operation") + } + if err := operationRaw.Unmarshal(operation); err != nil { + return fmt.Errorf("error unmarshalling 'operation' argument: %v", err) + } + return operation.execute(ctx, loopDone) case "waitForThread": // TODO return nil diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go index 40fbff4fa1..1b2fa7bf2c 100644 --- a/mongo/integration/unified/unified_spec_runner.go +++ b/mongo/integration/unified/unified_spec_runner.go @@ -299,8 +299,10 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { } } + fmt.Println("run") for idx, operation := range tc.Operations { if err := operation.execute(testCtx, tc.loopDone); err != nil { + fmt.Println("run err", err, operation) if isSkipTestError(err) { ls.Skip(err) } @@ -308,6 +310,7 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { return fmt.Errorf("error running operation %q at index %d: %v", operation.Name, idx, err) } } + fmt.Println("~run") // Create a validator for log messages and start the workers that will // observe log messages as they occur operationally. @@ -324,6 +327,7 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { if tc.Description != "BulkWrite on server that doesn't support arrayFilters with arrayFilters on second op" { for idx, expectedEvents := range tc.ExpectedEvents { if err := verifyEvents(testCtx, expectedEvents); err != nil { + fmt.Println("verification failed", expectedEvents) return fmt.Errorf("events verification failed at index %d: %v", idx, err) } } diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go index f427c1c1e7..49166c6495 100644 --- a/x/mongo/driver/topology/connection.go +++ b/x/mongo/driver/topology/connection.go @@ -336,8 +336,10 @@ func (c *connection) cancellationListenerCallback() { } func (c *connection) writeWireMessage(ctx context.Context, wm []byte) error { + // fmt.Println("writeWireMessage") var err error if atomic.LoadInt64(&c.state) != connConnected { + fmt.Println("writeWireMessage close") return ConnectionError{ ConnectionID: c.id, Wrapped: c.err, @@ -392,7 +394,9 @@ func (c *connection) write(ctx context.Context, wm []byte) (err error) { // readWireMessage reads a wiremessage from the connection. The dst parameter will be overwritten. func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { + // fmt.Println("readWireMessage") if atomic.LoadInt64(&c.state) != connConnected { + fmt.Println("readWireMessage close") return nil, ConnectionError{ ConnectionID: c.id, Wrapped: c.err, diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index c05c9c4f41..cf3d1ebe1a 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -852,6 +852,7 @@ func (p *pool) checkInNoEvent(conn *connection) error { } func (p *pool) interruptInUseConnections() { + fmt.Println("interruptInUseConnections") for _, conn := range p.conns { if conn.inUse && p.stale(conn) { _ = p.removeConnection(conn, reason{ @@ -944,10 +945,11 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID, cleaningupFns ... if sendEvent && p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.PoolCleared, - Address: p.address.String(), - ServiceID: serviceID, - Error: err, + Type: event.PoolCleared, + Address: p.address.String(), + ServiceID: serviceID, + Interruption: len(cleaningupFns) > 0, + Error: err, }) } diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index 7786719778..1ebe87c7d7 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -640,6 +640,7 @@ func (s *Server) update() { // Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear // because the monitoring routine only runs for non-load balanced deployments in which servers don't return // IDs. + fmt.Println("clear with interruptInUseConnections", err) s.pool.clear(err, nil, s.pool.interruptInUseConnections) } // We're either not handling a timeout error, or we just handled the 2nd consecutive