Skip to content

Commit

Permalink
refactor(sync): cache steps in sync service
Browse files Browse the repository at this point in the history
Store the steps that need to be send
where we also do the debouncing.

They will be updated whenever there is a new message from y-websocket.

Signed-off-by: Max <max@nextcloud.com>
  • Loading branch information
max-nextcloud committed Nov 25, 2024
1 parent 3bc7015 commit 439590b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 181 deletions.
4 changes: 1 addition & 3 deletions src/components/Editor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ export default {
},
created() {
this.$ydoc = new Doc()
this.$queue = []
// The following can be useful for debugging ydoc updates
// this.$ydoc.on('update', function(update, origin, doc, tr) {
// console.debug('ydoc update', update, origin, doc, tr)
Expand Down Expand Up @@ -398,7 +397,6 @@ export default {
ydoc: this.$ydoc,
syncService: this.$syncService,
fileId: this.fileId,
queue: this.$queue,
initialSession: this.initialSession,
disableBC: true,
})
Expand Down Expand Up @@ -698,7 +696,7 @@ export default {
},

async close() {
await this.$syncService.sendRemainingSteps(this.$queue)
await this.$syncService.sendRemainingSteps()
await this.disconnect()
if (this.$editor) {
try {
Expand Down
21 changes: 0 additions & 21 deletions src/helpers/yjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,6 @@ export function applyStep(ydoc, step, origin = 'origin') {
)
}

/**
* Get the steps for sending to the server
*
* @param {object[]} queue - queue for the outgoing steps
*/
export function getSteps(queue) {
return queue.map(s => encodeArrayBuffer(s))
.filter(s => s < 'AQ')
.slice(-1)
}

/**
* Encode the latest awareness message for sending
*
* @param {object[]} queue - queue for the outgoing steps
*/
export function getAwareness(queue) {
return queue.map(s => encodeArrayBuffer(s))
.findLast(s => s > 'AQ') || ''
}

/**
* Log y.js messages with their type and initiator call stack
*
Expand Down
68 changes: 68 additions & 0 deletions src/services/Outbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

import { encodeArrayBuffer } from '../helpers/base64.js'
import { logger } from '../helpers/logger.js'

type Sendable = {
steps: string[], awareness: string
}

export default class Outbox {
#awarenessUpdate = ''
#syncUpdate = ''
#syncQuery = ''

storeStep(step: Uint8Array) {
const encoded = encodeArrayBuffer(step)
if (encoded < 'AAA') {
logger.warn('Unexpected step type:', { step, encoded })
return
}
if (encoded < 'AAE') {
this.#syncQuery = encoded
return
}
if (encoded < 'AQ') {
this.#syncUpdate = encoded
return
}
if (encoded < 'Ag') {
this.#awarenessUpdate = encoded
return
}
logger.warn('Unexpected step type:', { step, encoded })
}

getDataToSend(): Sendable {
return {
steps: [this.#syncUpdate, this.#syncQuery].filter(s => s),
awareness: this.#awarenessUpdate,
}
}

get hasUpdate(): boolean {
return !!this.#syncUpdate
}

/*
* Clear data that has just been
*
* Only clear data that has not changed in the meantime.
* @param {Sendable} - data that was to the server
*/
clearSentData({ steps, awareness }: Sendable) {
if (steps.includes(this.#syncUpdate)) {
this.#syncUpdate = ''
}
if (steps.includes(this.#syncQuery)) {
this.#syncQuery = ''
}
if (this.#awarenessUpdate === awareness) {
this.#awarenessUpdate = ''
}
}

}
52 changes: 23 additions & 29 deletions src/services/SyncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import mitt from 'mitt'
import debounce from 'debounce'

import PollingBackend from './PollingBackend.js'
import Outbox from './Outbox.ts'
import SessionApi, { Connection } from './SessionApi.js'
import { getSteps, getAwareness, documentStateToStep } from '../helpers/yjs.js'
import { documentStateToStep } from '../helpers/yjs.js'
import { logger } from '../helpers/logger.js'

/**
Expand Down Expand Up @@ -56,6 +57,7 @@ class SyncService {

#sendIntervalId
#connection
#outbox = new Outbox()

constructor({ baseVersionEtag, serialize, getDocumentState, ...options }) {
/** @type {import('mitt').Emitter<import('./SyncService').EventTypes>} _bus */
Expand Down Expand Up @@ -152,30 +154,34 @@ class SyncService {
})
}

sendSteps(getSendable) {
sendStep(step) {
this.#outbox.storeStep(step)
this.sendSteps()
}

sendSteps() {
// If already waiting to send, do nothing.
if (this.#sendIntervalId) {
return
}
return new Promise((resolve, reject) => {
this.#sendIntervalId = setInterval(() => {
if (this.#connection && !this.sending) {
this.sendStepsNow(getSendable).then(resolve).catch(reject)
}
}, 200)
})
this.#sendIntervalId = setInterval(() => {
if (this.#connection && !this.sending) {
this.sendStepsNow()
}
}, 200)
}

