Skip to content

Commit

Permalink
ConnectionManager.ping(): rewrite to fix pings stacking if none ever …
Browse files Browse the repository at this point in the history
…succeed

previously if the transport keeps becoming active but then gets
disconnected before the ping succeeds, listeners would just keep
stacking up

the whole previous implementation is way over-complex now we no longer
do transport upgrades. replaced with a much simpler one.
  • Loading branch information
SimonWoolf committed Jun 17, 2024
1 parent efb71d0 commit 7d20530
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 57 deletions.
4 changes: 1 addition & 3 deletions src/common/lib/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ class Connection extends EventEmitter {

async ping(): Promise<number> {
Logger.logAction(this.logger, Logger.LOG_MINOR, 'Connection.ping()', '');
return new Promise((resolve, reject) => {
this.connectionManager.ping(null, (err: unknown, result: number) => (err ? reject(err) : resolve(result)));
});
return this.connectionManager.ping();
}

close(): void {
Expand Down
77 changes: 23 additions & 54 deletions src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1924,66 +1924,35 @@ class ConnectionManager extends EventEmitter {
await this.realtime.channels.processChannelMessage(message);
}

ping(transport: Transport | null, callback: Function): void {
/* if transport is specified, try that */
if (transport) {
Logger.logAction(this.logger, Logger.LOG_MINOR, 'ConnectionManager.ping()', 'transport = ' + transport);

const onTimeout = function () {
transport.off('heartbeat', onHeartbeat);
callback(new ErrorInfo('Timeout waiting for heartbeat response', 50000, 500));
};

const pingStart = Date.now(),
id = Utils.cheapRandStr();

const onHeartbeat = function (responseId: string) {
if (responseId === id) {
transport.off('heartbeat', onHeartbeat);
clearTimeout(timer);
const responseTime = Date.now() - pingStart;
callback(null, responseTime);
}
};

const timer = setTimeout(onTimeout, this.options.timeouts.realtimeRequestTimeout);

transport.on('heartbeat', onHeartbeat);
transport.ping(id);
return;
}

/* if we're not connected, don't attempt */
async ping(): Promise<number> {
if (this.state.state !== 'connected') {
callback(new ErrorInfo('Unable to ping service; not connected', 40000, 400));
return;
throw new ErrorInfo('Unable to ping service; not connected', 40000, 400);
}

/* no transport was specified, so use the current (connected) one
* but ensure that we retry if the transport is superseded before we complete */
let completed = false;
const transport = this.activeProtocol?.getTransport();
if (!transport) {
throw this.getStateError();
}

const onPingComplete = (err: Error, responseTime: number) => {
this.off('transport.active', onTransportActive);
if (!completed) {
completed = true;
callback(err, responseTime);
}
};
Logger.logAction(this.logger, Logger.LOG_MINOR, 'ConnectionManager.ping()', 'transport = ' + transport);

const onTransportActive = () => {
if (!completed) {
/* ensure that no callback happens for the currently outstanding operation */
completed = true;
/* repeat but picking up the new transport */
Platform.Config.nextTick(() => {
this.ping(null, callback);
});
}
};
const pingStart = Date.now();
const id = Utils.cheapRandStr();

this.on('transport.active', onTransportActive);
this.ping((this.activeProtocol as Protocol).getTransport(), onPingComplete);
return Utils.withTimeoutAsync<number>(
new Promise((resolve) => {
const onHeartbeat = (responseId: string) => {
if (responseId === id) {
transport.off('heartbeat', onHeartbeat);
resolve(Date.now() - pingStart);
}
};
transport.on('heartbeat', onHeartbeat);
transport.ping(id);
}),
this.options.timeouts.realtimeRequestTimeout,
'Timeout waiting for heartbeat response',
);
}

abort(error: ErrorInfo): void {
Expand Down
5 changes: 5 additions & 0 deletions src/common/lib/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,8 @@ export function createMissingPluginError(pluginName: keyof ModularPlugins): Erro
export function throwMissingPluginError(pluginName: keyof ModularPlugins): never {
throw createMissingPluginError(pluginName);
}

export async function withTimeoutAsync<A>(promise: Promise<A>, timeout = 5000, err = 'Timeout expired'): Promise<A> {
const e = new ErrorInfo(err, 50000, 500);
return Promise.race([promise, new Promise<A>((_resolve, reject) => setTimeout(() => reject(e), timeout))]);
}

0 comments on commit 7d20530

Please sign in to comment.