Skip to content

Commit

Permalink
Revert "GODRIVER-2810 Switch to polling monitoring when running withi…
Browse files Browse the repository at this point in the history
…n a FaaS environment (mongodb#1376)"

This reverts commit 99bdb94.
  • Loading branch information
benjirewis committed Jul 23, 2024
1 parent f1ca84f commit b1f892b
Show file tree
Hide file tree
Showing 24 changed files with 213 additions and 1,390 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
software-properties-common \
gpg \
apt-utils \
libc6-dev \
gcc \
make && \
sudo update-ca-certificates && \
rm -rf /var/lib/apt/lists/*
Expand Down
File renamed without changes.
128 changes: 0 additions & 128 deletions internal/driverutil/hello.go

This file was deleted.

25 changes: 5 additions & 20 deletions internal/test/faas/awslambda/mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ const timeout = 60 * time.Second
// event durations, as well as the number of heartbeats, commands, and open
// connections.
type eventListener struct {
commandCount int
commandDuration int64
heartbeatAwaitedCount int
heartbeatCount int
heartbeatDuration int64
openConnections int
commandCount int
commandDuration int64
heartbeatCount int
heartbeatDuration int64
openConnections int
}

// commandMonitor initializes an event.CommandMonitor that will count the number
Expand Down Expand Up @@ -62,19 +61,11 @@ func (listener *eventListener) serverMonitor() *event.ServerMonitor {
succeeded := func(e *event.ServerHeartbeatSucceededEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

failed := func(e *event.ServerHeartbeatFailedEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

return &event.ServerMonitor{
Expand Down Expand Up @@ -159,12 +150,6 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
return gateway500(), fmt.Errorf("failed to delete: %w", err)
}

// Driver must switch to polling monitoring when running within a FaaS
// environment.
if listener.heartbeatAwaitedCount > 0 {
return gateway500(), fmt.Errorf("FaaS environment failed to switch to polling")
}

var avgCmdDur float64
if count := listener.commandCount; count != 0 {
avgCmdDur = float64(listener.commandDuration) / float64(count)
Expand Down
102 changes: 60 additions & 42 deletions mongo/integration/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,31 @@ func TestHandshakeProse(t *testing.T) {
return elems
}

const (
envVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
envVarAWSRegion = "AWS_REGION"
envVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
envVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
envVarKService = "K_SERVICE"
envVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
envVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
envVarFunctionRegion = "FUNCTION_REGION"
envVarVercel = "VERCEL"
envVarVercelRegion = "VERCEL_REGION"
)

// Reset the environment variables to avoid environment namespace
// collision.
t.Setenv("AWS_EXECUTION_ENV", "")
t.Setenv("FUNCTIONS_WORKER_RUNTIME", "")
t.Setenv("K_SERVICE", "")
t.Setenv("VERCEL", "")
t.Setenv("AWS_REGION", "")
t.Setenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "")
t.Setenv("FUNCTION_MEMORY_MB", "")
t.Setenv("FUNCTION_TIMEOUT_SEC", "")
t.Setenv("FUNCTION_REGION", "")
t.Setenv("VERCEL_REGION", "")
t.Setenv(envVarAWSExecutionEnv, "")
t.Setenv(envVarFunctionsWorkerRuntime, "")
t.Setenv(envVarKService, "")
t.Setenv(envVarVercel, "")
t.Setenv(envVarAWSRegion, "")
t.Setenv(envVarAWSLambdaFunctionMemorySize, "")
t.Setenv(envVarFunctionMemoryMB, "")
t.Setenv(envVarFunctionTimeoutSec, "")
t.Setenv(envVarFunctionRegion, "")
t.Setenv(envVarVercelRegion, "")

for _, test := range []struct {
name string
Expand All @@ -76,9 +89,9 @@ func TestHandshakeProse(t *testing.T) {
{
name: "1. valid AWS",
env: map[string]string{
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": "us-east-2",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "1024",
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: "us-east-2",
envVarAWSLambdaFunctionMemorySize: "1024",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -89,7 +102,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "2. valid Azure",
env: map[string]string{
"FUNCTIONS_WORKER_RUNTIME": "node",
envVarFunctionsWorkerRuntime: "node",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "azure.func"},
Expand All @@ -98,10 +111,10 @@ func TestHandshakeProse(t *testing.T) {
{
name: "3. valid GCP",
env: map[string]string{
"K_SERVICE": "servicename",
"FUNCTION_MEMORY_MB": "1024",
"FUNCTION_TIMEOUT_SEC": "60",
"FUNCTION_REGION": "us-central1",
envVarKService: "servicename",
envVarFunctionMemoryMB: "1024",
envVarFunctionTimeoutSec: "60",
envVarFunctionRegion: "us-central1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "gcp.func"},
Expand All @@ -113,8 +126,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "4. valid Vercel",
env: map[string]string{
"VERCEL": "1",
"VERCEL_REGION": "cdg1",
envVarVercel: "1",
envVarVercelRegion: "cdg1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "vercel"},
Expand All @@ -124,16 +137,16 @@ func TestHandshakeProse(t *testing.T) {
{
name: "5. invalid multiple providers",
env: map[string]string{
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"FUNCTIONS_WORKER_RUNTIME": "node",
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarFunctionsWorkerRuntime: "node",
},
want: clientMetadata(nil),
},
{
name: "6. invalid long string",
env: map[string]string{
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": func() string {
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: func() string {
var s string
for i := 0; i < 512; i++ {
s += "a"
Expand All @@ -148,8 +161,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "7. invalid wrong types",
env: map[string]string{
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "big",
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSLambdaFunctionMemorySize: "big",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -158,7 +171,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "8. Invalid - AWS_EXECUTION_ENV does not start with \"AWS_Lambda_\"",
env: map[string]string{
"AWS_EXECUTION_ENV": "EC2",
envVarAWSExecutionEnv: "EC2",
},
want: clientMetadata(nil),
},
Expand All @@ -175,27 +188,32 @@ func TestHandshakeProse(t *testing.T) {
require.NoError(mt, err, "Ping error: %v", err)

messages := mt.GetProxiedMessages()
handshakeMessage := messages[:1][0]

hello := handshake.LegacyHello
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}
// First two messages are handshake messages
for idx, pair := range messages[:2] {
hello := handshake.LegacyHello
// Expect "hello" command name with API version.
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

assert.Equal(mt, hello, handshakeMessage.CommandName)
assert.Equal(mt, pair.CommandName, hello, "expected and actual command name at index %d are different", idx)

// Lookup the "client" field in the command document.
clientVal, err := handshakeMessage.Sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s to contain client field", handshakeMessage.Sent.Command)
sent := pair.Sent

got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
// Lookup the "client" field in the command document.
clientVal, err := sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s at index %d to contain client field", sent.Command, idx)

wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)
got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)

want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)

want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
}
})
}
}
3 changes: 1 addition & 2 deletions mongo/integration/sdam_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func TestSDAMProse(t *testing.T) {
heartbeatIntervalMtOpts := mtest.NewOptions().
ClientOptions(heartbeatIntervalClientOpts).
CreateCollection(false).
ClientType(mtest.Proxy).
MinServerVersion("4.4") // RTT Monitor / Streaming protocol is not supported for versions < 4.4.
ClientType(mtest.Proxy)
mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
// approximately every 500ms instead of the default 10s. Note that a Client doesn't
Expand Down
2 changes: 0 additions & 2 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,6 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "serverselectiontimeoutms":
clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "servermonitoringmode":
clientOpts.SetServerMonitoringMode(value.(string))
default:
return fmt.Errorf("unrecognized URI option %s", key)
}
Expand Down
Loading

0 comments on commit b1f892b

Please sign in to comment.