Skip to content

Commit

Permalink
feat(NODE-5197): add server monitoring mode (#3899)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Nov 14, 2023
1 parent 08c9fb4 commit ae4c94a
Show file tree
Hide file tree
Showing 18 changed files with 947 additions and 2,201 deletions.
12 changes: 12 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from './mongo_logger';
import { ReadConcern, type ReadConcernLevel } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import {
DEFAULT_PK_FACTORY,
Expand Down Expand Up @@ -1055,6 +1056,17 @@ export const OPTIONS = {
serializeFunctions: {
type: 'boolean'
},
serverMonitoringMode: {
default: 'auto',
transform({ values: [value] }) {
if (!Object.values(ServerMonitoringMode).includes(value as any)) {
throw new MongoParseError(
'serverMonitoringMode must be one of `auto`, `poll`, or `stream`'
);
}
return value;
}
},
serverSelectionTimeoutMS: {
default: 30000,
type: 'uint'
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ export type {
MonitorOptions,
MonitorPrivate,
RTTPinger,
RTTPingerOptions
RTTPingerOptions,
ServerMonitoringMode
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
export type {
Expand Down
4 changes: 4 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import type { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import { readPreferenceServerSelector } from './sdam/server_selection';
import type { SrvPoller } from './sdam/srv_polling';
Expand Down Expand Up @@ -257,6 +258,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
proxyUsername?: string;
/** Configures a Socks5 proxy password when the proxy in proxyHost requires username/password authentication. */
proxyPassword?: string;
/** Instructs the driver monitors to use a specific monitoring mode */
serverMonitoringMode?: ServerMonitoringMode;

/** @internal */
srvPoller?: SrvPoller;
Expand Down Expand Up @@ -816,6 +819,7 @@ export interface MongoOptions
proxyPort?: number;
proxyUsername?: string;
proxyPassword?: string;
serverMonitoringMode: ServerMonitoringMode;

/** @internal */
connectionType?: typeof Connection;
Expand Down
69 changes: 50 additions & 19 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
import { type Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, type ConnectionOptions } from '../cmap/connection';
import { getFAASEnv } from '../cmap/handshake/client_metadata';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
Expand Down Expand Up @@ -44,6 +45,16 @@ function isInCloseState(monitor: Monitor) {
return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
}

/** @public */
export const ServerMonitoringMode = Object.freeze({
auto: 'auto',
poll: 'poll',
stream: 'stream'
} as const);

/** @public */
export type ServerMonitoringMode = (typeof ServerMonitoringMode)[keyof typeof ServerMonitoringMode];

/** @internal */
export interface MonitorPrivate {
state: string;
Expand All @@ -55,6 +66,7 @@ export interface MonitorOptions
connectTimeoutMS: number;
heartbeatFrequencyMS: number;
minHeartbeatFrequencyMS: number;
serverMonitoringMode: ServerMonitoringMode;
}

/** @public */
Expand All @@ -73,9 +85,16 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
s: MonitorPrivate;
address: string;
options: Readonly<
Pick<MonitorOptions, 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS'>
Pick<
MonitorOptions,
| 'connectTimeoutMS'
| 'heartbeatFrequencyMS'
| 'minHeartbeatFrequencyMS'
| 'serverMonitoringMode'
>
>;
connectOptions: ConnectionOptions;
isRunningInFaasEnv: boolean;
[kServer]: Server;
[kConnection]?: Connection;
[kCancellationToken]: CancellationToken;
Expand Down Expand Up @@ -103,8 +122,10 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
this.options = Object.freeze({
connectTimeoutMS: options.connectTimeoutMS ?? 10000,
heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
serverMonitoringMode: options.serverMonitoringMode
});
this.isRunningInFaasEnv = getFAASEnv() != null;

const cancellationToken = this[kCancellationToken];
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
Expand Down Expand Up @@ -207,27 +228,38 @@ function resetMonitorState(monitor: Monitor) {
monitor[kConnection] = undefined;
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
// If we have no topology version we always poll no matter
// what the user provided, since the server does not support
// the streaming protocol.
if (topologyVersion == null) return false;

const serverMonitoringMode = monitor.options.serverMonitoringMode;
if (serverMonitoringMode === ServerMonitoringMode.poll) return false;
if (serverMonitoringMode === ServerMonitoringMode.stream) return true;

// If we are in auto mode, we need to figure out if we're in a FaaS
// environment or not and choose the appropriate mode.
if (monitor.isRunningInFaasEnv) return false;
return true;
}

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error) {
function failureHandler(err: Error, awaited: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
new ServerHeartbeatFailedEvent(
monitor.address,
calculateDurationInMs(start),
err,
isAwaitable
)
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
);

const error = !(err instanceof MongoError)
Expand Down Expand Up @@ -274,7 +306,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err);
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
Expand All @@ -286,15 +318,14 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

const awaited = isAwaitable && hello.topologyVersion != null;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (awaited) {
// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop.
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
Expand All @@ -316,7 +347,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
if (err) {
monitor[kConnection] = undefined;

failureHandler(err);
failureHandler(err, false);
return;
}

Expand All @@ -337,7 +368,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
monitor.address,
calculateDurationInMs(start),
conn.hello,
false
useStreamingProtocol(monitor, conn.hello?.topologyVersion)
)
);

Expand Down Expand Up @@ -370,7 +401,7 @@ function monitorServer(monitor: Monitor) {
}

// if the check indicates streaming is supported, immediately reschedule monitoring
if (hello && hello.topologyVersion) {
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
monitor[kMonitorId]?.wake();
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './events';
import type { ServerMonitoringMode } from './monitor';
import { Server, type ServerEvents, type ServerOptions } from './server';
import { compareTopologyVersion, ServerDescription } from './server_description';
import { readPreferenceServerSelector, type ServerSelector } from './server_selection';
Expand Down Expand Up @@ -143,6 +144,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
directConnection: boolean;
loadBalanced: boolean;
metadata: ClientMetadata;
serverMonitoringMode: ServerMonitoringMode;
/** MongoDB server API version */
serverApi?: ServerApi;
[featureFlag: symbol]: any;
Expand Down
5 changes: 5 additions & 0 deletions test/lambda/mongodb/app.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as assert from 'node:assert/strict';

import { MongoClient } from 'mongodb';

// Creates the client that is cached for all requests, subscribes to
Expand Down Expand Up @@ -30,18 +32,21 @@ mongoClient.on('commandFailed', (event) => {

mongoClient.on('serverHeartbeatStarted', (event) => {
console.log('serverHeartbeatStarted', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatSucceeded', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatSucceeded', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatFailed', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatFailed', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('connectionCreated', (event) => {
Expand Down
Loading

0 comments on commit ae4c94a

Please sign in to comment.