From e97eeada30827a68147af1a683a1ee2b13eebf9b Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 13 Nov 2024 11:31:02 +0100 Subject: [PATCH 01/16] fix(sync): inline y-websocket to track received updates there Signed-off-by: Max --- package-lock.json | 228 ++----------- package.json | 1 - src/helpers/yjs.js | 2 +- src/services/SyncServiceProvider.js | 2 +- src/services/y-websocket.js | 498 ++++++++++++++++++++++++++++ tsconfig.json | 3 +- 6 files changed, 524 insertions(+), 210 deletions(-) create mode 100644 src/services/y-websocket.js diff --git a/package-lock.json b/package-lock.json index b28fa7fd632..e1f294b1115 100644 --- a/package-lock.json +++ b/package-lock.json @@ -85,7 +85,6 @@ "vuex": "^3.6.2", "y-prosemirror": "^1.2.12", "y-protocols": "^1.0.6", - "y-websocket": "^2.0.4", "yjs": "^13.6.20" }, "devDependencies": { @@ -6981,22 +6980,6 @@ "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", "dev": true }, - "node_modules/abstract-leveldown": { - "version": "6.2.3", - "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-6.2.3.tgz", - "integrity": "sha512-BsLm5vFMRUrrLeCcRc+G0t2qOaTzpoJQLOubq2XM72eNpjF5UdU5o/5NvlNhx95XHcAvcl8OMXr4mlg/fRgUXQ==", - "optional": true, - "dependencies": { - "buffer": "^5.5.0", - "immediate": "^3.2.3", - "level-concat-iterator": "~2.0.0", - "level-supports": "~1.0.0", - "xtend": "~4.0.0" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/acorn": { "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", @@ -7333,12 +7316,6 @@ "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==", "dev": true }, - "node_modules/async-limiter": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", - "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", - "optional": true - }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -7625,7 +7602,7 @@ "version": "1.5.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -7928,7 +7905,7 @@ "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -9790,19 +9767,6 @@ "node": ">=0.8" } }, - "node_modules/deferred-leveldown": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/deferred-leveldown/-/deferred-leveldown-5.3.0.tgz", - "integrity": "sha512-a59VOT+oDy7vtAbLRCZwWgxu2BaCfd5Hk7wxJd48ei7I+nsg8Orlb9CLG0PMZienk9BSUKgeAqkO2+Lw+1+Ukw==", - "optional": true, - "dependencies": { - "abstract-leveldown": "~6.2.1", - "inherits": "^2.0.3" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/define-data-property": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.4.tgz", @@ -10414,21 +10378,6 @@ "node": ">= 4" } }, - "node_modules/encoding-down": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/encoding-down/-/encoding-down-6.3.0.tgz", - "integrity": "sha512-QKrV0iKR6MZVJV08QY0wp1e7vF6QbhnbQhb07bwpEyuz4uZiZgPlEGdkCROuFkUwdxlFaiPIhjyarH1ee/3vhw==", - "optional": true, - "dependencies": { - "abstract-leveldown": "^6.2.1", - "inherits": "^2.0.3", - "level-codec": "^9.0.0", - "level-errors": "^2.0.0" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/end-of-stream": { "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", @@ -10486,18 +10435,6 @@ "node": ">=6" } }, - "node_modules/errno": { - "version": "0.1.8", - "resolved": "https://registry.npmjs.org/errno/-/errno-0.1.8.tgz", - "integrity": "sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==", - "optional": true, - "dependencies": { - "prr": "~1.0.1" - }, - "bin": { - "errno": "cli.js" - } - }, "node_modules/error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -13098,7 +13035,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -13123,12 +13060,6 @@ "node": ">= 4" } }, - "node_modules/immediate": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/immediate/-/immediate-3.3.0.tgz", - "integrity": "sha512-HR7EVodfFUdQCTIeySw+WDRFJlPcLOJbXfwwZ7Oom6tjsvZ3bOkCDJHehQC3nxJrv7+f9XecwazynjU8e4Vw3Q==", - "optional": true - }, "node_modules/immutable": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.1.0.tgz", @@ -13223,7 +13154,7 @@ "version": "2.0.4", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "devOptional": true + "dev": true }, "node_modules/ini": { "version": "2.0.0", @@ -16452,7 +16383,9 @@ "node_modules/lodash.debounce": { "version": "4.0.8", "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", - "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==" + "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", + "dev": true, + "peer": true }, "node_modules/lodash.get": { "version": "4.4.2", @@ -16712,12 +16645,6 @@ "yallist": "^2.1.2" } }, - "node_modules/ltgt": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/ltgt/-/ltgt-2.2.1.tgz", - "integrity": "sha512-AI2r85+4MquTw9ZYqabu4nMwy9Oftlfa/e/52t9IjtfG+mGBbTNdAoZ3RQKLHR6r0wQnwZnPIEh/Ya6XTWAKNA==", - "optional": true - }, "node_modules/lunr": { "version": "2.3.9", "resolved": "https://registry.npmjs.org/lunr/-/lunr-2.3.9.tgz", @@ -21386,12 +21313,6 @@ "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" } }, - "node_modules/napi-macros": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/napi-macros/-/napi-macros-2.0.0.tgz", - "integrity": "sha512-A0xLykHtARfueITVDernsAWdtIMbOJgKgcluwENp3AlsKN/PloyO10HtmoqnFAQAcxPkgZN7wdfPfEd0zNGxbg==", - "optional": true - }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -22794,12 +22715,6 @@ "resolved": "https://registry.npmjs.org/proxy-polyfill/-/proxy-polyfill-0.3.2.tgz", "integrity": "sha512-ENKSXOMCewnQTOyqrQXxEjIhzT6dy572mtehiItbDoIUF5Sv5UkmRUc8kowg2MFvr232Uo8rwRpNg3V5kgTKbA==" }, - "node_modules/prr": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/prr/-/prr-1.0.1.tgz", - "integrity": "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw==", - "optional": true - }, "node_modules/pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", @@ -25030,7 +24945,7 @@ "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -25648,7 +25563,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", - "devOptional": true, + "dev": true, "dependencies": { "safe-buffer": "~5.2.0" } @@ -27441,7 +27356,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "devOptional": true + "dev": true }, "node_modules/uuid": { "version": "10.0.0", @@ -28913,28 +28828,11 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", - "devOptional": true, + "dev": true, "engines": { "node": ">=0.4" } }, - "node_modules/y-leveldb": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/y-leveldb/-/y-leveldb-0.1.2.tgz", - "integrity": "sha512-6ulEn5AXfXJYi89rXPEg2mMHAyyw8+ZfeMMdOtBbV8FJpQ1NOrcgi6DTAcXof0dap84NjHPT2+9d0rb6cFsjEg==", - "optional": true, - "dependencies": { - "level": "^6.0.1", - "lib0": "^0.2.31" - }, - "funding": { - "type": "GitHub Sponsors ❤", - "url": "https://github.com/sponsors/dmonad" - }, - "peerDependencies": { - "yjs": "^13.0.0" - } - }, "node_modules/y-prosemirror": { "version": "1.2.12", "resolved": "https://registry.npmjs.org/y-prosemirror/-/y-prosemirror-1.2.12.tgz", @@ -33863,19 +33761,6 @@ "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", "dev": true }, - "abstract-leveldown": { - "version": "6.2.3", - "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-6.2.3.tgz", - "integrity": "sha512-BsLm5vFMRUrrLeCcRc+G0t2qOaTzpoJQLOubq2XM72eNpjF5UdU5o/5NvlNhx95XHcAvcl8OMXr4mlg/fRgUXQ==", - "optional": true, - "requires": { - "buffer": "^5.5.0", - "immediate": "^3.2.3", - "level-concat-iterator": "~2.0.0", - "level-supports": "~1.0.0", - "xtend": "~4.0.0" - } - }, "acorn": { "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", @@ -34119,12 +34004,6 @@ "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==", "dev": true }, - "async-limiter": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", - "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", - "optional": true - }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -34344,7 +34223,7 @@ "version": "1.5.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", - "devOptional": true + "dev": true }, "bcrypt-pbkdf": { "version": "1.0.2", @@ -34585,7 +34464,7 @@ "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "devOptional": true, + "dev": true, "requires": { "base64-js": "^1.3.1", "ieee754": "^1.1.13" @@ -35982,16 +35861,6 @@ } } }, - "deferred-leveldown": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/deferred-leveldown/-/deferred-leveldown-5.3.0.tgz", - "integrity": "sha512-a59VOT+oDy7vtAbLRCZwWgxu2BaCfd5Hk7wxJd48ei7I+nsg8Orlb9CLG0PMZienk9BSUKgeAqkO2+Lw+1+Ukw==", - "optional": true, - "requires": { - "abstract-leveldown": "~6.2.1", - "inherits": "^2.0.3" - } - }, "define-data-property": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.4.tgz", @@ -36457,18 +36326,6 @@ "integrity": "sha512-/kyM18EfinwXZbno9FyUGeFh87KC8HRQBQGildHZbEuRyWFOmv1U10o9BBp8XVZDVNNuQKyIGIu5ZYAAXJ0V2Q==", "dev": true }, - "encoding-down": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/encoding-down/-/encoding-down-6.3.0.tgz", - "integrity": "sha512-QKrV0iKR6MZVJV08QY0wp1e7vF6QbhnbQhb07bwpEyuz4uZiZgPlEGdkCROuFkUwdxlFaiPIhjyarH1ee/3vhw==", - "optional": true, - "requires": { - "abstract-leveldown": "^6.2.1", - "inherits": "^2.0.3", - "level-codec": "^9.0.0", - "level-errors": "^2.0.0" - } - }, "end-of-stream": { "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", @@ -36510,15 +36367,6 @@ "dev": true, "peer": true }, - "errno": { - "version": "0.1.8", - "resolved": "https://registry.npmjs.org/errno/-/errno-0.1.8.tgz", - "integrity": "sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==", - "optional": true, - "requires": { - "prr": "~1.0.1" - } - }, "error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -38433,7 +38281,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", - "devOptional": true + "dev": true }, "ignore": { "version": "5.3.1", @@ -38441,12 +38289,6 @@ "integrity": "sha512-5Fytz/IraMjqpwfd34ke28PTVMjZjJG2MPn5t7OE4eUCUNf8BAa7b5WUS9/Qvr6mwOQS7Mk6vdsMno5he+T8Xw==", "dev": true }, - "immediate": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/immediate/-/immediate-3.3.0.tgz", - "integrity": "sha512-HR7EVodfFUdQCTIeySw+WDRFJlPcLOJbXfwwZ7Oom6tjsvZ3bOkCDJHehQC3nxJrv7+f9XecwazynjU8e4Vw3Q==", - "optional": true - }, "immutable": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.1.0.tgz", @@ -38516,7 +38358,7 @@ "version": "2.0.4", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "devOptional": true + "dev": true }, "ini": { "version": "2.0.0", @@ -40856,7 +40698,9 @@ "lodash.debounce": { "version": "4.0.8", "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", - "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==" + "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", + "dev": true, + "peer": true }, "lodash.get": { "version": "4.4.2", @@ -41059,12 +40903,6 @@ "yallist": "^2.1.2" } }, - "ltgt": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/ltgt/-/ltgt-2.2.1.tgz", - "integrity": "sha512-AI2r85+4MquTw9ZYqabu4nMwy9Oftlfa/e/52t9IjtfG+mGBbTNdAoZ3RQKLHR6r0wQnwZnPIEh/Ya6XTWAKNA==", - "optional": true - }, "lunr": { "version": "2.3.9", "resolved": "https://registry.npmjs.org/lunr/-/lunr-2.3.9.tgz", @@ -43718,12 +43556,6 @@ "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.7.tgz", "integrity": "sha512-eSRppjcPIatRIMC1U6UngP8XFcz8MQWGQdt1MTBQ7NaAmvXDfvNxbvWV3x2y6CdEUciCSsDHDQZbhYaB8QEo2g==" }, - "napi-macros": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/napi-macros/-/napi-macros-2.0.0.tgz", - "integrity": "sha512-A0xLykHtARfueITVDernsAWdtIMbOJgKgcluwENp3AlsKN/PloyO10HtmoqnFAQAcxPkgZN7wdfPfEd0zNGxbg==", - "optional": true - }, "natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -44774,12 +44606,6 @@ "resolved": "https://registry.npmjs.org/proxy-polyfill/-/proxy-polyfill-0.3.2.tgz", "integrity": "sha512-ENKSXOMCewnQTOyqrQXxEjIhzT6dy572mtehiItbDoIUF5Sv5UkmRUc8kowg2MFvr232Uo8rwRpNg3V5kgTKbA==" }, - "prr": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/prr/-/prr-1.0.1.tgz", - "integrity": "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw==", - "optional": true - }, "pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", @@ -46219,7 +46045,7 @@ "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "devOptional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -46706,7 +46532,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", - "devOptional": true, + "dev": true, "requires": { "safe-buffer": "~5.2.0" } @@ -47987,7 +47813,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "devOptional": true + "dev": true }, "uuid": { "version": "10.0.0", @@ -48892,17 +48718,7 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", - "devOptional": true - }, - "y-leveldb": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/y-leveldb/-/y-leveldb-0.1.2.tgz", - "integrity": "sha512-6ulEn5AXfXJYi89rXPEg2mMHAyyw8+ZfeMMdOtBbV8FJpQ1NOrcgi6DTAcXof0dap84NjHPT2+9d0rb6cFsjEg==", - "optional": true, - "requires": { - "level": "^6.0.1", - "lib0": "^0.2.31" - } + "dev": true }, "y-prosemirror": { "version": "1.2.12", diff --git a/package.json b/package.json index 26202a2f5fe..ec77f4c26ef 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,6 @@ "vuex": "^3.6.2", "y-prosemirror": "^1.2.12", "y-protocols": "^1.0.6", - "y-websocket": "^2.0.4", "yjs": "^13.6.20" }, "engines": { diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index da202ca2d9c..e66f35bf060 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -8,7 +8,7 @@ import * as Y from 'yjs' import * as decoding from 'lib0/decoding.js' import * as encoding from 'lib0/encoding.js' import * as syncProtocol from 'y-protocols/sync' -import { messageSync } from 'y-websocket' +import { messageSync } from '../services/y-websocket.js' /** * Get Document state encode as base64. diff --git a/src/services/SyncServiceProvider.js b/src/services/SyncServiceProvider.js index 8572deafda7..e62ef4bcfc5 100644 --- a/src/services/SyncServiceProvider.js +++ b/src/services/SyncServiceProvider.js @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import { WebsocketProvider } from 'y-websocket' +import { WebsocketProvider } from './y-websocket.js' import initWebSocketPolyfill from './WebSocketPolyfill.js' import { logger } from '../helpers/logger.js' diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js new file mode 100644 index 00000000000..ecc78951cb2 --- /dev/null +++ b/src/services/y-websocket.js @@ -0,0 +1,498 @@ +/** + * @module provider/websocket + */ + +/* eslint-env browser */ + +import * as Y from 'yjs' // eslint-disable-line +import * as bc from 'lib0/broadcastchannel' +import * as time from 'lib0/time' +import * as encoding from 'lib0/encoding' +import * as decoding from 'lib0/decoding' +import * as syncProtocol from 'y-protocols/sync' +import * as authProtocol from 'y-protocols/auth' +import * as awarenessProtocol from 'y-protocols/awareness' +import { Observable } from 'lib0/observable' +import * as math from 'lib0/math' +import * as url from 'lib0/url' +import * as env from 'lib0/environment' + +export const messageSync = 0 +export const messageQueryAwareness = 3 +export const messageAwareness = 1 +export const messageAuth = 2 + +/** + * encoder, decoder, provider, emitSynced, messageType + * @type {Array} + */ +const messageHandlers = [] + +messageHandlers[messageSync] = ( + encoder, + decoder, + provider, + emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageSync) + const syncMessageType = syncProtocol.readSyncMessage( + decoder, + encoder, + provider.doc, + provider + ) + if ( + emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && + !provider.synced + ) { + provider.synced = true + } +} + +messageHandlers[messageQueryAwareness] = ( + encoder, + _decoder, + provider, + _emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + provider.awareness, + Array.from(provider.awareness.getStates().keys()) + ) + ) +} + +messageHandlers[messageAwareness] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType +) => { + awarenessProtocol.applyAwarenessUpdate( + provider.awareness, + decoding.readVarUint8Array(decoder), + provider + ) +} + +messageHandlers[messageAuth] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType +) => { + authProtocol.readAuthMessage( + decoder, + provider.doc, + (_ydoc, reason) => permissionDeniedHandler(provider, reason) + ) +} + +// @todo - this should depend on awareness.outdatedTime +const messageReconnectTimeout = 30000 + +/** + * @param {WebsocketProvider} provider + * @param {string} reason + */ +const permissionDeniedHandler = (provider, reason) => + console.warn(`Permission denied to access ${provider.url}.\n${reason}`) + +/** + * @param {WebsocketProvider} provider + * @param {Uint8Array} buf + * @param {boolean} emitSynced + * @return {encoding.Encoder} + */ +const readMessage = (provider, buf, emitSynced) => { + const decoder = decoding.createDecoder(buf) + const encoder = encoding.createEncoder() + const messageType = decoding.readVarUint(decoder) + const messageHandler = provider.messageHandlers[messageType] + if (/** @type {any} */ (messageHandler)) { + messageHandler(encoder, decoder, provider, emitSynced, messageType) + } else { + console.error('Unable to compute message') + } + return encoder +} + +/** + * @param {WebsocketProvider} provider + */ +const setupWS = (provider) => { + if (provider.shouldConnect && provider.ws === null) { + const websocket = new provider._WS(provider.url, provider.protocols) + websocket.binaryType = 'arraybuffer' + provider.ws = websocket + provider.wsconnecting = true + provider.wsconnected = false + provider.synced = false + + websocket.onmessage = (event) => { + provider.wsLastMessageReceived = time.getUnixTime() + const encoder = readMessage(provider, new Uint8Array(event.data), true) + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)) + } + } + websocket.onerror = (event) => { + provider.emit('connection-error', [event, provider]) + } + websocket.onclose = (event) => { + provider.emit('connection-close', [event, provider]) + provider.ws = null + provider.wsconnecting = false + if (provider.wsconnected) { + provider.wsconnected = false + provider.synced = false + // update awareness (all users except local left) + awarenessProtocol.removeAwarenessStates( + provider.awareness, + Array.from(provider.awareness.getStates().keys()).filter((client) => + client !== provider.doc.clientID + ), + provider + ) + provider.emit('status', [{ + status: 'disconnected' + }]) + } else { + provider.wsUnsuccessfulReconnects++ + } + // Start with no reconnect timeout and increase timeout by + // using exponential backoff starting with 100ms + setTimeout( + setupWS, + math.min( + math.pow(2, provider.wsUnsuccessfulReconnects) * 100, + provider.maxBackoffTime + ), + provider + ) + } + websocket.onopen = () => { + provider.wsLastMessageReceived = time.getUnixTime() + provider.wsconnecting = false + provider.wsconnected = true + provider.wsUnsuccessfulReconnects = 0 + provider.emit('status', [{ + status: 'connected' + }]) + // always send sync step 1 when connected + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, provider.doc) + websocket.send(encoding.toUint8Array(encoder)) + // broadcast local awareness state + if (provider.awareness.getLocalState() !== null) { + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [ + provider.doc.clientID + ]) + ) + websocket.send(encoding.toUint8Array(encoderAwarenessState)) + } + } + provider.emit('status', [{ + status: 'connecting' + }]) + } +} + +/** + * @param {WebsocketProvider} provider + * @param {ArrayBuffer} buf + */ +const broadcastMessage = (provider, buf) => { + const ws = provider.ws + if (provider.wsconnected && ws && ws.readyState === ws.OPEN) { + ws.send(buf) + } + if (provider.bcconnected) { + bc.publish(provider.bcChannel, buf, provider) + } +} + +/** + * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. + * The document name is attached to the provided url. I.e. the following example + * creates a websocket connection to http://localhost:1234/my-document-name + * + * @example + * import * as Y from 'yjs' + * import { WebsocketProvider } from 'y-websocket' + * const doc = new Y.Doc() + * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) + * + * @extends {Observable} + */ +export class WebsocketProvider extends Observable { + /** + * @param {string} serverUrl + * @param {string} roomname + * @param {Y.Doc} doc + * @param {object} opts + * @param {boolean} [opts.connect] + * @param {awarenessProtocol.Awareness} [opts.awareness] + * @param {Object} [opts.params] specify url parameters + * @param {Array} [opts.protocols] specify websocket protocols + * @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill + * @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds + * @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff) + * @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication + */ + constructor (serverUrl, roomname, doc, { + connect = true, + awareness = new awarenessProtocol.Awareness(doc), + params = {}, + protocols = [], + WebSocketPolyfill = WebSocket, + resyncInterval = -1, + maxBackoffTime = 2500, + disableBc = false + } = {}) { + super() + // ensure that url is always ends with / + while (serverUrl[serverUrl.length - 1] === '/') { + serverUrl = serverUrl.slice(0, serverUrl.length - 1) + } + this.serverUrl = serverUrl + this.bcChannel = serverUrl + '/' + roomname + this.maxBackoffTime = maxBackoffTime + /** + * The specified url parameters. This can be safely updated. The changed parameters will be used + * when a new connection is established. + * @type {Object} + */ + this.params = params + this.protocols = protocols + this.roomname = roomname + this.doc = doc + this._WS = WebSocketPolyfill + this.awareness = awareness + this.wsconnected = false + this.wsconnecting = false + this.bcconnected = false + this.disableBc = disableBc + this.wsUnsuccessfulReconnects = 0 + this.messageHandlers = messageHandlers.slice() + /** + * @type {boolean} + */ + this._synced = false + /** + * @type {WebSocket?} + */ + this.ws = null + this.wsLastMessageReceived = 0 + /** + * Whether to connect to other peers or not + * @type {boolean} + */ + this.shouldConnect = connect + + /** + * @type {number} + */ + this._resyncInterval = 0 + if (resyncInterval > 0) { + this._resyncInterval = /** @type {any} */ (setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + // resend sync step 1 + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, doc) + this.ws.send(encoding.toUint8Array(encoder)) + } + }, resyncInterval)) + } + + /** + * @param {ArrayBuffer} data + * @param {any} origin + */ + this._bcSubscriber = (data, origin) => { + if (origin !== this) { + const encoder = readMessage(this, new Uint8Array(data), false) + if (encoding.length(encoder) > 1) { + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) + } + } + } + /** + * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + * @param {Uint8Array} update + * @param {any} origin + */ + this._updateHandler = (update, origin) => { + if (origin !== this) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + } + this.doc.on('update', this._updateHandler) + /** + * @param {any} changed + * @param {any} _origin + */ + this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { + const changedClients = added.concat(updated).concat(removed) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + this._exitHandler = () => { + awarenessProtocol.removeAwarenessStates( + this.awareness, + [doc.clientID], + 'app closed' + ) + } + if (env.isNode && typeof process !== 'undefined') { + process.on('exit', this._exitHandler) + } + awareness.on('update', this._awarenessUpdateHandler) + this._checkInterval = /** @type {any} */ (setInterval(() => { + if ( + this.wsconnected && + messageReconnectTimeout < + time.getUnixTime() - this.wsLastMessageReceived + ) { + // no message received in a long time - not even your own awareness + // updates (which are updated every 15 seconds) + /** @type {WebSocket} */ (this.ws).close() + } + }, messageReconnectTimeout / 10)) + if (connect) { + this.connect() + } + } + + get url () { + const encodedParams = url.encodeQueryParams(this.params) + return this.serverUrl + '/' + this.roomname + + (encodedParams.length === 0 ? '' : '?' + encodedParams) + } + + /** + * @type {boolean} + */ + get synced () { + return this._synced + } + + set synced (state) { + if (this._synced !== state) { + this._synced = state + this.emit('synced', [state]) + this.emit('sync', [state]) + } + } + + destroy () { + if (this._resyncInterval !== 0) { + clearInterval(this._resyncInterval) + } + clearInterval(this._checkInterval) + this.disconnect() + if (env.isNode && typeof process !== 'undefined') { + process.off('exit', this._exitHandler) + } + this.awareness.off('update', this._awarenessUpdateHandler) + this.doc.off('update', this._updateHandler) + super.destroy() + } + + connectBc () { + if (this.disableBc) { + return + } + if (!this.bcconnected) { + bc.subscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = true + } + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessQuery), + this + ) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID + ]) + ) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessState), + this + ) + } + + disconnectBc () { + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID + ], new Map()) + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + if (this.bcconnected) { + bc.unsubscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = false + } + } + + disconnect () { + this.shouldConnect = false + this.disconnectBc() + if (this.ws !== null) { + this.ws.close() + } + } + + connect () { + this.shouldConnect = true + if (!this.wsconnected && this.ws === null) { + setupWS(this) + this.connectBc() + } + } +} diff --git a/tsconfig.json b/tsconfig.json index 120039e57c8..9d64b435b9d 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,6 +2,7 @@ "extends": "@vue/tsconfig", "compilerOptions": { "allowSyntheticDefaultImports": true, + "allowJs": true, "declaration": true, "esModuleInterop": true, "lib": ["DOM", "ESNext"], @@ -21,4 +22,4 @@ "vueCompilerOptions": { "target": 2.7 } -} \ No newline at end of file +} From 105945f4b882a0632a2e3d0865807e4fdc682929 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 13 Nov 2024 11:35:29 +0100 Subject: [PATCH 02/16] fix(lint): y-websocket Signed-off-by: Max --- src/services/y-websocket.js | 795 ++++++++++++++++++------------------ 1 file changed, 399 insertions(+), 396 deletions(-) diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js index ecc78951cb2..8bfb74d1f2a 100644 --- a/src/services/y-websocket.js +++ b/src/services/y-websocket.js @@ -3,6 +3,7 @@ */ /* eslint-env browser */ +/* eslint-disable jsdoc/require-param-description */ import * as Y from 'yjs' // eslint-disable-line import * as bc from 'lib0/broadcastchannel' @@ -29,70 +30,70 @@ export const messageAuth = 2 const messageHandlers = [] messageHandlers[messageSync] = ( - encoder, - decoder, - provider, - emitSynced, - _messageType + encoder, + decoder, + provider, + emitSynced, + _messageType, ) => { - encoding.writeVarUint(encoder, messageSync) - const syncMessageType = syncProtocol.readSyncMessage( - decoder, - encoder, - provider.doc, - provider - ) - if ( - emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && - !provider.synced - ) { - provider.synced = true - } + encoding.writeVarUint(encoder, messageSync) + const syncMessageType = syncProtocol.readSyncMessage( + decoder, + encoder, + provider.doc, + provider, + ) + if ( + emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 + && !provider.synced + ) { + provider.synced = true + } } messageHandlers[messageQueryAwareness] = ( - encoder, - _decoder, - provider, - _emitSynced, - _messageType + encoder, + _decoder, + provider, + _emitSynced, + _messageType, ) => { - encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate( - provider.awareness, - Array.from(provider.awareness.getStates().keys()) - ) - ) + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + provider.awareness, + Array.from(provider.awareness.getStates().keys()), + ), + ) } messageHandlers[messageAwareness] = ( - _encoder, - decoder, - provider, - _emitSynced, - _messageType + _encoder, + decoder, + provider, + _emitSynced, + _messageType, ) => { - awarenessProtocol.applyAwarenessUpdate( - provider.awareness, - decoding.readVarUint8Array(decoder), - provider - ) + awarenessProtocol.applyAwarenessUpdate( + provider.awareness, + decoding.readVarUint8Array(decoder), + provider, + ) } messageHandlers[messageAuth] = ( - _encoder, - decoder, - provider, - _emitSynced, - _messageType + _encoder, + decoder, + provider, + _emitSynced, + _messageType, ) => { - authProtocol.readAuthMessage( - decoder, - provider.doc, - (_ydoc, reason) => permissionDeniedHandler(provider, reason) - ) + authProtocol.readAuthMessage( + decoder, + provider.doc, + (_ydoc, reason) => permissionDeniedHandler(provider, reason), + ) } // @todo - this should depend on awareness.outdatedTime @@ -103,7 +104,7 @@ const messageReconnectTimeout = 30000 * @param {string} reason */ const permissionDeniedHandler = (provider, reason) => - console.warn(`Permission denied to access ${provider.url}.\n${reason}`) + console.warn(`Permission denied to access ${provider.url}.\n${reason}`) /** * @param {WebsocketProvider} provider @@ -112,102 +113,102 @@ const permissionDeniedHandler = (provider, reason) => * @return {encoding.Encoder} */ const readMessage = (provider, buf, emitSynced) => { - const decoder = decoding.createDecoder(buf) - const encoder = encoding.createEncoder() - const messageType = decoding.readVarUint(decoder) - const messageHandler = provider.messageHandlers[messageType] - if (/** @type {any} */ (messageHandler)) { - messageHandler(encoder, decoder, provider, emitSynced, messageType) - } else { - console.error('Unable to compute message') - } - return encoder + const decoder = decoding.createDecoder(buf) + const encoder = encoding.createEncoder() + const messageType = decoding.readVarUint(decoder) + const messageHandler = provider.messageHandlers[messageType] + if (/** @type {any} */ (messageHandler)) { + messageHandler(encoder, decoder, provider, emitSynced, messageType) + } else { + console.error('Unable to compute message') + } + return encoder } /** * @param {WebsocketProvider} provider */ const setupWS = (provider) => { - if (provider.shouldConnect && provider.ws === null) { - const websocket = new provider._WS(provider.url, provider.protocols) - websocket.binaryType = 'arraybuffer' - provider.ws = websocket - provider.wsconnecting = true - provider.wsconnected = false - provider.synced = false + if (provider.shouldConnect && provider.ws === null) { + const websocket = new provider._WS(provider.url, provider.protocols) + websocket.binaryType = 'arraybuffer' + provider.ws = websocket + provider.wsconnecting = true + provider.wsconnected = false + provider.synced = false - websocket.onmessage = (event) => { - provider.wsLastMessageReceived = time.getUnixTime() - const encoder = readMessage(provider, new Uint8Array(event.data), true) - if (encoding.length(encoder) > 1) { - websocket.send(encoding.toUint8Array(encoder)) - } - } - websocket.onerror = (event) => { - provider.emit('connection-error', [event, provider]) - } - websocket.onclose = (event) => { - provider.emit('connection-close', [event, provider]) - provider.ws = null - provider.wsconnecting = false - if (provider.wsconnected) { - provider.wsconnected = false - provider.synced = false - // update awareness (all users except local left) - awarenessProtocol.removeAwarenessStates( - provider.awareness, - Array.from(provider.awareness.getStates().keys()).filter((client) => - client !== provider.doc.clientID - ), - provider - ) - provider.emit('status', [{ - status: 'disconnected' - }]) - } else { - provider.wsUnsuccessfulReconnects++ - } - // Start with no reconnect timeout and increase timeout by - // using exponential backoff starting with 100ms - setTimeout( - setupWS, - math.min( - math.pow(2, provider.wsUnsuccessfulReconnects) * 100, - provider.maxBackoffTime - ), - provider - ) - } - websocket.onopen = () => { - provider.wsLastMessageReceived = time.getUnixTime() - provider.wsconnecting = false - provider.wsconnected = true - provider.wsUnsuccessfulReconnects = 0 - provider.emit('status', [{ - status: 'connected' - }]) - // always send sync step 1 when connected - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, provider.doc) - websocket.send(encoding.toUint8Array(encoder)) - // broadcast local awareness state - if (provider.awareness.getLocalState() !== null) { - const encoderAwarenessState = encoding.createEncoder() - encoding.writeVarUint(encoderAwarenessState, messageAwareness) - encoding.writeVarUint8Array( - encoderAwarenessState, - awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [ - provider.doc.clientID - ]) - ) - websocket.send(encoding.toUint8Array(encoderAwarenessState)) - } - } - provider.emit('status', [{ - status: 'connecting' - }]) - } + websocket.onmessage = (event) => { + provider.wsLastMessageReceived = time.getUnixTime() + const encoder = readMessage(provider, new Uint8Array(event.data), true) + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)) + } + } + websocket.onerror = (event) => { + provider.emit('connection-error', [event, provider]) + } + websocket.onclose = (event) => { + provider.emit('connection-close', [event, provider]) + provider.ws = null + provider.wsconnecting = false + if (provider.wsconnected) { + provider.wsconnected = false + provider.synced = false + // update awareness (all users except local left) + awarenessProtocol.removeAwarenessStates( + provider.awareness, + Array.from(provider.awareness.getStates().keys()).filter((client) => + client !== provider.doc.clientID, + ), + provider, + ) + provider.emit('status', [{ + status: 'disconnected', + }]) + } else { + provider.wsUnsuccessfulReconnects++ + } + // Start with no reconnect timeout and increase timeout by + // using exponential backoff starting with 100ms + setTimeout( + setupWS, + math.min( + math.pow(2, provider.wsUnsuccessfulReconnects) * 100, + provider.maxBackoffTime, + ), + provider, + ) + } + websocket.onopen = () => { + provider.wsLastMessageReceived = time.getUnixTime() + provider.wsconnecting = false + provider.wsconnected = true + provider.wsUnsuccessfulReconnects = 0 + provider.emit('status', [{ + status: 'connected', + }]) + // always send sync step 1 when connected + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, provider.doc) + websocket.send(encoding.toUint8Array(encoder)) + // broadcast local awareness state + if (provider.awareness.getLocalState() !== null) { + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [ + provider.doc.clientID, + ]), + ) + websocket.send(encoding.toUint8Array(encoderAwarenessState)) + } + } + provider.emit('status', [{ + status: 'connecting', + }]) + } } /** @@ -215,13 +216,13 @@ const setupWS = (provider) => { * @param {ArrayBuffer} buf */ const broadcastMessage = (provider, buf) => { - const ws = provider.ws - if (provider.wsconnected && ws && ws.readyState === ws.OPEN) { - ws.send(buf) - } - if (provider.bcconnected) { - bc.publish(provider.bcChannel, buf, provider) - } + const ws = provider.ws + if (provider.wsconnected && ws && ws.readyState === ws.OPEN) { + ws.send(buf) + } + if (provider.bcconnected) { + bc.publish(provider.bcChannel, buf, provider) + } } /** @@ -235,264 +236,266 @@ const broadcastMessage = (provider, buf) => { * const doc = new Y.Doc() * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) * - * @extends {Observable} + * @augments {Observable} */ export class WebsocketProvider extends Observable { - /** - * @param {string} serverUrl - * @param {string} roomname - * @param {Y.Doc} doc - * @param {object} opts - * @param {boolean} [opts.connect] - * @param {awarenessProtocol.Awareness} [opts.awareness] - * @param {Object} [opts.params] specify url parameters - * @param {Array} [opts.protocols] specify websocket protocols - * @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill - * @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds - * @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff) - * @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication - */ - constructor (serverUrl, roomname, doc, { - connect = true, - awareness = new awarenessProtocol.Awareness(doc), - params = {}, - protocols = [], - WebSocketPolyfill = WebSocket, - resyncInterval = -1, - maxBackoffTime = 2500, - disableBc = false - } = {}) { - super() - // ensure that url is always ends with / - while (serverUrl[serverUrl.length - 1] === '/') { - serverUrl = serverUrl.slice(0, serverUrl.length - 1) - } - this.serverUrl = serverUrl - this.bcChannel = serverUrl + '/' + roomname - this.maxBackoffTime = maxBackoffTime - /** - * The specified url parameters. This can be safely updated. The changed parameters will be used - * when a new connection is established. - * @type {Object} - */ - this.params = params - this.protocols = protocols - this.roomname = roomname - this.doc = doc - this._WS = WebSocketPolyfill - this.awareness = awareness - this.wsconnected = false - this.wsconnecting = false - this.bcconnected = false - this.disableBc = disableBc - this.wsUnsuccessfulReconnects = 0 - this.messageHandlers = messageHandlers.slice() - /** - * @type {boolean} - */ - this._synced = false - /** - * @type {WebSocket?} - */ - this.ws = null - this.wsLastMessageReceived = 0 - /** - * Whether to connect to other peers or not - * @type {boolean} - */ - this.shouldConnect = connect - /** - * @type {number} - */ - this._resyncInterval = 0 - if (resyncInterval > 0) { - this._resyncInterval = /** @type {any} */ (setInterval(() => { - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - // resend sync step 1 - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, doc) - this.ws.send(encoding.toUint8Array(encoder)) - } - }, resyncInterval)) - } + /** + * @param {string} serverUrl + * @param {string} roomname + * @param {Y.Doc} doc + * @param {object} opts + * @param {boolean} [opts.connect] + * @param {awarenessProtocol.Awareness} [opts.awareness] + * @param {{[key: string]: string}} [opts.params] specify url parameters + * @param {Array} [opts.protocols] specify websocket protocols + * @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill + * @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds + * @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff) + * @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication + */ + constructor(serverUrl, roomname, doc, { + connect = true, + awareness = new awarenessProtocol.Awareness(doc), + params = {}, + protocols = [], + WebSocketPolyfill = WebSocket, + resyncInterval = -1, + maxBackoffTime = 2500, + disableBc = false, + } = {}) { + super() + // ensure that url is always ends with / + while (serverUrl[serverUrl.length - 1] === '/') { + serverUrl = serverUrl.slice(0, serverUrl.length - 1) + } + this.serverUrl = serverUrl + this.bcChannel = serverUrl + '/' + roomname + this.maxBackoffTime = maxBackoffTime + /** + * The specified url parameters. This can be safely updated. The changed parameters will be used + * when a new connection is established. + * @type {{[key: string]: string}} + */ + this.params = params + this.protocols = protocols + this.roomname = roomname + this.doc = doc + this._WS = WebSocketPolyfill + this.awareness = awareness + this.wsconnected = false + this.wsconnecting = false + this.bcconnected = false + this.disableBc = disableBc + this.wsUnsuccessfulReconnects = 0 + this.messageHandlers = messageHandlers.slice() + /** + * @type {boolean} + */ + this._synced = false + /** + * @type {WebSocket?} + */ + this.ws = null + this.wsLastMessageReceived = 0 + /** + * Whether to connect to other peers or not + * @type {boolean} + */ + this.shouldConnect = connect - /** - * @param {ArrayBuffer} data - * @param {any} origin - */ - this._bcSubscriber = (data, origin) => { - if (origin !== this) { - const encoder = readMessage(this, new Uint8Array(data), false) - if (encoding.length(encoder) > 1) { - bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) - } - } - } - /** - * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) - * @param {Uint8Array} update - * @param {any} origin - */ - this._updateHandler = (update, origin) => { - if (origin !== this) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, update) - broadcastMessage(this, encoding.toUint8Array(encoder)) - } - } - this.doc.on('update', this._updateHandler) - /** - * @param {any} changed - * @param {any} _origin - */ - this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { - const changedClients = added.concat(updated).concat(removed) - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) - ) - broadcastMessage(this, encoding.toUint8Array(encoder)) - } - this._exitHandler = () => { - awarenessProtocol.removeAwarenessStates( - this.awareness, - [doc.clientID], - 'app closed' - ) - } - if (env.isNode && typeof process !== 'undefined') { - process.on('exit', this._exitHandler) - } - awareness.on('update', this._awarenessUpdateHandler) - this._checkInterval = /** @type {any} */ (setInterval(() => { - if ( - this.wsconnected && - messageReconnectTimeout < - time.getUnixTime() - this.wsLastMessageReceived - ) { - // no message received in a long time - not even your own awareness - // updates (which are updated every 15 seconds) - /** @type {WebSocket} */ (this.ws).close() - } - }, messageReconnectTimeout / 10)) - if (connect) { - this.connect() - } - } + /** + * @type {number} + */ + this._resyncInterval = 0 + if (resyncInterval > 0) { + this._resyncInterval = /** @type {any} */ (setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + // resend sync step 1 + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, doc) + this.ws.send(encoding.toUint8Array(encoder)) + } + }, resyncInterval)) + } - get url () { - const encodedParams = url.encodeQueryParams(this.params) - return this.serverUrl + '/' + this.roomname + - (encodedParams.length === 0 ? '' : '?' + encodedParams) - } + /** + * @param {ArrayBuffer} data + * @param {any} origin + */ + this._bcSubscriber = (data, origin) => { + if (origin !== this) { + const encoder = readMessage(this, new Uint8Array(data), false) + if (encoding.length(encoder) > 1) { + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) + } + } + } + /** + * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + * @param {Uint8Array} update + * @param {any} origin + */ + this._updateHandler = (update, origin) => { + if (origin !== this) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + } + this.doc.on('update', this._updateHandler) + /** + * @param {any} changed + * @param {any} _origin + */ + this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { + const changedClients = added.concat(updated).concat(removed) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients), + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + this._exitHandler = () => { + awarenessProtocol.removeAwarenessStates( + this.awareness, + [doc.clientID], + 'app closed', + ) + } + if (env.isNode && typeof process !== 'undefined') { + process.on('exit', this._exitHandler) + } + awareness.on('update', this._awarenessUpdateHandler) + this._checkInterval = /** @type {any} */ (setInterval(() => { + if ( + this.wsconnected + && messageReconnectTimeout + < time.getUnixTime() - this.wsLastMessageReceived + ) { + // no message received in a long time - not even your own awareness + // updates (which are updated every 15 seconds) + /** @type {WebSocket} */ (this.ws).close() + } + }, messageReconnectTimeout / 10)) + if (connect) { + this.connect() + } + } - /** - * @type {boolean} - */ - get synced () { - return this._synced - } + get url() { + const encodedParams = url.encodeQueryParams(this.params) + return this.serverUrl + '/' + this.roomname + + (encodedParams.length === 0 ? '' : '?' + encodedParams) + } - set synced (state) { - if (this._synced !== state) { - this._synced = state - this.emit('synced', [state]) - this.emit('sync', [state]) - } - } + /** + * @type {boolean} + */ + get synced() { + return this._synced + } - destroy () { - if (this._resyncInterval !== 0) { - clearInterval(this._resyncInterval) - } - clearInterval(this._checkInterval) - this.disconnect() - if (env.isNode && typeof process !== 'undefined') { - process.off('exit', this._exitHandler) - } - this.awareness.off('update', this._awarenessUpdateHandler) - this.doc.off('update', this._updateHandler) - super.destroy() - } + set synced(state) { + if (this._synced !== state) { + this._synced = state + this.emit('synced', [state]) + this.emit('sync', [state]) + } + } - connectBc () { - if (this.disableBc) { - return - } - if (!this.bcconnected) { - bc.subscribe(this.bcChannel, this._bcSubscriber) - this.bcconnected = true - } - // send sync step1 to bc - // write sync step 1 - const encoderSync = encoding.createEncoder() - encoding.writeVarUint(encoderSync, messageSync) - syncProtocol.writeSyncStep1(encoderSync, this.doc) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) - // broadcast local state - const encoderState = encoding.createEncoder() - encoding.writeVarUint(encoderState, messageSync) - syncProtocol.writeSyncStep2(encoderState, this.doc) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) - // write queryAwareness - const encoderAwarenessQuery = encoding.createEncoder() - encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) - bc.publish( - this.bcChannel, - encoding.toUint8Array(encoderAwarenessQuery), - this - ) - // broadcast local awareness state - const encoderAwarenessState = encoding.createEncoder() - encoding.writeVarUint(encoderAwarenessState, messageAwareness) - encoding.writeVarUint8Array( - encoderAwarenessState, - awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ - this.doc.clientID - ]) - ) - bc.publish( - this.bcChannel, - encoding.toUint8Array(encoderAwarenessState), - this - ) - } + destroy() { + if (this._resyncInterval !== 0) { + clearInterval(this._resyncInterval) + } + clearInterval(this._checkInterval) + this.disconnect() + if (env.isNode && typeof process !== 'undefined') { + process.off('exit', this._exitHandler) + } + this.awareness.off('update', this._awarenessUpdateHandler) + this.doc.off('update', this._updateHandler) + super.destroy() + } - disconnectBc () { - // broadcast message with local awareness state set to null (indicating disconnect) - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ - this.doc.clientID - ], new Map()) - ) - broadcastMessage(this, encoding.toUint8Array(encoder)) - if (this.bcconnected) { - bc.unsubscribe(this.bcChannel, this._bcSubscriber) - this.bcconnected = false - } - } + connectBc() { + if (this.disableBc) { + return + } + if (!this.bcconnected) { + bc.subscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = true + } + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessQuery), + this, + ) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ]), + ) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessState), + this, + ) + } - disconnect () { - this.shouldConnect = false - this.disconnectBc() - if (this.ws !== null) { - this.ws.close() - } - } + disconnectBc() { + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ], new Map()), + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + if (this.bcconnected) { + bc.unsubscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = false + } + } + + disconnect() { + this.shouldConnect = false + this.disconnectBc() + if (this.ws !== null) { + this.ws.close() + } + } + + connect() { + this.shouldConnect = true + if (!this.wsconnected && this.ws === null) { + setupWS(this) + this.connectBc() + } + } - connect () { - this.shouldConnect = true - if (!this.wsconnected && this.ws === null) { - setupWS(this) - this.connectBc() - } - } } From 93abf90c4cbebd7e9c92ec86deaeb6dc4dba5a2a Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 13 Nov 2024 12:26:44 +0100 Subject: [PATCH 03/16] enh(y-websocket): always send full diff to server state Keep an internal ydoc tracking updates that came from the server. Send updates, that would sync this doc with the current doc state. Signed-off-by: Max --- src/helpers/yjs.js | 1 + src/services/y-websocket.js | 32 ++++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index e66f35bf060..9e58deff6ca 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -88,6 +88,7 @@ export function applyUpdateMessage(ydoc, updateMessage, origin = 'origin') { export function getSteps(queue) { return queue.map(s => encodeArrayBuffer(s)) .filter(s => s < 'AQ') + .slice(-1) } /** diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js index 8bfb74d1f2a..57d460843bd 100644 --- a/src/services/y-websocket.js +++ b/src/services/y-websocket.js @@ -37,14 +37,31 @@ messageHandlers[messageSync] = ( _messageType, ) => { encoding.writeVarUint(encoder, messageSync) + const decoderForRemote = decoding.clone(decoder) const syncMessageType = syncProtocol.readSyncMessage( decoder, encoder, provider.doc, provider, ) + // Message came from the broadcast channel + // Do not track in this.remote and do not emit sync. + if (!emitSynced) { + return + } if ( - emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 + syncMessageType === syncProtocol.messageYjsSyncStep2 + || syncMessageType === syncProtocol.messageYjsUpdate + ) { + syncProtocol.readSyncMessage( + decoderForRemote, + encoding.createEncoder(), + provider.remote, + provider, + ) + } + if ( + syncMessageType === syncProtocol.messageYjsSyncStep2 && !provider.synced ) { provider.synced = true @@ -289,6 +306,10 @@ export class WebsocketProvider extends Observable { this.disableBc = disableBc this.wsUnsuccessfulReconnects = 0 this.messageHandlers = messageHandlers.slice() + /** + * @type {Y.Doc} + */ + this.remote = new Y.Doc() /** * @type {boolean} */ @@ -334,14 +355,17 @@ export class WebsocketProvider extends Observable { } /** * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) - * @param {Uint8Array} update + * @param {Uint8Array} _update * @param {any} origin + * @param {Y.Doc} doc */ - this._updateHandler = (update, origin) => { + this._updateHandler = (_update, origin, doc) => { if (origin !== this) { + const from = Y.encodeStateVector(this.remote) + const fullUpdate = Y.encodeStateAsUpdate(doc, from) const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, update) + syncProtocol.writeUpdate(encoder, fullUpdate) broadcastMessage(this, encoding.toUint8Array(encoder)) } } From 1b214d97cecd4c873fbe7951154cf95e60e3f2c1 Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 14 Nov 2024 14:53:39 +0100 Subject: [PATCH 04/16] fix(license): add license header and props to y-websocket Signed-off-by: Max --- src/services/y-websocket.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js index 57d460843bd..bb1e670692c 100644 --- a/src/services/y-websocket.js +++ b/src/services/y-websocket.js @@ -1,5 +1,11 @@ /** - * @module provider/websocket + * SPDX-FileCopyrightText: 2019 Kevin Jahns + * SPDX-License-Identifier: MIT + */ + +/** + * Based on the awesome y-websocket https://github.com/yjs/y-websocket/ + * Modified to match the needs of an http transport. */ /* eslint-env browser */ From ce033a26057f325eb3af56b87c5f2850fa971789 Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 14 Nov 2024 17:32:46 +0100 Subject: [PATCH 05/16] fix(awareness): only send updates about the local client Signed-off-by: Max --- src/services/y-websocket.js | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js index bb1e670692c..7fdd7da5493 100644 --- a/src/services/y-websocket.js +++ b/src/services/y-websocket.js @@ -74,6 +74,7 @@ messageHandlers[messageSync] = ( } } +// modified to only send own awareness messageHandlers[messageQueryAwareness] = ( encoder, _decoder, @@ -86,7 +87,8 @@ messageHandlers[messageQueryAwareness] = ( encoder, awarenessProtocol.encodeAwarenessUpdate( provider.awareness, - Array.from(provider.awareness.getStates().keys()), + [provider.doc.clientID], + // Array.from(provider.awareness.getStates().keys()), ), ) } @@ -377,16 +379,22 @@ export class WebsocketProvider extends Observable { } this.doc.on('update', this._updateHandler) /** + * Send an awareness update message when local awareness changes + * modified to only send update about this client. * @param {any} changed * @param {any} _origin */ this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { - const changedClients = added.concat(updated).concat(removed) + // const changedClients = added.concat(updated).concat(removed) const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint8Array( encoder, - awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients), + awarenessProtocol.encodeAwarenessUpdate( + awareness, + [this.doc.clientID], + // changedClients + ), ) broadcastMessage(this, encoding.toUint8Array(encoder)) } From b6e44845a04a497f2d1d6271505bb57ffc763a56 Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 14 Nov 2024 18:49:14 +0100 Subject: [PATCH 06/16] fix(sync): make use of steps in push responses The pushed steps are echoed back with all other steps since version immediately. Processing them reduces the size of the following pushes and syncs. Signed-off-by: Max --- src/components/Editor.vue | 4 +++- src/services/SyncService.js | 24 ++++++++++++------------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/components/Editor.vue b/src/components/Editor.vue index ac0fc586f32..15167128669 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -578,7 +578,9 @@ export default { this.$nextTick(() => { this.emit('sync-service:sync') }) - this.document = document + if (document) { + this.document = document + } }, onError({ type, data }) { diff --git a/src/services/SyncService.js b/src/services/SyncService.js index 92fa361b129..8b1dee0bd72 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -171,19 +171,18 @@ class SyncService { this.sending = true clearInterval(this.#sendIntervalId) this.#sendIntervalId = null - const data = getSendable() - if (data.steps.length > 0) { + const sendable = getSendable() + if (sendable.steps.length > 0) { this.emit('stateChange', { dirty: true }) } - return this.#connection.push(data) + return this.#connection.push(sendable) .then((response) => { + const { steps } = response.data this.pushError = 0 this.sending = false - this.emit('sync', { - steps: [], - document: this.#connection.document, - version: this.version, - }) + if (steps?.length > 0) { + this._receiveSteps({ steps }) + } }).catch(err => { const { response, code } = err this.sending = false @@ -194,11 +193,13 @@ class SyncService { if (response?.status === 412) { this.emit('error', { type: ERROR_TYPE.LOAD_ERROR, data: response }) } else if (response?.status === 403) { - if (!data.document) { + // TODO: is this really about sendable? + if (!sendable.document) { // either the session is invalid or the document is read only. logger.error('failed to write to document - not allowed') this.emit('error', { type: ERROR_TYPE.PUSH_FORBIDDEN, data: {} }) } + // TODO: does response.data ever have a document? maybe for errors? // Only emit conflict event if we have synced until the latest version if (response.data.document?.currentVersion === this.version) { this.emit('error', { type: ERROR_TYPE.PUSH_FAILURE, data: {} }) @@ -211,7 +212,7 @@ class SyncService { }) } - _receiveSteps({ steps, document, sessions }) { + _receiveSteps({ steps, document = null, sessions = [] }) { const awareness = sessions .filter(s => s.lastContact > (Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME)) .filter(s => s.lastAwarenessMessage) @@ -239,8 +240,7 @@ class SyncService { this.lastStepPush = Date.now() this.emit('sync', { steps: newSteps, - // TODO: do we actually need to dig into the connection here? - document: this.#connection.document, + document, version: this.version, }) } From 9d2c13cb273e64d573110ae343769f689485aaf0 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 17 Nov 2024 20:52:59 +0100 Subject: [PATCH 07/16] refactor(sync): rename _receiveSteps to receiveSteps It has always been publicly called from the PollingBackend. Signed-off-by: Max --- src/services/PollingBackend.js | 2 +- src/services/SyncService.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/services/PollingBackend.js b/src/services/PollingBackend.js index 054bbcc865b..f7980d03d94 100644 --- a/src/services/PollingBackend.js +++ b/src/services/PollingBackend.js @@ -127,7 +127,7 @@ class PollingBackend { this.#fetchRetryCounter = 0 this.#syncService.emit('change', { document, sessions }) - this.#syncService._receiveSteps(data) + this.#syncService.receiveSteps(data) if (data.steps.length === 0) { if (!this.#initialLoadingFinished) { diff --git a/src/services/SyncService.js b/src/services/SyncService.js index 8b1dee0bd72..266f51e7379 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -181,7 +181,7 @@ class SyncService { this.pushError = 0 this.sending = false if (steps?.length > 0) { - this._receiveSteps({ steps }) + this.receiveSteps({ steps }) } }).catch(err => { const { response, code } = err @@ -212,7 +212,7 @@ class SyncService { }) } - _receiveSteps({ steps, document = null, sessions = [] }) { + receiveSteps({ steps, document = null, sessions = [] }) { const awareness = sessions .filter(s => s.lastContact > (Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME)) .filter(s => s.lastAwarenessMessage) From ed92946a0a08e2fcca27fadbeef21bf9ccf56515 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 17 Nov 2024 20:55:19 +0100 Subject: [PATCH 08/16] refactor(sync): drop superfluous update message Updates now include all the local structs that were not yet received from remote. No need to compute a separate update message anymore. Signed-off-by: Max --- src/components/Editor.vue | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/components/Editor.vue b/src/components/Editor.vue index 15167128669..f45d9368b38 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -92,7 +92,7 @@ import { import ReadonlyBar from './Menu/ReadonlyBar.vue' import { logger } from '../helpers/logger.js' -import { getDocumentState, applyDocumentState, getUpdateMessage } from '../helpers/yjs.js' +import { getDocumentState, applyDocumentState } from '../helpers/yjs.js' import { SyncService, ERROR_TYPE, IDLE_TIMEOUT } from './../services/SyncService.js' import createSyncServiceProvider from './../services/SyncServiceProvider.js' import AttachmentResolver from './../services/AttachmentResolver.js' @@ -494,12 +494,6 @@ export default { onLoaded({ document, documentSource, documentState }) { if (documentState) { applyDocumentState(this.$ydoc, documentState, this.$providers[0]) - // distribute additional state that may exist locally - const updateMessage = getUpdateMessage(this.$ydoc, documentState) - if (updateMessage) { - logger.debug('onLoaded: Pushing local changes to server') - this.$queue.push(updateMessage) - } } else { this.setInitialYjsState(documentSource, { isRichEditor: this.isRichEditor }) } From 470f167153fbb50eb87d2bb5246a399f1c5f46c7 Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 18 Nov 2024 01:30:06 +0100 Subject: [PATCH 09/16] fix(sync): Do not resend document state in first push. Apply document state as a step. Process it like other steps received from the remote. In particular include it in the tracking of steps already applied and set the version accordingly. Signed-off-by: Max --- cypress/component/helpers/yjs.cy.js | 32 ++++-------------- src/components/Editor.vue | 4 +-- src/helpers/yjs.js | 50 ++++++++++++++--------------- src/services/SyncService.js | 11 ++++++- 4 files changed, 42 insertions(+), 55 deletions(-) diff --git a/cypress/component/helpers/yjs.cy.js b/cypress/component/helpers/yjs.cy.js index 427bf671d06..f61c29c2b27 100644 --- a/cypress/component/helpers/yjs.cy.js +++ b/cypress/component/helpers/yjs.cy.js @@ -4,10 +4,10 @@ */ import * as Y from 'yjs' -import { getDocumentState, getUpdateMessage, applyUpdateMessage } from '../../../src/helpers/yjs.js' +import { getDocumentState, documentStateToStep, applyStep } from '../../../src/helpers/yjs.js' describe('Yjs base64 wrapped with our helpers', function() { - it('applies step in wrong order', function() { + it('applies step generated from document state', function() { const source = new Y.Doc() const target = new Y.Doc() const sourceMap = source.getMap() @@ -17,44 +17,26 @@ describe('Yjs base64 wrapped with our helpers', function() { // console.log('afterTransaction', tr) }) - const state0 = getDocumentState(source) - // Add keyA to source and apply to target sourceMap.set('keyA', 'valueA') const stateA = getDocumentState(source) - const update0A = getUpdateMessage(source, state0) - applyUpdateMessage(target, update0A) + const step0A = documentStateToStep(stateA) + applyStep(target, step0A) expect(targetMap.get('keyA')).to.be.eq('valueA') // Add keyB to source, don't apply to target yet sourceMap.set('keyB', 'valueB') const stateB = getDocumentState(source) - const updateAB = getUpdateMessage(source, stateA) + const step0B = documentStateToStep(stateB) // Add keyC to source, apply to target sourceMap.set('keyC', 'valueC') - const updateBC = getUpdateMessage(source, stateB) - applyUpdateMessage(target, updateBC) - expect(targetMap.get('keyB')).to.be.eq(undefined) - expect(targetMap.get('keyC')).to.be.eq(undefined) // Apply keyB to target - applyUpdateMessage(target, updateAB) + applyStep(target, step0B) expect(targetMap.get('keyB')).to.be.eq('valueB') - expect(targetMap.get('keyC')).to.be.eq('valueC') - }) - - it('update message is empty if no additional state exists', function() { - const source = new Y.Doc() - const sourceMap = source.getMap() - const state0 = getDocumentState(source) - sourceMap.set('keyA', 'valueA') - const stateA = getDocumentState(source) - const update0A = getUpdateMessage(source, state0) - const updateAA = getUpdateMessage(source, stateA) - expect(update0A.length).to.be.eq(29) - expect(updateAA).to.be.eq(undefined) + expect(targetMap.get('keyC')).to.be.eq(undefined) }) }) diff --git a/src/components/Editor.vue b/src/components/Editor.vue index f45d9368b38..7745df4c498 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -492,9 +492,7 @@ export default { }, onLoaded({ document, documentSource, documentState }) { - if (documentState) { - applyDocumentState(this.$ydoc, documentState, this.$providers[0]) - } else { + if (!documentState) { this.setInitialYjsState(documentSource, { isRichEditor: this.isRichEditor }) } diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index 9e58deff6ca..fbc58796e86 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -34,36 +34,44 @@ export function applyDocumentState(ydoc, documentState, origin) { } /** - * Update message for everything in ydoc that is not in encodedBaseUpdate + * Create a step from a document state + * i.e. create a sync protocol update message from it + * and encode it and wrap it in a step data structure. * - * @param {Y.Doc} ydoc - encode state of this doc - * @param {string} encodedBaseUpdate - base64 encoded doc update to build upon - * @return {Uint8Array|undefined} + * @param {string} documentState - base64 encoded doc state + * @return {string} base64 encoded yjs sync protocol update message */ -export function getUpdateMessage(ydoc, encodedBaseUpdate) { - const baseUpdate = decodeArrayBuffer(encodedBaseUpdate) - const baseStateVector = Y.encodeStateVectorFromUpdate(baseUpdate) - const docStateVector = Y.encodeStateVector(ydoc) - if (sameState(baseStateVector, docStateVector)) { - // no additional state in the ydoc - early return - return - } +export function documentStateToStep(documentState) { + const message = documentStateToUpdateMessage(documentState) + return { step: encodeArrayBuffer(message) } +} + +/** + * Create an update message from a document state + * i.e. decode the base64 encoded yjs update + * and create a sync protocol update message from it + * + * @param {string} documentState - base64 encoded doc state + * @return {Uint8Array} + */ +function documentStateToUpdateMessage(documentState) { + const update = decodeArrayBuffer(documentState) const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - const update = Y.encodeStateAsUpdate(ydoc, baseStateVector) syncProtocol.writeUpdate(encoder, update) return encoding.toUint8Array(encoder) } /** - * Apply an updated message to the ydoc. + * Apply a step to the ydoc. * * Only used in tests right now. * @param {Y.Doc} ydoc - encode state of this doc - * @param {Uint8Array} updateMessage - y-websocket sync message with update + * @param {string} step - base64 encoded yjs sync update message * @param {object} origin - initiator object e.g. WebsocketProvider */ -export function applyUpdateMessage(ydoc, updateMessage, origin = 'origin') { +export function applyStep(ydoc, step, origin = 'origin') { + const updateMessage = decodeArrayBuffer(step.step) const decoder = decoding.createDecoder(updateMessage) const messageType = decoding.readVarUint(decoder) if (messageType !== messageSync) { @@ -128,13 +136,3 @@ export function logStep(step) { break } } - -/** - * Helper function to check if two state vectors have the same state - * @param {Array} arr - state vector to compare - * @param {Array} other - state vector to compare against - */ -function sameState(arr, other) { - return arr.length === other.length - && arr.every((value, index) => other[index] === value) -} diff --git a/src/services/SyncService.js b/src/services/SyncService.js index 266f51e7379..5c0c50f9f19 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -10,7 +10,7 @@ import debounce from 'debounce' import PollingBackend from './PollingBackend.js' import SessionApi, { Connection } from './SessionApi.js' -import { getSteps, getAwareness } from '../helpers/yjs.js' +import { getSteps, getAwareness, documentStateToStep } from '../helpers/yjs.js' import { logger } from '../helpers/logger.js' /** @@ -122,6 +122,15 @@ class SyncService { this.baseVersionEtag = this.#connection.document.baseVersionEtag this.emit('opened', this.connectionState) this.emit('loaded', this.connectionState) + const documentState = this.connectionState.documentState + if (documentState) { + const initialStep = documentStateToStep(documentState) + this.emit('sync', { + version: this.version, + steps: [initialStep], + document: this.#connection.document, + }) + } return this.connectionState } From 2b1c09a047412364344a220b9784f228bb7f9c62 Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 18 Nov 2024 01:34:43 +0100 Subject: [PATCH 10/16] fix(sync): reply to queries with steps since last save This was a very inefficient attempt to resync that we did not even process on the client side. Only the steps since the last save may not be enough to get back in sync. However we can expand this by including the document state or storing it as the first step after a save. Signed-off-by: Max --- lib/Service/DocumentService.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/Service/DocumentService.php b/lib/Service/DocumentService.php index cd264c528b4..148b6963e5c 100644 --- a/lib/Service/DocumentService.php +++ b/lib/Service/DocumentService.php @@ -228,8 +228,10 @@ public function addStep(Document $document, Session $session, array $steps, int } $newVersion = $this->insertSteps($document, $session, $stepsToInsert); } - // If there were any queries in the steps send the entire history - $getStepsSinceVersion = count($querySteps) > 0 ? 0 : $version; + // If there were any queries in the steps send all steps since last save. + $getStepsSinceVersion = count($querySteps) > 0 + ? $document->getLastSavedVersion() + : $version; $allSteps = $this->getSteps($documentId, $getStepsSinceVersion); $stepsToReturn = []; foreach ($allSteps as $step) { From 31237d65d5f2c3c0d18e31098e53b6d7d11422db Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 18 Nov 2024 11:02:46 +0100 Subject: [PATCH 11/16] fix(sync): include document state in response to queries Signed-off-by: Max --- lib/Service/DocumentService.php | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/lib/Service/DocumentService.php b/lib/Service/DocumentService.php index 148b6963e5c..84fe079083b 100644 --- a/lib/Service/DocumentService.php +++ b/lib/Service/DocumentService.php @@ -208,7 +208,8 @@ public function addStep(Document $document, Session $session, array $steps, int $documentId = $session->getDocumentId(); $readOnly = $this->isReadOnly($this->getFileForSession($session, $shareToken), $shareToken); $stepsToInsert = []; - $querySteps = []; + $stepsIncludeQuery = false; + $documentState = null; $newVersion = $version; foreach ($steps as $step) { $message = YjsMessage::fromBase64($step); @@ -217,7 +218,7 @@ public function addStep(Document $document, Session $session, array $steps, int } // Filter out query steps as they would just trigger clients to send their steps again if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_STEP1) { - $querySteps[] = $step; + $stepsIncludeQuery = true; } else { $stepsToInsert[] = $step; } @@ -228,10 +229,24 @@ public function addStep(Document $document, Session $session, array $steps, int } $newVersion = $this->insertSteps($document, $session, $stepsToInsert); } - // If there were any queries in the steps send all steps since last save. - $getStepsSinceVersion = count($querySteps) > 0 - ? $document->getLastSavedVersion() - : $version; + + // By default send all steps the user has not received yet. + $getStepsSinceVersion = $version; + if ($stepsIncludeQuery) { + $this->logger->debug('Loading document state for ' . $documentId); + try { + $stateFile = $this->getStateFile($documentId); + $documentState = $stateFile->getContent(); + $this->logger->debug('Existing document, state file loaded ' . $documentId); + // If there were any queries in the steps send all steps since last save. + $getStepsSinceVersion = $document->getLastSavedVersion(); + } catch (NotFoundException $e) { + $this->logger->debug('Existing document, but no state file found for ' . $documentId); + // If there is no state file include all the steps. + $getStepsSinceVersion = 0; + } + } + $allSteps = $this->getSteps($documentId, $getStepsSinceVersion); $stepsToReturn = []; foreach ($allSteps as $step) { @@ -240,9 +255,11 @@ public function addStep(Document $document, Session $session, array $steps, int $stepsToReturn[] = $step; } } + return [ 'steps' => $stepsToReturn, - 'version' => $newVersion + 'version' => $newVersion, + 'documentState' => $documentState ]; } From a6190f82cca6322236d39a3f2ca3011ee415c236 Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 18 Nov 2024 12:00:11 +0100 Subject: [PATCH 12/16] fix(sync): process document state from push response Do not process document state from create response. During create the editor has not been initialized fully and the cursor position is 0 - which is invalid as it is not inside a node with inline content. (It is inside the doc before the initial paragraph.) This also allows processing document state later on in order to recover from out of sync situations. But we do not make use of that yet. Signed-off-by: Max --- src/services/SyncService.js | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/services/SyncService.js b/src/services/SyncService.js index 5c0c50f9f19..e31e51804e8 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -122,16 +122,6 @@ class SyncService { this.baseVersionEtag = this.#connection.document.baseVersionEtag this.emit('opened', this.connectionState) this.emit('loaded', this.connectionState) - const documentState = this.connectionState.documentState - if (documentState) { - const initialStep = documentStateToStep(documentState) - this.emit('sync', { - version: this.version, - steps: [initialStep], - document: this.#connection.document, - }) - } - return this.connectionState } @@ -186,7 +176,15 @@ class SyncService { } return this.#connection.push(sendable) .then((response) => { - const { steps } = response.data + const { steps, documentState } = response.data + if (documentState) { + const documentStateStep = documentStateToStep(documentState) + this.emit('sync', { + version: this.version, + steps: [documentStateStep], + document: this.#connection.document, + }) + } this.pushError = 0 this.sending = false if (steps?.length > 0) { From eceb8b7d3c1bd5c75fe3e45bd8a36a2186c95065 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 22 Nov 2024 19:55:49 +0100 Subject: [PATCH 13/16] refactor(sync): cache steps in sync service Store the steps that need to be send where we also do the debouncing. They will be updated whenever there is a new message from y-websocket. Signed-off-by: Max --- src/components/Editor.vue | 6 +- src/helpers/yjs.js | 21 ----- src/services/Outbox.ts | 64 ++++++++++++++ src/services/SyncService.js | 55 ++++++------ src/services/WebSocketPolyfill.js | 51 ++--------- src/tests/services/WebsocketPolyfill.spec.js | 89 -------------------- 6 files changed, 98 insertions(+), 188 deletions(-) create mode 100644 src/services/Outbox.ts diff --git a/src/components/Editor.vue b/src/components/Editor.vue index 7745df4c498..ab52d349402 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -341,7 +341,6 @@ export default { }, created() { this.$ydoc = new Doc() - this.$queue = [] // The following can be useful for debugging ydoc updates // this.$ydoc.on('update', function(update, origin, doc, tr) { // console.debug('ydoc update', update, origin, doc, tr) @@ -392,7 +391,6 @@ export default { ydoc: this.$ydoc, syncService: this.$syncService, fileId: this.fileId, - queue: this.$queue, initialSession: this.initialSession, disableBC: true, }) @@ -684,8 +682,10 @@ export default { }, async close() { - await this.$syncService.sendRemainingSteps(this.$queue) + await this.$syncService.sendRemainingSteps() + .catch(err => logger.warn('Failed to send remaining steps', { err })) await this.disconnect() + .catch(err => logger.warn('Failed to disconnect', { err })) if (this.$editor) { try { this.unlistenEditorEvents() diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index fbc58796e86..6d55d294f2a 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -88,27 +88,6 @@ export function applyStep(ydoc, step, origin = 'origin') { ) } -/** - * Get the steps for sending to the server - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getSteps(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .filter(s => s < 'AQ') - .slice(-1) -} - -/** - * Encode the latest awareness message for sending - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getAwareness(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .findLast(s => s > 'AQ') || '' -} - /** * Log y.js messages with their type and initiator call stack * diff --git a/src/services/Outbox.ts b/src/services/Outbox.ts new file mode 100644 index 00000000000..2ca82416b8d --- /dev/null +++ b/src/services/Outbox.ts @@ -0,0 +1,64 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { encodeArrayBuffer } from '../helpers/base64.js' +import { logger } from '../helpers/logger.js' + +type Sendable = { + steps: string[], awareness: string +} + +export default class Outbox { + #awarenessUpdate = '' + #syncUpdate = '' + #syncQuery = '' + + storeStep(step: Uint8Array) { + const encoded = encodeArrayBuffer(step) + if (encoded < 'AAA' || encoded > 'Ag') { + logger.warn('Unexpected step type:', { step, encoded }) + return + } + if (encoded < 'AAE') { + this.#syncQuery = encoded + return + } + if (encoded < 'AQ') { + this.#syncUpdate = encoded + return + } + this.#awarenessUpdate = encoded + } + + getDataToSend(): Sendable { + return { + steps: [this.#syncUpdate, this.#syncQuery].filter(s => s), + awareness: this.#awarenessUpdate, + } + } + + get hasUpdate(): boolean { + return !!this.#syncUpdate + } + + /* + * Clear data that has just been sent. + * + * Only clear data that has not changed in the meantime. + * @param {Sendable} - data that was to the server + */ + clearSentData({ steps, awareness }: Sendable) { + if (steps.includes(this.#syncUpdate)) { + this.#syncUpdate = '' + } + if (steps.includes(this.#syncQuery)) { + this.#syncQuery = '' + } + if (this.#awarenessUpdate === awareness) { + this.#awarenessUpdate = '' + } + } + +} diff --git a/src/services/SyncService.js b/src/services/SyncService.js index e31e51804e8..e1d3dd2d8e9 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -9,8 +9,9 @@ import mitt from 'mitt' import debounce from 'debounce' import PollingBackend from './PollingBackend.js' +import Outbox from './Outbox.ts' import SessionApi, { Connection } from './SessionApi.js' -import { getSteps, getAwareness, documentStateToStep } from '../helpers/yjs.js' +import { documentStateToStep } from '../helpers/yjs.js' import { logger } from '../helpers/logger.js' /** @@ -56,6 +57,7 @@ class SyncService { #sendIntervalId #connection + #outbox = new Outbox() constructor({ baseVersionEtag, serialize, getDocumentState, ...options }) { /** @type {import('mitt').Emitter} _bus */ @@ -152,30 +154,37 @@ class SyncService { }) } - sendSteps(getSendable) { + sendStep(step) { + this.#outbox.storeStep(step) + this.sendSteps() + } + + sendSteps() { // If already waiting to send, do nothing. if (this.#sendIntervalId) { return } - return new Promise((resolve, reject) => { - this.#sendIntervalId = setInterval(() => { - if (this.#connection && !this.sending) { - this.sendStepsNow(getSendable).then(resolve).catch(reject) - } - }, 200) - }) + this.#sendIntervalId = setInterval(() => { + if (this.#connection && !this.sending) { + this.sendStepsNow() + } + }, 200) } - sendStepsNow(getSendable) { + async sendStepsNow() { this.sending = true clearInterval(this.#sendIntervalId) this.#sendIntervalId = null - const sendable = getSendable() + const sendable = this.#outbox.getDataToSend() if (sendable.steps.length > 0) { this.emit('stateChange', { dirty: true }) } - return this.#connection.push(sendable) + if (!this.hasActiveConnection) { + return + } + return this.#connection.push({ ...sendable, version: this.version }) .then((response) => { + this.#outbox.clearSentData(sendable) const { steps, documentState } = response.data if (documentState) { const documentStateStep = documentStateToStep(documentState) @@ -194,6 +203,7 @@ class SyncService { const { response, code } = err this.sending = false this.pushError++ + logger.error('Failed to push the steps to the server', err) if (!response || code === 'ECONNABORTED') { this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} }) } @@ -298,25 +308,12 @@ class SyncService { }) } - async sendRemainingSteps(queue) { - if (queue.length === 0) { + async sendRemainingSteps() { + if (!this.#outbox.hasUpdate) { return } - let outbox = [] - const steps = getSteps(queue) - const awareness = getAwareness(queue) - return this.sendStepsNow(() => { - const data = { steps, awareness, version: this.version } - outbox = [...queue] - logger.debug('sending final steps ', data) - return data - })?.then(() => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - }, err => logger.error(err)) + logger.debug('sending final steps') + return this.sendStepsNow().catch(err => logger.error(err)) } async close() { diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js index 0fb7b5477e5..e9a885566dd 100644 --- a/src/services/WebSocketPolyfill.js +++ b/src/services/WebSocketPolyfill.js @@ -5,7 +5,6 @@ import { logger } from '../helpers/logger.js' import { decodeArrayBuffer } from '../helpers/base64.ts' -import { getSteps, getAwareness } from '../helpers/yjs.js' import getNotifyBus from './NotifyService.js' /** @@ -13,14 +12,11 @@ import getNotifyBus from './NotifyService.js' * @param {object} syncService - the sync service to build upon * @param {number} fileId - id of the file to open * @param {object} initialSession - initial session to open - * @param {object[]} queue - queue for the outgoing steps */ -export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) { +export default function initWebSocketPolyfill(syncService, fileId, initialSession) { return class WebSocketPolyfill { #url - #session - #version binaryType onmessage onerror @@ -35,34 +31,19 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio this.url = url logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession }) this.#registerHandlers({ - opened: ({ version, session }) => { - logger.debug('opened ', { version, session }) - this.#version = version - this.#session = session - }, - loaded: ({ version, session, content }) => { - logger.debug('loaded ', { version, session }) - this.#version = version - this.#session = session - }, sync: ({ steps, version }) => { - logger.debug('synced ', { version, steps }) - this.#version = version if (steps) { steps.forEach(s => { const data = decodeArrayBuffer(s.step) this.onmessage({ data }) }) + logger.debug('synced ', { version, steps }) } }, }) syncService.open({ fileId, initialSession }).then((data) => { if (syncService.hasActiveConnection) { - const { version, session } = data - this.#version = version - this.#session = session - this.onopen?.() } }) @@ -74,32 +55,10 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio .forEach(([key, value]) => syncService.on(key, value)) } - send(...data) { + send(step) { // Useful for debugging what steps are sent and how they were initiated - // data.forEach(logStep) - - queue.push(...data) - let outbox = [] - return syncService.sendSteps(() => { - const data = { - steps: getSteps(queue), - awareness: getAwareness(queue), - version: this.#version, - } - outbox = [...queue] - logger.debug('sending steps ', data) - return data - })?.then(ret => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - return ret - }, err => { - logger.error(`Failed to push the queue with ${queue.length} steps to the server`, err) - this.onerror?.(err) - }) + // logStep(step) + syncService.sendStep(step) } async close() { diff --git a/src/tests/services/WebsocketPolyfill.spec.js b/src/tests/services/WebsocketPolyfill.spec.js index 6eb4ad72f3e..9a42c2a4e86 100644 --- a/src/tests/services/WebsocketPolyfill.spec.js +++ b/src/tests/services/WebsocketPolyfill.spec.js @@ -30,93 +30,4 @@ describe('Init function', () => { expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession }) }) - it('sends steps to sync service', async () => { - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: async getData => getData(), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(result).toBeInstanceOf(Promise) - expect(queue).toEqual([ 'initial' , data ]) - const dataSendOut = await result - expect(queue).toEqual([]) - expect(dataSendOut).toHaveProperty('awareness') - expect(dataSendOut).toHaveProperty('steps') - expect(dataSendOut).toHaveProperty('version') - }) - - it('handles early reject', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockRejectedValue('error before reading steps in sync service'), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual([ 'initial' , data ]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual([ 'initial' , data ]) - }) - - it('handles reject after reading data', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockImplementation( async getData => { - getData() - throw 'error when sending in sync service' - }), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual([ 'initial' , data ]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual([ 'initial' , data ]) - }) - - it('queue survives a close', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockImplementation( async getData => { - getData() - throw 'error when sending in sync service' - }), - sendStepsNow: jest.fn().mockImplementation( async getData => { - getData() - throw 'sendStepsNow error when sending' - }), - off: jest.fn(), - close: jest.fn( async data => data ), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - websocket.onclose = jest.fn() - await websocket.send(data) - const promise = websocket.close() - expect(queue).toEqual([ 'initial' , data ]) - await promise - expect(queue).toEqual([ 'initial' , data ]) - }) - }) From a5a915604b8e84c9145cbb433b944e1f16dcc516 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 5 Nov 2024 12:47:57 +0100 Subject: [PATCH 14/16] refactor(Editor): instantiate api outside sync service Signed-off-by: Max --- src/components/Editor.vue | 10 ++++++++-- src/services/SyncService.js | 7 ++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/components/Editor.vue b/src/components/Editor.vue index ab52d349402..df0eae7fe2a 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -94,6 +94,7 @@ import ReadonlyBar from './Menu/ReadonlyBar.vue' import { logger } from '../helpers/logger.js' import { getDocumentState, applyDocumentState } from '../helpers/yjs.js' import { SyncService, ERROR_TYPE, IDLE_TIMEOUT } from './../services/SyncService.js' +import SessionApi from '../services/SessionApi.js' import createSyncServiceProvider from './../services/SyncServiceProvider.js' import AttachmentResolver from './../services/AttachmentResolver.js' import { extensionHighlight } from '../helpers/mappings.js' @@ -348,6 +349,7 @@ export default { // }); this.$providers = [] this.$editor = null + this.$api = null this.$syncService = null this.$attachmentResolver = null }, @@ -373,16 +375,20 @@ export default { } const guestName = localStorage.getItem('nick') ? localStorage.getItem('nick') : '' - this.$syncService = new SyncService({ + this.$api = new SessionApi({ guestName, shareToken: this.shareToken, filePath: this.relativePath, - baseVersionEtag: this.$baseVersionEtag, forceRecreate: this.forceRecreate, + }) + + this.$syncService = new SyncService({ + baseVersionEtag: this.$baseVersionEtag, serialize: this.isRichEditor ? (content) => createMarkdownSerializer(this.$editor.schema).serialize(content ?? this.$editor.state.doc) : (content) => serializePlainText(content ?? this.$editor.state.doc), getDocumentState: () => getDocumentState(this.$ydoc), + api: this.$api, }) this.listenSyncServiceEvents() diff --git a/src/services/SyncService.js b/src/services/SyncService.js index e1d3dd2d8e9..bc55e0519cb 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -10,7 +10,7 @@ import debounce from 'debounce' import PollingBackend from './PollingBackend.js' import Outbox from './Outbox.ts' -import SessionApi, { Connection } from './SessionApi.js' +import { Connection } from './SessionApi.js' import { documentStateToStep } from '../helpers/yjs.js' import { logger } from '../helpers/logger.js' @@ -59,13 +59,13 @@ class SyncService { #connection #outbox = new Outbox() - constructor({ baseVersionEtag, serialize, getDocumentState, ...options }) { + constructor({ baseVersionEtag, serialize, getDocumentState, api }) { /** @type {import('mitt').Emitter} _bus */ this._bus = mitt() this.serialize = serialize this.getDocumentState = getDocumentState - this._api = new SessionApi(options) + this._api = api this.#connection = null this.stepClientIDs = [] @@ -124,6 +124,7 @@ class SyncService { this.baseVersionEtag = this.#connection.document.baseVersionEtag this.emit('opened', this.connectionState) this.emit('loaded', this.connectionState) + return this.connectionState } From 57d78d74bea097a471b1790784a7090f43b488ad Mon Sep 17 00:00:00 2001 From: Jonas Date: Tue, 26 Nov 2024 13:34:30 +0100 Subject: [PATCH 15/16] test(cypress): Pass SessionApi into the sync service Signed-off-by: Jonas --- cypress/e2e/api/SyncServiceProvider.spec.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cypress/e2e/api/SyncServiceProvider.spec.js b/cypress/e2e/api/SyncServiceProvider.spec.js index 6dcc5ba2b46..614e0983cc4 100644 --- a/cypress/e2e/api/SyncServiceProvider.spec.js +++ b/cypress/e2e/api/SyncServiceProvider.spec.js @@ -4,6 +4,7 @@ */ import { randUser } from '../../utils/index.js' +import SessionApi from '../../../src/services/SessionApi.js' import { SyncService } from '../../../src/services/SyncService.js' import createSyncServiceProvider from '../../../src/services/SyncServiceProvider.js' import { Doc } from 'yjs' @@ -39,9 +40,11 @@ describe('Sync service provider', function() { */ function createProvider(ydoc) { const queue = [] + const api = new SessionApi() const syncService = new SyncService({ serialize: () => 'Serialized', getDocumentState: () => null, + api, }) syncService.on('opened', () => syncService.startSync()) return createSyncServiceProvider({ From 563357b11cd8d658c93be0acee093c8a73d22cb5 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 26 Nov 2024 14:46:56 +0100 Subject: [PATCH 16/16] fix(SyncService): only log errors for now Signed-off-by: Max --- src/services/SyncService.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/SyncService.js b/src/services/SyncService.js index bc55e0519cb..61860b18fd9 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -167,7 +167,7 @@ class SyncService { } this.#sendIntervalId = setInterval(() => { if (this.#connection && !this.sending) { - this.sendStepsNow() + this.sendStepsNow().catch(err => logger.error(err)) } }, 200) }