Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2810 [master] Switch to polling monitoring when running within a FaaS environment #1447

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
software-properties-common \
gpg \
apt-utils \
libc6-dev \
gcc \
make && \
sudo update-ca-certificates && \
rm -rf /var/lib/apt/lists/*
Expand Down
128 changes: 128 additions & 0 deletions internal/driverutil/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (C) MongoDB, Inc. 2023-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driverutil

import (
"os"
"strings"
)

const AwsLambdaPrefix = "AWS_Lambda_"

const (
// FaaS environment variable names

// EnvVarAWSExecutionEnv is the AWS Execution environment variable.
EnvVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
// EnvVarAWSLambdaRuntimeAPI is the AWS Lambda runtime API variable.
EnvVarAWSLambdaRuntimeAPI = "AWS_LAMBDA_RUNTIME_API"
// EnvVarFunctionsWorkerRuntime is the functions worker runtime variable.
EnvVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
// EnvVarKService is the K Service variable.
EnvVarKService = "K_SERVICE"
// EnvVarFunctionName is the function name variable.
EnvVarFunctionName = "FUNCTION_NAME"
// EnvVarVercel is the Vercel variable.
EnvVarVercel = "VERCEL"
// EnvVarK8s is the K8s veriable.
EnvVarK8s = "KUBERNETES_SERVICE_HOST"
)

const (
// FaaS environment variable names

// EnvVarAWSRegion is the AWS region variable.
EnvVarAWSRegion = "AWS_REGION"
// EnvVarAWSLambdaFunctionMemorySize is the AWS Lambda function memory size variable.
EnvVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
// EnvVarFunctionMemoryMB is the function memory in megabytes variable.
EnvVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
// EnvVarFunctionTimeoutSec is the function timeout in seconds variable.
EnvVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
// EnvVarFunctionRegion is the function region variable.
EnvVarFunctionRegion = "FUNCTION_REGION"
// EnvVarVercelRegion is the Vercel region variable.
EnvVarVercelRegion = "VERCEL_REGION"
)

const (
// FaaS environment names used by the client

// EnvNameAWSLambda is the AWS Lambda environment name.
EnvNameAWSLambda = "aws.lambda"
// EnvNameAzureFunc is the Azure Function environment name.
EnvNameAzureFunc = "azure.func"
// EnvNameGCPFunc is the Google Cloud Function environment name.
EnvNameGCPFunc = "gcp.func"
// EnvNameVercel is the Vercel environment name.
EnvNameVercel = "vercel"
)

// GetFaasEnvName parses the FaaS environment variable name and returns the
// corresponding name used by the client. If none of the variables or variables
// for multiple names are populated the client.env value MUST be entirely
// omitted. When variables for multiple "client.env.name" values are present,
// "vercel" takes precedence over "aws.lambda"; any other combination MUST cause
// "client.env" to be entirely omitted.
func GetFaasEnvName() string {
envVars := []string{
EnvVarAWSExecutionEnv,
EnvVarAWSLambdaRuntimeAPI,
EnvVarFunctionsWorkerRuntime,
EnvVarKService,
EnvVarFunctionName,
EnvVarVercel,
}

// If none of the variables are populated the client.env value MUST be
// entirely omitted.
names := make(map[string]struct{})

for _, envVar := range envVars {
val := os.Getenv(envVar)
if val == "" {
continue
}

var name string

switch envVar {
case EnvVarAWSExecutionEnv:
if !strings.HasPrefix(val, AwsLambdaPrefix) {
continue
}

name = EnvNameAWSLambda
case EnvVarAWSLambdaRuntimeAPI:
name = EnvNameAWSLambda
case EnvVarFunctionsWorkerRuntime:
name = EnvNameAzureFunc
case EnvVarKService, EnvVarFunctionName:
name = EnvNameGCPFunc
case EnvVarVercel:
// "vercel" takes precedence over "aws.lambda".
delete(names, EnvNameAWSLambda)

name = EnvNameVercel
}

names[name] = struct{}{}
if len(names) > 1 {
// If multiple names are populated the client.env value
// MUST be entirely omitted.
names = nil

break
}
}

for name := range names {
return name
}

return ""
}
File renamed without changes.
25 changes: 20 additions & 5 deletions internal/test/faas/awslambda/mongodb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ const timeout = 60 * time.Second
// event durations, as well as the number of heartbeats, commands, and open
// conections.
type eventListener struct {
commandCount int
commandDuration int64
heartbeatCount int
heartbeatDuration int64
openConnections int
commandCount int
commandDuration int64
heartbeatAwaitedCount int
heartbeatCount int
heartbeatDuration int64
openConnections int
}

// commandMonitor initializes an event.CommandMonitor that will count the number
Expand Down Expand Up @@ -61,11 +62,19 @@ func (listener *eventListener) serverMonitor() *event.ServerMonitor {
succeeded := func(e *event.ServerHeartbeatSucceededEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

failed := func(e *event.ServerHeartbeatFailedEvent) {
listener.heartbeatCount++
listener.heartbeatDuration += e.DurationNanos

if e.Awaited {
listener.heartbeatAwaitedCount++
}
}

return &event.ServerMonitor{
Expand Down Expand Up @@ -150,6 +159,12 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
return gateway500(), fmt.Errorf("failed to delete: %w", err)
}

// Driver must switch to polling monitoring when running within a FaaS
// environment.
if listener.heartbeatAwaitedCount > 0 {
return gateway500(), fmt.Errorf("FaaS environment fialed to switch to polling")
}

var avgCmdDur float64
if count := listener.commandCount; count != 0 {
avgCmdDur = float64(listener.commandDuration) / float64(count)
Expand Down
102 changes: 42 additions & 60 deletions mongo/integration/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,18 @@ func TestHandshakeProse(t *testing.T) {
return elems
}

const (
envVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
envVarAWSRegion = "AWS_REGION"
envVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
envVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
envVarKService = "K_SERVICE"
envVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
envVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
envVarFunctionRegion = "FUNCTION_REGION"
envVarVercel = "VERCEL"
envVarVercelRegion = "VERCEL_REGION"
)

// Reset the environment variables to avoid environment namespace
// collision.
t.Setenv(envVarAWSExecutionEnv, "")
t.Setenv(envVarFunctionsWorkerRuntime, "")
t.Setenv(envVarKService, "")
t.Setenv(envVarVercel, "")
t.Setenv(envVarAWSRegion, "")
t.Setenv(envVarAWSLambdaFunctionMemorySize, "")
t.Setenv(envVarFunctionMemoryMB, "")
t.Setenv(envVarFunctionTimeoutSec, "")
t.Setenv(envVarFunctionRegion, "")
t.Setenv(envVarVercelRegion, "")
t.Setenv("AWS_EXECUTION_ENV", "")
t.Setenv("FUNCTIONS_WORKER_RUNTIME", "")
t.Setenv("K_SERVICE", "")
t.Setenv("VERCEL", "")
t.Setenv("AWS_REGION", "")
t.Setenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "")
t.Setenv("FUNCTION_MEMORY_MB", "")
t.Setenv("FUNCTION_TIMEOUT_SEC", "")
t.Setenv("FUNCTION_REGION", "")
t.Setenv("VERCEL_REGION", "")

for _, test := range []struct {
name string
Expand All @@ -89,9 +76,9 @@ func TestHandshakeProse(t *testing.T) {
{
name: "1. valid AWS",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: "us-east-2",
envVarAWSLambdaFunctionMemorySize: "1024",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": "us-east-2",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "1024",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -102,7 +89,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "2. valid Azure",
env: map[string]string{
envVarFunctionsWorkerRuntime: "node",
"FUNCTIONS_WORKER_RUNTIME": "node",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "azure.func"},
Expand All @@ -111,10 +98,10 @@ func TestHandshakeProse(t *testing.T) {
{
name: "3. valid GCP",
env: map[string]string{
envVarKService: "servicename",
envVarFunctionMemoryMB: "1024",
envVarFunctionTimeoutSec: "60",
envVarFunctionRegion: "us-central1",
"K_SERVICE": "servicename",
"FUNCTION_MEMORY_MB": "1024",
"FUNCTION_TIMEOUT_SEC": "60",
"FUNCTION_REGION": "us-central1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "gcp.func"},
Expand All @@ -126,8 +113,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "4. valid Vercel",
env: map[string]string{
envVarVercel: "1",
envVarVercelRegion: "cdg1",
"VERCEL": "1",
"VERCEL_REGION": "cdg1",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "vercel"},
Expand All @@ -137,16 +124,16 @@ func TestHandshakeProse(t *testing.T) {
{
name: "5. invalid multiple providers",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarFunctionsWorkerRuntime: "node",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"FUNCTIONS_WORKER_RUNTIME": "node",
},
want: clientMetadata(nil),
},
{
name: "6. invalid long string",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSRegion: func() string {
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_REGION": func() string {
var s string
for i := 0; i < 512; i++ {
s += "a"
Expand All @@ -161,8 +148,8 @@ func TestHandshakeProse(t *testing.T) {
{
name: "7. invalid wrong types",
env: map[string]string{
envVarAWSExecutionEnv: "AWS_Lambda_java8",
envVarAWSLambdaFunctionMemorySize: "big",
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "big",
},
want: clientMetadata(bson.D{
{Key: "name", Value: "aws.lambda"},
Expand All @@ -171,7 +158,7 @@ func TestHandshakeProse(t *testing.T) {
{
name: "8. Invalid - AWS_EXECUTION_ENV does not start with \"AWS_Lambda_\"",
env: map[string]string{
envVarAWSExecutionEnv: "EC2",
"AWS_EXECUTION_ENV": "EC2",
},
want: clientMetadata(nil),
},
Expand All @@ -188,32 +175,27 @@ func TestHandshakeProse(t *testing.T) {
require.NoError(mt, err, "Ping error: %v", err)

messages := mt.GetProxiedMessages()
handshakeMessage := messages[:1][0]

// First two messages are handshake messages
for idx, pair := range messages[:2] {
hello := handshake.LegacyHello
// Expect "hello" command name with API version.
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

assert.Equal(mt, pair.CommandName, hello, "expected and actual command name at index %d are different", idx)
hello := handshake.LegacyHello
if os.Getenv("REQUIRE_API_VERSION") == "true" {
hello = "hello"
}

sent := pair.Sent
assert.Equal(mt, hello, handshakeMessage.CommandName)

// Lookup the "client" field in the command document.
clientVal, err := sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s at index %d to contain client field", sent.Command, idx)
// Lookup the "client" field in the command document.
clientVal, err := handshakeMessage.Sent.Command.LookupErr("client")
require.NoError(mt, err, "expected command %s to contain client field", handshakeMessage.Sent.Command)

got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
got, ok := clientVal.DocumentOK()
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)

wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)
wantBytes, err := bson.Marshal(test.want)
require.NoError(mt, err, "error marshaling want document: %v", err)

want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
}
want := bsoncore.Document(wantBytes)
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
})
}
}
3 changes: 2 additions & 1 deletion mongo/integration/sdam_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestSDAMProse(t *testing.T) {
heartbeatIntervalMtOpts := mtest.NewOptions().
ClientOptions(heartbeatIntervalClientOpts).
CreateCollection(false).
ClientType(mtest.Proxy)
ClientType(mtest.Proxy).
MinServerVersion("4.4") // RTT Monitor / Streaming protocol is not supported for versions < 4.4.
mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
// approximately every 500ms instead of the default 10s. Note that a Client doesn't
Expand Down
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "serverselectiontimeoutms":
clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "servermonitoringmode":
clientOpts.SetServerMonitoringMode(value.(string))
default:
return fmt.Errorf("unrecognized URI option %s", key)
}
Expand Down
Loading