From 439590bbe8130135ab2c404d75eb1985776b22a0 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 22 Nov 2024 19:55:49 +0100 Subject: [PATCH] refactor(sync): cache steps in sync service Store the steps that need to be send where we also do the debouncing. They will be updated whenever there is a new message from y-websocket. Signed-off-by: Max --- src/components/Editor.vue | 4 +- src/helpers/yjs.js | 21 ----- src/services/Outbox.ts | 68 ++++++++++++++++ src/services/SyncService.js | 52 ++++++------ src/services/WebSocketPolyfill.js | 51 ++---------- src/tests/services/WebsocketPolyfill.spec.js | 86 +------------------- 6 files changed, 101 insertions(+), 181 deletions(-) create mode 100644 src/services/Outbox.ts diff --git a/src/components/Editor.vue b/src/components/Editor.vue index aeca240e912..3546ecf5e24 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -347,7 +347,6 @@ export default { }, created() { this.$ydoc = new Doc() - this.$queue = [] // The following can be useful for debugging ydoc updates // this.$ydoc.on('update', function(update, origin, doc, tr) { // console.debug('ydoc update', update, origin, doc, tr) @@ -398,7 +397,6 @@ export default { ydoc: this.$ydoc, syncService: this.$syncService, fileId: this.fileId, - queue: this.$queue, initialSession: this.initialSession, disableBC: true, }) @@ -698,7 +696,7 @@ export default { }, async close() { - await this.$syncService.sendRemainingSteps(this.$queue) + await this.$syncService.sendRemainingSteps() await this.disconnect() if (this.$editor) { try { diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index fbc58796e86..6d55d294f2a 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -88,27 +88,6 @@ export function applyStep(ydoc, step, origin = 'origin') { ) } -/** - * Get the steps for sending to the server - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getSteps(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .filter(s => s < 'AQ') - .slice(-1) -} - -/** - * Encode the latest awareness message for sending - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getAwareness(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .findLast(s => s > 'AQ') || '' -} - /** * Log y.js messages with their type and initiator call stack * diff --git a/src/services/Outbox.ts b/src/services/Outbox.ts new file mode 100644 index 00000000000..6c27a04872f --- /dev/null +++ b/src/services/Outbox.ts @@ -0,0 +1,68 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { encodeArrayBuffer } from '../helpers/base64.js' +import { logger } from '../helpers/logger.js' + +type Sendable = { + steps: string[], awareness: string +} + +export default class Outbox { + #awarenessUpdate = '' + #syncUpdate = '' + #syncQuery = '' + + storeStep(step: Uint8Array) { + const encoded = encodeArrayBuffer(step) + if (encoded < 'AAA') { + logger.warn('Unexpected step type:', { step, encoded }) + return + } + if (encoded < 'AAE') { + this.#syncQuery = encoded + return + } + if (encoded < 'AQ') { + this.#syncUpdate = encoded + return + } + if (encoded < 'Ag') { + this.#awarenessUpdate = encoded + return + } + logger.warn('Unexpected step type:', { step, encoded }) + } + + getDataToSend(): Sendable { + return { + steps: [this.#syncUpdate, this.#syncQuery].filter(s => s), + awareness: this.#awarenessUpdate, + } + } + + get hasUpdate(): boolean { + return !!this.#syncUpdate + } + + /* + * Clear data that has just been + * + * Only clear data that has not changed in the meantime. + * @param {Sendable} - data that was to the server + */ + clearSentData({ steps, awareness }: Sendable) { + if (steps.includes(this.#syncUpdate)) { + this.#syncUpdate = '' + } + if (steps.includes(this.#syncQuery)) { + this.#syncQuery = '' + } + if (this.#awarenessUpdate === awareness) { + this.#awarenessUpdate = '' + } + } + +} diff --git a/src/services/SyncService.js b/src/services/SyncService.js index e31e51804e8..f424767bebb 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -9,8 +9,9 @@ import mitt from 'mitt' import debounce from 'debounce' import PollingBackend from './PollingBackend.js' +import Outbox from './Outbox.ts' import SessionApi, { Connection } from './SessionApi.js' -import { getSteps, getAwareness, documentStateToStep } from '../helpers/yjs.js' +import { documentStateToStep } from '../helpers/yjs.js' import { logger } from '../helpers/logger.js' /** @@ -56,6 +57,7 @@ class SyncService { #sendIntervalId #connection + #outbox = new Outbox() constructor({ baseVersionEtag, serialize, getDocumentState, ...options }) { /** @type {import('mitt').Emitter} _bus */ @@ -152,30 +154,34 @@ class SyncService { }) } - sendSteps(getSendable) { + sendStep(step) { + this.#outbox.storeStep(step) + this.sendSteps() + } + + sendSteps() { // If already waiting to send, do nothing. if (this.#sendIntervalId) { return } - return new Promise((resolve, reject) => { - this.#sendIntervalId = setInterval(() => { - if (this.#connection && !this.sending) { - this.sendStepsNow(getSendable).then(resolve).catch(reject) - } - }, 200) - }) + this.#sendIntervalId = setInterval(() => { + if (this.#connection && !this.sending) { + this.sendStepsNow() + } + }, 200) } - sendStepsNow(getSendable) { + sendStepsNow() { this.sending = true clearInterval(this.#sendIntervalId) this.#sendIntervalId = null - const sendable = getSendable() + const sendable = this.#outbox.getDataToSend() if (sendable.steps.length > 0) { this.emit('stateChange', { dirty: true }) } - return this.#connection.push(sendable) + return this.#connection.push({ ...sendable, version: this.version }) .then((response) => { + this.#outbox.clearSentData(sendable) const { steps, documentState } = response.data if (documentState) { const documentStateStep = documentStateToStep(documentState) @@ -194,6 +200,7 @@ class SyncService { const { response, code } = err this.sending = false this.pushError++ + logger.error('Failed to push the steps to the server', err) if (!response || code === 'ECONNABORTED') { this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} }) } @@ -298,25 +305,12 @@ class SyncService { }) } - async sendRemainingSteps(queue) { - if (queue.length === 0) { + sendRemainingSteps() { + if (!this.outbox.hasUpdate()) { return } - let outbox = [] - const steps = getSteps(queue) - const awareness = getAwareness(queue) - return this.sendStepsNow(() => { - const data = { steps, awareness, version: this.version } - outbox = [...queue] - logger.debug('sending final steps ', data) - return data - })?.then(() => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - }, err => logger.error(err)) + logger.debug('sending final steps') + return this.sendStepsNow().catch(err => logger.error(err)) } async close() { diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js index 0fb7b5477e5..e9a885566dd 100644 --- a/src/services/WebSocketPolyfill.js +++ b/src/services/WebSocketPolyfill.js @@ -5,7 +5,6 @@ import { logger } from '../helpers/logger.js' import { decodeArrayBuffer } from '../helpers/base64.ts' -import { getSteps, getAwareness } from '../helpers/yjs.js' import getNotifyBus from './NotifyService.js' /** @@ -13,14 +12,11 @@ import getNotifyBus from './NotifyService.js' * @param {object} syncService - the sync service to build upon * @param {number} fileId - id of the file to open * @param {object} initialSession - initial session to open - * @param {object[]} queue - queue for the outgoing steps */ -export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) { +export default function initWebSocketPolyfill(syncService, fileId, initialSession) { return class WebSocketPolyfill { #url - #session - #version binaryType onmessage onerror @@ -35,34 +31,19 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio this.url = url logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession }) this.#registerHandlers({ - opened: ({ version, session }) => { - logger.debug('opened ', { version, session }) - this.#version = version - this.#session = session - }, - loaded: ({ version, session, content }) => { - logger.debug('loaded ', { version, session }) - this.#version = version - this.#session = session - }, sync: ({ steps, version }) => { - logger.debug('synced ', { version, steps }) - this.#version = version if (steps) { steps.forEach(s => { const data = decodeArrayBuffer(s.step) this.onmessage({ data }) }) + logger.debug('synced ', { version, steps }) } }, }) syncService.open({ fileId, initialSession }).then((data) => { if (syncService.hasActiveConnection) { - const { version, session } = data - this.#version = version - this.#session = session - this.onopen?.() } }) @@ -74,32 +55,10 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio .forEach(([key, value]) => syncService.on(key, value)) } - send(...data) { + send(step) { // Useful for debugging what steps are sent and how they were initiated - // data.forEach(logStep) - - queue.push(...data) - let outbox = [] - return syncService.sendSteps(() => { - const data = { - steps: getSteps(queue), - awareness: getAwareness(queue), - version: this.#version, - } - outbox = [...queue] - logger.debug('sending steps ', data) - return data - })?.then(ret => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - return ret - }, err => { - logger.error(`Failed to push the queue with ${queue.length} steps to the server`, err) - this.onerror?.(err) - }) + // logStep(step) + syncService.sendStep(step) } async close() { diff --git a/src/tests/services/WebsocketPolyfill.spec.js b/src/tests/services/WebsocketPolyfill.spec.js index ca85236c3a7..b7dca0be6ee 100644 --- a/src/tests/services/WebsocketPolyfill.spec.js +++ b/src/tests/services/WebsocketPolyfill.spec.js @@ -4,9 +4,13 @@ */ import { describe, it, vi, expect } from 'vitest' +import { encodeArrayBuffer as encode } from '../../helpers/base64.ts' import initWebSocketPolyfill from '../../services/WebSocketPolyfill.js' describe('Init function', () => { + const syncQuery = new Uint8Array([0, 0, 1, 0]) + const syncUpdate = new Uint8Array([0, 2, 1, 0]) + const awarenessUpdate = new Uint8Array([1, 2, 1, 0]) const mockSyncService = (mocked = {}) => { return { on: vi.fn(), @@ -40,86 +44,4 @@ describe('Init function', () => { expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession }) }) - it('sends steps to sync service', async () => { - const syncService = mockSyncService({ - sendSteps: async getData => getData(), - }) - const queue = ['initial'] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(result).toBeInstanceOf(Promise) - expect(queue).toEqual(['initial', data]) - const dataSendOut = await result - expect(queue).toEqual([]) - expect(dataSendOut).toHaveProperty('awareness') - expect(dataSendOut).toHaveProperty('steps') - expect(dataSendOut).toHaveProperty('version') - }) - - it('handles early reject', async () => { - vi.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = mockSyncService({ - sendSteps: vi.fn().mockRejectedValue('error before reading steps in sync service'), - }) - syncService.open.mockImplementation(async () => ({})) - const queue = ['initial'] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual(['initial', data]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual(['initial', data]) - }) - - it('handles reject after reading data', async () => { - vi.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = mockSyncService({ - sendSteps: vi.fn().mockImplementation(async getData => { - getData() - throw new Error('error when sending in sync service') - }), - }) - const queue = ['initial'] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual(['initial', data]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual(['initial', data]) - }) - - it('queue survives a close', async () => { - vi.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = mockSyncService({ - sendSteps: vi.fn().mockImplementation(async getData => { - getData() - throw new Error('error when sending in sync service') - }), - sendStepsNow: vi.fn().mockImplementation(async getData => { - getData() - throw new Error('sendStepsNow error when sending') - }), - off: vi.fn(), - close: vi.fn(async data => data), - }) - const queue = ['initial'] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - websocket.onclose = vi.fn() - await websocket.send(data) - const promise = websocket.close() - expect(queue).toEqual(['initial', data]) - await promise - expect(queue).toEqual(['initial', data]) - }) - })