sendStepsNow(getSendable) {
sendStepsNow() {
this.sending = true
clearInterval(this.#sendIntervalId)
this.#sendIntervalId = null
const sendable = getSendable()
const sendable = this.#outbox.getDataToSend()
if (sendable.steps.length > 0) {
this.emit('stateChange', { dirty: true })
}
return this.#connection.push(sendable)
return this.#connection.push({ ...sendable, version: this.version })
.then((response) => {
this.#outbox.clearSentData(sendable)
const { steps, documentState } = response.data
if (documentState) {
const documentStateStep = documentStateToStep(documentState)
Expand All @@ -194,6 +200,7 @@ class SyncService {
const { response, code } = err
this.sending = false
this.pushError++
logger.error('Failed to push the steps to the server', err)
if (!response || code === 'ECONNABORTED') {
this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} })
}
Expand Down Expand Up @@ -298,25 +305,12 @@ class SyncService {
})
}

async sendRemainingSteps(queue) {
if (queue.length === 0) {
sendRemainingSteps() {
if (!this.outbox.hasUpdate()) {
return
}
let outbox = []
const steps = getSteps(queue)
const awareness = getAwareness(queue)
return this.sendStepsNow(() => {
const data = { steps, awareness, version: this.version }
outbox = [...queue]
logger.debug('sending final steps ', data)
return data
})?.then(() => {
// only keep the steps that were not send yet
queue.splice(0,
queue.length,
...queue.filter(s => !outbox.includes(s)),
)
}, err => logger.error(err))
logger.debug('sending final steps')
return this.sendStepsNow().catch(err => logger.error(err))
}

async close() {
Expand Down
51 changes: 5 additions & 46 deletions src/services/WebSocketPolyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@

import { logger } from '../helpers/logger.js'
import { decodeArrayBuffer } from '../helpers/base64.ts'
import { getSteps, getAwareness } from '../helpers/yjs.js'
import getNotifyBus from './NotifyService.js'

/**
*
* @param {object} syncService - the sync service to build upon
* @param {number} fileId - id of the file to open
* @param {object} initialSession - initial session to open
* @param {object[]} queue - queue for the outgoing steps
*/
export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) {
export default function initWebSocketPolyfill(syncService, fileId, initialSession) {
return class WebSocketPolyfill {

#url
#session
#version
binaryType
onmessage
onerror
Expand All @@ -35,34 +31,19 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
this.url = url
logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession })
this.#registerHandlers({
opened: ({ version, session }) => {
logger.debug('opened ', { version, session })
this.#version = version
this.#session = session
},
loaded: ({ version, session, content }) => {
logger.debug('loaded ', { version, session })
this.#version = version
this.#session = session
},
sync: ({ steps, version }) => {
logger.debug('synced ', { version, steps })
this.#version = version
if (steps) {
steps.forEach(s => {
const data = decodeArrayBuffer(s.step)
this.onmessage({ data })
})
logger.debug('synced ', { version, steps })
}
},
})

syncService.open({ fileId, initialSession }).then((data) => {
if (syncService.hasActiveConnection) {
const { version, session } = data
this.#version = version
this.#session = session

this.onopen?.()
}
})
Expand All @@ -74,32 +55,10 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
.forEach(([key, value]) => syncService.on(key, value))
}

send(...data) {
send(step) {
// Useful for debugging what steps are sent and how they were initiated
// data.forEach(logStep)

queue.push(...data)
let outbox = []
return syncService.sendSteps(() => {
const data = {
steps: getSteps(queue),
awareness: getAwareness(queue),
version: this.#version,
}
outbox = [...queue]
logger.debug('sending steps ', data)
return data
})?.then(ret => {
// only keep the steps that were not send yet
queue.splice(0,
queue.length,
...queue.filter(s => !outbox.includes(s)),
)
return ret
}, err => {
logger.error(`Failed to push the queue with ${queue.length} steps to the server`, err)
this.onerror?.(err)
})
// logStep(step)
syncService.sendStep(step)
}

async close() {
Expand Down
Loading

0 comments on commit 439590b

Please sign in to comment.