Skip to content

Commit

Permalink
Merge pull request #1793 from ably/ping
Browse files Browse the repository at this point in the history
ConnectionManager.ping(): rewrite to fix pings stacking if none ever succeed
  • Loading branch information
SimonWoolf authored Jun 21, 2024
2 parents 2b24d21 + 7d20530 commit 5de39d2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 57 deletions.
4 changes: 1 addition & 3 deletions src/common/lib/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ class Connection extends EventEmitter {

async ping(): Promise<number> {
Logger.logAction(this.logger, Logger.LOG_MINOR, 'Connection.ping()', '');
return new Promise((resolve, reject) => {
this.connectionManager.ping(null, (err: unknown, result: number) => (err ? reject(err) : resolve(result)));
});
return this.connectionManager.ping();
}

close(): void {
Expand Down
77 changes: 23 additions & 54 deletions src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1924,66 +1924,35 @@ class ConnectionManager extends EventEmitter {
await this.realtime.channels.processChannelMessage(message);
}

ping(transport: Transport | null, callback: Function): void {
/* if transport is specified, try that */
if (transport) {
Logger.logAction(this.logger, Logger.LOG_MINOR, 'ConnectionManager.ping()', 'transport = ' + transport);

const onTimeout = function () {
transport.off('heartbeat', onHeartbeat);
callback(new ErrorInfo('Timeout waiting for heartbeat response', 50000, 500));
};

const pingStart = Date.now(),
id = Utils.cheapRandStr();

const onHeartbeat = function (responseId: string) {
if (responseId === id) {
transport.off('heartbeat', onHeartbeat);
clearTimeout(timer);
const responseTime = Date.now() - pingStart;
callback(null, responseTime);
}
};

const timer = setTimeout(onTimeout, this.options.timeouts.realtimeRequestTimeout);

transport.on('heartbeat', onHeartbeat);
transport.ping(id);
return;
}

/* if we're not connected, don't attempt */
async ping(): Promise<number> {
if (this.state.state !== 'connected') {
callback(new ErrorInfo('Unable to ping service; not connected', 40000, 400));
return;
throw new ErrorInfo('Unable to ping service; not connected', 40000, 400);
}

/* no transport was specified, so use the current (connected) one
* but ensure that we retry if the transport is superseded before we complete */
let completed = false;
const transport = this.activeProtocol?.getTransport();
if (!transport) {
throw this.getStateError();
}

const onPingComplete = (err: Error, responseTime: number) => {
this.off('transport.active', onTransportActive);
if (!completed) {
completed = true;
callback(err, responseTime);
}
};
Logger.logAction(this.logger, Logger.LOG_MINOR, 'ConnectionManager.ping()', 'transport = ' + transport);

const onTransportActive = () => {
if (!completed) {
/* ensure that no callback happens for the currently outstanding operation */
completed = true;
/* repeat but picking up the new transport */
Platform.Config.nextTick(() => {
this.ping(null, callback);
});
}
};
const pingStart = Date.now();
const id = Utils.cheapRandStr();

this.on('transport.active', onTransportActive);
this.ping((this.activeProtocol as Protocol).getTransport(), onPingComplete);
return Utils.withTimeoutAsync<number>(
new Promise((resolve) => {
const onHeartbeat = (responseId: string) => {
if (responseId === id) {
transport.off('heartbeat', onHeartbeat);
resolve(Date.now() - pingStart);
}
};
transport.on('heartbeat', onHeartbeat);
transport.ping(id);
}),
this.options.timeouts.realtimeRequestTimeout,
'Timeout waiting for heartbeat response',
);
}

abort(error: ErrorInfo): void {
Expand Down
5 changes: 5 additions & 0 deletions src/common/lib/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,8 @@ export function createMissingPluginError(pluginName: keyof ModularPlugins): Erro
export function throwMissingPluginError(pluginName: keyof ModularPlugins): never {
throw createMissingPluginError(pluginName);
}

export async function withTimeoutAsync<A>(promise: Promise<A>, timeout = 5000, err = 'Timeout expired'): Promise<A> {
const e = new ErrorInfo(err, 50000, 500);
return Promise.race([promise, new Promise<A>((_resolve, reject) => setTimeout(() => reject(e), timeout))]);
}

0 comments on commit 5de39d2

Please sign in to comment.