Skip to content

Commit

Permalink
refactor(NODE-5778): change kConnection to connection, split polling …
Browse files Browse the repository at this point in the history
…and streaming functions, define exhaustCommand (#3942)
  • Loading branch information
nbbeeken authored Dec 7, 2023
1 parent 80999b5 commit 9126b3e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 52 deletions.
4 changes: 4 additions & 0 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ export class OpQueryResponse {
bsonRegExp?: boolean;
index?: number;

/** moreToCome is an OP_MSG only concept */
moreToCome = false;

constructor(
message: Buffer,
msgHeader: MessageHeader,
Expand Down Expand Up @@ -598,6 +601,7 @@ export class OpMsgResponse {
fromCompressed?: boolean;
responseFlags: number;
checksumPresent: boolean;
/** Indicates the server will be sending more responses on this connection */
moreToCome: boolean;
exhaustAllowed: boolean;
useBigInt64: boolean;
Expand Down
20 changes: 19 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(err);
}
}

exhaustCommand(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions | undefined,
replyListener: Callback
) {
return this.command(ns, command, options, replyListener);
}
}

/** @internal */
Expand Down Expand Up @@ -1156,6 +1165,15 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {

return document;
}

exhaustCommand(
_ns: MongoDBNamespace,
_command: Document,
_options: CommandOptions,
_replyListener: Callback
) {
throw new Error('NODE-5742: not implemented.');
}
}

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
Expand Down Expand Up @@ -1253,7 +1271,7 @@ export async function* readMany(
const response = await decompressResponse(message);
yield response;

if (!('moreToCome' in response) || !response.moreToCome) {
if (!response.moreToCome) {
return;
}
}
Expand Down
107 changes: 56 additions & 51 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ const kServer = Symbol('server');
/** @internal */
const kMonitorId = Symbol('monitorId');
/** @internal */
const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');
Expand Down Expand Up @@ -94,21 +92,17 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
connectOptions: ConnectionOptions;
isRunningInFaasEnv: boolean;
[kServer]: Server;
[kConnection]?: Connection;
connection: Connection | null;
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
}

constructor(server: Server, options: MonitorOptions) {
super();

this[kServer] = server;
this[kConnection] = undefined;
this.connection = null;
this[kCancellationToken] = new CancellationToken();
this[kCancellationToken].setMaxListeners(Infinity);
this[kMonitorId] = undefined;
Expand Down Expand Up @@ -219,8 +213,8 @@ function resetMonitorState(monitor: Monitor) {

monitor[kCancellationToken].emit('cancel');

monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;
monitor.connection?.destroy({ force: true });
monitor.connection = null;
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
Expand All @@ -241,16 +235,17 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
let awaited: boolean;
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error, awaited: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;
function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy({ force: true });
monitor.connection = null;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
Expand All @@ -269,7 +264,39 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
callback(err);
}

const connection = monitor[kConnection];
function onHeartbeatSucceeded(hello: Document) {
if (!('isWritablePrimary' in hello)) {
// Provide hello-style response document.
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop.
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
);
start = now();
} else {
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
}

const { connection } = monitor;
if (connection && !connection.closed) {
const { serverApi, helloOk } = connection;
const connectTimeoutMS = monitor.options.connectTimeoutMS;
Expand Down Expand Up @@ -299,51 +326,29 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
}

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
// Provide hello-style response document.
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);
if (isAwaitable) {
awaited = true;
return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => {
if (error) return onHeartbeatFailed(error);
return onHeartbeatSucceeded(hello);
});
}

// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop.
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
);
start = now();
} else {
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
});
awaited = false;
connection
.commandAsync(ns('admin.$cmd'), cmd, options)
.then(onHeartbeatSucceeded, onHeartbeatFailed);

return;
}

// connecting does an implicit `hello`
connect(monitor.connectOptions, (err, conn) => {
if (err) {
monitor[kConnection] = undefined;
monitor.connection = null;

failureHandler(err, false);
awaited = false;
onHeartbeatFailed(err);
return;
}

Expand All @@ -357,7 +362,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
return;
}

monitor[kConnection] = conn;
monitor.connection = conn;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(
Expand Down

0 comments on commit 9126b3e

Please sign in to comment.