Skip to content

Commit

Permalink
GODRIVER-2810 [master] Switch to polling monitoring when running with…
Browse files Browse the repository at this point in the history
…in a FaaS environment (#1447)

Co-authored-by: Preston Vasquez <prestonvasquez@icloud.com>
  • Loading branch information
blink1073 and prestonvasquez committed Nov 1, 2023
1 parent 9852b9c commit b593d81
Show file tree
Hide file tree
Showing 24 changed files with 1,390 additions and 213 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
software-properties-common \
gpg \
apt-utils \
libc6-dev \
gcc \
make && \
sudo update-ca-certificates && \
rm -rf /var/lib/apt/lists/*
Expand Down
128 changes: 128 additions & 0 deletions internal/driverutil/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (C) MongoDB, Inc. 2023-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driverutil

import (
"os"
"strings"
)

const AwsLambdaPrefix = "AWS_Lambda_"

const (
// FaaS environment variable names

// EnvVarAWSExecutionEnv is the AWS Execution environment variable.
EnvVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
// EnvVarAWSLambdaRuntimeAPI is the AWS Lambda runtime API variable.
EnvVarAWSLambdaRuntimeAPI = "AWS_LAMBDA_RUNTIME_API"
// EnvVarFunctionsWorkerRuntime is the functions worker runtime variable.
EnvVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
// EnvVarKService is the K Service variable.
EnvVarKService = "K_SERVICE"
// EnvVarFunctionName is the function name variable.
EnvVarFunctionName = "FUNCTION_NAME"
// EnvVarVercel is the Vercel variable.
EnvVarVercel = "VERCEL"
// EnvVarK8s is the K8s veriable.
EnvVarK8s = "KUBERNETES_SERVICE_HOST"
)

const (
// FaaS environment variable names

// EnvVarAWSRegion is the AWS region variable.
EnvVarAWSRegion = "AWS_REGION"
// EnvVarAWSLambdaFunctionMemorySize is the AWS Lambda function memory size variable.
EnvVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
// EnvVarFunctionMemoryMB is the function memory in megabytes variable.
EnvVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
// EnvVarFunctionTimeoutSec is the function timeout in seconds variable.
EnvVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
// EnvVarFunctionRegion is the function region variable.
EnvVarFunctionRegion = "FUNCTION_REGION"
// EnvVarVercelRegion is the Vercel region variable.
EnvVarVercelRegion = "VERCEL_REGION"
)

const (
// FaaS environment names used by the client

// EnvNameAWSLambda is the AWS Lambda environment name.
EnvNameAWSLambda = "aws.lambda"
// EnvNameAzureFunc is the Azure Function environment name.
EnvNameAzureFunc = "azure.func"
// EnvNameGCPFunc is the Google Cloud Function environment name.
EnvNameGCPFunc = "gcp.func"
// EnvNameVercel is the Vercel environment name.
EnvNameVercel = "vercel"
)

// GetFaasEnvName parses the FaaS environment variable name and returns the
// corresponding name used by the client. If none of the variables or variables
// for multiple names are populated the client.env value MUST be entirely
// omitted. When variables for multiple "client.env.name" values are present,
// "vercel" takes precedence over "aws.lambda"; any other combination MUST cause
// "client.env" to be entirely omitted.
func GetFaasEnvName() string {
envVars := []string{
EnvVarAWSExecutionEnv,
EnvVarAWSLambdaRuntimeAPI,
EnvVarFunctionsWorkerRuntime,
EnvVarKService,
EnvVarFunctionName,
EnvVarVercel,
}

// If none of the variables are populated the client.env value MUST be
// entirely omitted.
names := make(map[string]struct{})

for _, envVar := range envVars {
val := os.Getenv(envVar)
if val == "" {
continue
}

var name string

switch envVar {
case EnvVarAWSExecutionEnv:
if !strings.HasPrefix(val, AwsLambdaPrefix) {
continue
}

name = EnvNameAWSLambda
case EnvVarAWSLambdaRuntimeAPI:
name = EnvNameAWSLambda
case EnvVarFunctionsWorkerRuntime:
name = EnvNameAzureFunc
case EnvVarKService, EnvVarFunctionName:
name = EnvNameGCPFunc
case EnvVarVercel:
// "vercel" takes precedence over "aws.lambda".
delete(names, EnvNameAWSLambda)

name = EnvNameVercel
}

names[name] = struct{}{}
if len(names) > 1 {
// If multiple names are populated the client.env value
// MUST be entirely omitted.
names = nil

break
}
}

for name := range names {
return name
}

return ""
}
File renamed without changes.
25 changes: 20 additions & 5 deletions internal/test/faas/awslambda/mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const timeout = 60 * time.Second
// event durations, as well as the number of heartbeats, commands, and open
// conections.
type eventListener struct {
commandCount int
commandDuration int64
heartbeatCount int
heartbeatDuration int64
openConnections int
commandCount int
commandDuration int64
heartbeatAwaitedCount int
heartbeatCount int
heartbeatDuration int64
openConnections int
}

// commandMonitor initializes an event.CommandMonitor that will count the number
Expand Down Expand Up @@ -61,11 +62,19 @@ func (listener *eventListener) serverMonitor() *event.ServerMonitor {
succeeded := func(e *event.ServerHeartbeatSucceededEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

failed := func(e *event.ServerHeartbeatFailedEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

return &event.ServerMonitor{
Expand Down Expand Up @@ -150,6 +159,12 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
return gateway500(), fmt.Errorf("failed to delete: %w", err)
}

// Driver must switch to polling monitoring when running within a FaaS
// environment.
if listener.heartbeatAwaitedCount > 0 {
return gateway500(), fmt.Errorf("FaaS environment fialed to switch to polling")
}

var avgCmdDur float64
if count := listener.commandCount; count != 0 {
avgCmdDur = float64(listener.commandDuration) / float64(count)
Expand Down
102 changes: 42 additions & 60 deletions mongo/integration/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,18 @@ func TestHandshakeProse(t *testing.T) {
return elems
}

const (
envVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
envVarAWSRegion = "AWS_REGION"
envVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
envVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
envVarKService = "K_SERVICE"
envVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
envVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
envVarFunctionRegion = "FUNCTION_REGION"
envVarVercel = "VERCEL"
envVarVercelRegion = "VERCEL_REGION"
)

// Reset the environment variables to avoid environment namespace
// collision.
t.Setenv(envVarAWSExecutionEnv, "")
t.Setenv(envVarFunctionsWorkerRuntime, "")
t.Setenv(envVarKService, "")
t.Setenv(envVarVercel, "")
t.Setenv(envVarAWSRegion, "")
t.Setenv(envVarAWSLambdaFunctionMemorySize, "")
t.Setenv(envVarFunctionMemoryMB, "")
t.Setenv(envVarFunctionTimeoutSec, "")
t.Setenv(envVarFunctionRegion, "")
t.Setenv(envVarVercelRegion, "")
t.Setenv("AWS_EXECUTION_ENV", "")
t.Setenv("FUNCTIONS_WORKER_RUNTIME", "")
t.Setenv("K_SERVICE", "")
t.Setenv("VERCEL", "")
t.Setenv("AWS_REGION", "")
t.Setenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "")
t.Setenv("FUNCTION_MEMORY_MB", "")
t.Setenv("FUNCTION_TIMEOUT_SEC", "")
t.Setenv("FUNCTION_REGION", "")
t.Setenv("VERCEL_REGION", "")

for _, test := range []struct {
name string
Expand All @@ -89,9 +76,9 @@ func TestHandshakeProse(t *testing.T) {
{
name: "1. valid AWS",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: "us-east-2",
envVarAWSLambdaFunctionMemorySize: "1024",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": "us-east-2",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "1024",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -102,7 +89,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "2. valid Azure",
env: map[string]string{
envVarFunctionsWorkerRuntime: "node",
"FUNCTIONS_WORKER_RUNTIME": "node",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "azure.func"},
Expand All @@ -111,10 +98,10 @@ func TestHandshakeProse(t *testing.T) {
{
name: "3. valid GCP",
env: map[string]string{
envVarKService: "servicename",
envVarFunctionMemoryMB: "1024",
envVarFunctionTimeoutSec: "60",
envVarFunctionRegion: "us-central1",
"K_SERVICE": "servicename",
"FUNCTION_MEMORY_MB": "1024",
"FUNCTION_TIMEOUT_SEC": "60",
"FUNCTION_REGION": "us-central1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "gcp.func"},
Expand All @@ -126,8 +113,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "4. valid Vercel",
env: map[string]string{
envVarVercel: "1",
envVarVercelRegion: "cdg1",
"VERCEL": "1",
"VERCEL_REGION": "cdg1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "vercel"},
Expand All @@ -137,16 +124,16 @@ func TestHandshakeProse(t *testing.T) {
{
name: "5. invalid multiple providers",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarFunctionsWorkerRuntime: "node",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"FUNCTIONS_WORKER_RUNTIME": "node",
},
want: clientMetadata(nil),
},
{
name: "6. invalid long string",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: func() string {
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": func() string {
var s string
for i := 0; i < 512; i++ {
s += "a"
Expand All @@ -161,8 +148,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "7. invalid wrong types",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSLambdaFunctionMemorySize: "big",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "big",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -171,7 +158,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "8. Invalid - AWS_EXECUTION_ENV does not start with \"AWS_Lambda_\"",
env: map[string]string{
envVarAWSExecutionEnv: "EC2",
"AWS_EXECUTION_ENV": "EC2",
},
want: clientMetadata(nil),
},
Expand All @@ -188,32 +175,27 @@ func TestHandshakeProse(t *testing.T) {
require.NoError(mt, err, "Ping error: %v", err)

messages := mt.GetProxiedMessages()
handshakeMessage := messages[:1][0]

// First two messages are handshake messages
for idx, pair := range messages[:2] {
hello := handshake.LegacyHello
// Expect "hello" command name with API version.
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

assert.Equal(mt, pair.CommandName, hello, "expected and actual command name at index %d are different", idx)
hello := handshake.LegacyHello
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

sent := pair.Sent
assert.Equal(mt, hello, handshakeMessage.CommandName)

// Lookup the "client" field in the command document.
clientVal, err := sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s at index %d to contain client field", sent.Command, idx)
// Lookup the "client" field in the command document.
clientVal, err := handshakeMessage.Sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s to contain client field", handshakeMessage.Sent.Command)

got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)

wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)
wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)

want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
}
want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
})
}
}
3 changes: 2 additions & 1 deletion mongo/integration/sdam_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestSDAMProse(t *testing.T) {
heartbeatIntervalMtOpts := mtest.NewOptions().
ClientOptions(heartbeatIntervalClientOpts).
CreateCollection(false).
ClientType(mtest.Proxy)
ClientType(mtest.Proxy).
MinServerVersion("4.4") // RTT Monitor / Streaming protocol is not supported for versions < 4.4.
mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
// approximately every 500ms instead of the default 10s. Note that a Client doesn't
Expand Down
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "serverselectiontimeoutms":
clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "servermonitoringmode":
clientOpts.SetServerMonitoringMode(value.(string))
default:
return fmt.Errorf("unrecognized URI option %s", key)
}
Expand Down
Loading

0 comments on commit b593d81

Please sign in to comment.