diff --git a/apps/server/src/modules/tldraw/config.ts b/apps/server/src/modules/tldraw/config.ts index 4292185c91d..b0a017dd59c 100644 --- a/apps/server/src/modules/tldraw/config.ts +++ b/apps/server/src/modules/tldraw/config.ts @@ -16,6 +16,7 @@ export interface TldrawConfig { ASSETS_ALLOWED_MIME_TYPES_LIST: string; API_HOST: number; TLDRAW_MAX_DOCUMENT_SIZE: number; + TLDRAW_FINALIZE_DELAY: number; } export const TLDRAW_DB_URL: string = Configuration.get('TLDRAW_DB_URL') as string; @@ -37,6 +38,7 @@ const tldrawConfig = { ASSETS_ALLOWED_MIME_TYPES_LIST: Configuration.get('TLDRAW__ASSETS_ALLOWED_MIME_TYPES_LIST') as string, API_HOST: Configuration.get('API_HOST') as string, TLDRAW_MAX_DOCUMENT_SIZE: Configuration.get('TLDRAW__MAX_DOCUMENT_SIZE') as number, + TLDRAW_FINALIZE_DELAY: Configuration.get('TLDRAW__FINALIZE_DELAY') as number, }; export const config = () => tldrawConfig; diff --git a/apps/server/src/modules/tldraw/controller/api-test/tldraw.ws.api.spec.ts b/apps/server/src/modules/tldraw/controller/api-test/tldraw.ws.api.spec.ts index fd02a8097da..2cf4314aa69 100644 --- a/apps/server/src/modules/tldraw/controller/api-test/tldraw.ws.api.spec.ts +++ b/apps/server/src/modules/tldraw/controller/api-test/tldraw.ws.api.spec.ts @@ -2,7 +2,7 @@ import { WsAdapter } from '@nestjs/platform-ws'; import { Test } from '@nestjs/testing'; import WebSocket from 'ws'; import { TextEncoder } from 'util'; -import { INestApplication } from '@nestjs/common'; +import { INestApplication, NotAcceptableException } from '@nestjs/common'; import { MongoMemoryDatabaseModule } from '@infra/database'; import { createConfigModuleOptions } from '@src/config'; import { Logger } from '@src/core/logger'; @@ -360,5 +360,29 @@ describe('WebSocketController (WsAdapter)', () => { setupConnectionSpy.mockRestore(); ws.close(); }); + + it('should close after setup connection throws NotAcceptableException', async () => { + const { setupConnectionSpy, wsCloseSpy } = setup(); + const { buffer } = getMessage(); + + const httpGetCallSpy = jest.spyOn(httpService, 'get'); + const axiosResponse: AxiosResponse = axiosResponseFactory.build({ + data: '', + }); + httpGetCallSpy.mockReturnValueOnce(of(axiosResponse)); + setupConnectionSpy.mockImplementationOnce(() => { + throw new NotAcceptableException(); + }); + + ws = await TestConnection.setupWs(wsUrl, 'TEST', { cookie: 'jwt=jwt-mocked' }); + ws.send(buffer); + + expect(setupConnectionSpy).toHaveBeenCalledWith(expect.anything(), 'TEST'); + expect(wsCloseSpy).toHaveBeenCalledWith(WsCloseCode.NOT_ACCEPTABLE, Buffer.from(WsCloseMessage.NOT_ACCEPTABLE)); + + wsCloseSpy.mockRestore(); + setupConnectionSpy.mockRestore(); + ws.close(); + }); }); }); diff --git a/apps/server/src/modules/tldraw/controller/tldraw.ws.ts b/apps/server/src/modules/tldraw/controller/tldraw.ws.ts index 61161001fe4..feefed9127f 100644 --- a/apps/server/src/modules/tldraw/controller/tldraw.ws.ts +++ b/apps/server/src/modules/tldraw/controller/tldraw.ws.ts @@ -3,7 +3,12 @@ import WebSocket, { Server } from 'ws'; import { Request } from 'express'; import { ConfigService } from '@nestjs/config'; import cookie from 'cookie'; -import { InternalServerErrorException, UnauthorizedException, NotFoundException } from '@nestjs/common'; +import { + InternalServerErrorException, + UnauthorizedException, + NotFoundException, + NotAcceptableException, +} from '@nestjs/common'; import { Logger } from '@src/core/logger'; import { isAxiosError } from 'axios'; import { firstValueFrom } from 'rxjs'; @@ -115,6 +120,11 @@ export class TldrawWs implements OnGatewayInit, OnGatewayConnection { return; } + if (err instanceof NotAcceptableException) { + this.closeClientAndLog(client, WsCloseCode.NOT_ACCEPTABLE, WsCloseMessage.NOT_ACCEPTABLE, docName); + return; + } + this.closeClientAndLog( client, WsCloseCode.INTERNAL_SERVER_ERROR, diff --git a/apps/server/src/modules/tldraw/domain/ws-shared-doc.do.ts b/apps/server/src/modules/tldraw/domain/ws-shared-doc.do.ts index 090b3eef89d..2ceec1962c2 100644 --- a/apps/server/src/modules/tldraw/domain/ws-shared-doc.do.ts +++ b/apps/server/src/modules/tldraw/domain/ws-shared-doc.do.ts @@ -11,6 +11,8 @@ export class WsSharedDocDo extends Doc { public awarenessChannel: string; + public isFinalizing = false; + constructor(name: string, gcEnabled = true) { super({ gc: gcEnabled }); this.name = name; diff --git a/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts b/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts index 724b0c1022d..68060c00663 100644 --- a/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts +++ b/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts @@ -529,7 +529,6 @@ describe('TldrawWSService', () => { boardRepo.getDocumentFromDb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl); - boardRepo.compressDocument.mockResolvedValueOnce(); const redisUnsubscribeSpy = jest.spyOn(Ioredis.Redis.prototype, 'unsubscribe').mockResolvedValueOnce(1); const closeConnSpy = jest.spyOn(service, 'closeConnection'); jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); @@ -559,6 +558,7 @@ describe('TldrawWSService', () => { const ws2 = await TestConnection.setupWs(wsUrl); doc.connections.set(ws, new Set()); doc.connections.set(ws2, new Set()); + boardRepo.compressDocument.mockRestore(); return { doc, @@ -989,7 +989,7 @@ describe('TldrawWSService', () => { }); }); - describe('getYDoc', () => { + describe('getDocument', () => { describe('when getting yDoc by name', () => { it('should assign to service docs map and return instance', async () => { boardRepo.getDocumentFromDb.mockResolvedValueOnce(new WsSharedDocDo('get-test')); @@ -1060,6 +1060,22 @@ describe('TldrawWSService', () => { redisOnSpy.mockRestore(); }); }); + + describe('when found document is still finalizing', () => { + const setup = () => { + const doc = new WsSharedDocDo('test-finalizing'); + doc.isFinalizing = true; + service.docs.set('test-finalizing', doc); + boardRepo.getDocumentFromDb.mockResolvedValueOnce(doc); + }; + + it('should throw', async () => { + setup(); + + await expect(service.getDocument('test-finalizing')).rejects.toThrow(); + service.docs.delete('test-finalizing'); + }); + }); }); describe('redisMessageHandler', () => { diff --git a/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts b/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts index 69cca7c267f..2aba2aa813f 100644 --- a/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts +++ b/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts @@ -1,10 +1,10 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, NotAcceptableException } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import WebSocket from 'ws'; import { applyAwarenessUpdate, encodeAwarenessUpdate, removeAwarenessStates } from 'y-protocols/awareness'; import { decoding, encoding } from 'lib0'; -import { readSyncMessage, writeSyncStep1, writeUpdate } from 'y-protocols/sync'; -import { applyUpdate, encodeStateAsUpdate } from 'yjs'; +import { readSyncMessage, writeSyncStep1, writeSyncStep2, writeUpdate } from 'y-protocols/sync'; +import { applyUpdate } from 'yjs'; import { Buffer } from 'node:buffer'; import { Redis } from 'ioredis'; import { Logger } from '@src/core/logger'; @@ -63,11 +63,11 @@ export class TldrawWsService { doc.connections.delete(ws); removeAwarenessStates(doc.awareness, this.forceToArray(controlledIds), null); - await this.finalizeIfNoConnections(doc); this.metricsService.decrementNumberOfUsersOnServerCounter(); } ws.close(); + await this.finalizeIfNoConnections(doc); } public send(doc: WsSharedDocDo, ws: WebSocket, message: Uint8Array): void { @@ -113,11 +113,19 @@ export class TldrawWsService { public async getDocument(docName: string) { const existingDoc = this.docs.get(docName); + + if (this.isFinalizingOrNotYetLoaded(existingDoc)) { + // drop the connection, the client will have to reconnect + // and check again if the finalizing or loading has finished + throw new NotAcceptableException(); + } + if (existingDoc) { return existingDoc; } const doc = await this.tldrawBoardRepo.getDocumentFromDb(docName); + doc.isLoaded = false; this.registerAwarenessUpdateHandler(doc); this.registerUpdateHandler(doc); @@ -126,6 +134,7 @@ export class TldrawWsService { this.docs.set(docName, doc); this.metricsService.incrementNumberOfBoardsOnServerCounter(); + doc.isLoaded = true; return doc; } @@ -206,9 +215,6 @@ export class TldrawWsService { } }); - // send initial doc state to client as update - this.sendInitialState(ws, doc); - // check if connection is still alive const pingTimeout = this.configService.get('TLDRAW_PING_TIMEOUT'); let pongReceived = true; @@ -237,6 +243,9 @@ export class TldrawWsService { }); { + // send initial doc state to client as update + this.sendInitialState(ws, doc); + const syncEncoder = encoding.createEncoder(); encoding.writeVarUint(syncEncoder, WSMessageType.SYNC); writeSyncStep1(syncEncoder, doc); @@ -257,16 +266,26 @@ export class TldrawWsService { } private async finalizeIfNoConnections(doc: WsSharedDocDo) { + // wait before doing the check + // the only user on the pod might have lost connection for a moment + // or simply refreshed the page + await this.delay(this.configService.get('TLDRAW_FINALIZE_DELAY')); + if (doc.connections.size > 0) { return; } + if (doc.isFinalizing) { + return; + } + doc.isFinalizing = true; + try { - const usedAssets = this.syncDocumentAssetsWithShapes(doc); - await this.tldrawBoardRepo.compressDocument(doc.name); this.unsubscribeFromRedisChannels(doc); + await this.tldrawBoardRepo.compressDocument(doc.name); if (this.configService.get('TLDRAW_ASSETS_SYNC_ENABLED')) { + const usedAssets = this.syncDocumentAssetsWithShapes(doc); void this.filesStorageTldrawAdapterService.deleteUnusedFilesForDocument(doc.name, usedAssets).catch((err) => { this.logger.warning(new FileStorageErrorLoggable(doc.name, err)); }); @@ -391,10 +410,22 @@ export class TldrawWsService { private sendInitialState(ws: WebSocket, doc: WsSharedDocDo): void { const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, WSMessageType.SYNC); - writeUpdate(encoder, encodeStateAsUpdate(doc)); + writeSyncStep2(encoder, doc); this.send(doc, ws, encoding.toUint8Array(encoder)); } + private isFinalizingOrNotYetLoaded(doc: WsSharedDocDo | undefined): boolean { + const isFinalizing = doc !== undefined && doc.isFinalizing; + const isNotLoaded = doc !== undefined && !doc.isLoaded; + return isFinalizing || isNotLoaded; + } + + private delay(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } + private isClosedOrClosing(ws: WebSocket): boolean { return ws.readyState === WebSocket.CLOSING || ws.readyState === WebSocket.CLOSED; } diff --git a/apps/server/src/modules/tldraw/testing/testConfig.ts b/apps/server/src/modules/tldraw/testing/testConfig.ts index 82768116188..61f244d1a27 100644 --- a/apps/server/src/modules/tldraw/testing/testConfig.ts +++ b/apps/server/src/modules/tldraw/testing/testConfig.ts @@ -7,6 +7,7 @@ export const tldrawTestConfig = () => { } conf.TLDRAW_DB_COMPRESS_THRESHOLD = 2; conf.TLDRAW_PING_TIMEOUT = 0; + conf.TLDRAW_FINALIZE_DELAY = 0; conf.TLDRAW_MAX_DOCUMENT_SIZE = 1; return conf; }; diff --git a/apps/server/src/modules/tldraw/types/ws-close-enum.ts b/apps/server/src/modules/tldraw/types/ws-close-enum.ts index 4c0d7f2f10d..0e3333c46ab 100644 --- a/apps/server/src/modules/tldraw/types/ws-close-enum.ts +++ b/apps/server/src/modules/tldraw/types/ws-close-enum.ts @@ -2,6 +2,7 @@ export enum WsCloseCode { BAD_REQUEST = 4400, UNAUTHORIZED = 4401, NOT_FOUND = 4404, + NOT_ACCEPTABLE = 4406, INTERNAL_SERVER_ERROR = 4500, } export enum WsCloseMessage { @@ -9,5 +10,6 @@ export enum WsCloseMessage { BAD_REQUEST = 'Room name param not found in url.', UNAUTHORIZED = "You don't have permission to this drawing.", NOT_FOUND = 'Drawing not found.', + NOT_ACCEPTABLE = 'Could not get document, still finalizing or not yet loaded.', INTERNAL_SERVER_ERROR = 'Unable to establish websocket connection.', } diff --git a/config/default.schema.json b/config/default.schema.json index 20e77316332..98745a7f815 100644 --- a/config/default.schema.json +++ b/config/default.schema.json @@ -1488,11 +1488,13 @@ "description": "Configuration of tldraw related settings", "required": [ "PING_TIMEOUT", + "FINALIZE_DELAY", "SOCKET_PORT", "GC_ENABLED", "DB_COMPRESS_THRESHOLD", "MAX_DOCUMENT_SIZE", "ASSETS_ENABLED", + "ASSETS_SYNC_ENABLED", "ASSETS_MAX_SIZE", "ASSETS_ALLOWED_MIME_TYPES_LIST" ], @@ -1503,7 +1505,11 @@ }, "PING_TIMEOUT": { "type": "number", - "description": "Max time for waiting between calls for tldraw" + "description": "Websocket ping timeout in ms" + }, + "FINALIZE_DELAY": { + "type": "number", + "description": "Delay in milliseconds before checking if can finalize a tldraw board" }, "GC_ENABLED": { "type": "boolean", @@ -1538,6 +1544,7 @@ "default": { "SOCKET_PORT": 3345, "PING_TIMEOUT": 30000, + "FINALIZE_DELAY": 5000, "GC_ENABLED": true, "DB_COMPRESS_THRESHOLD": 400, "MAX_DOCUMENT_SIZE": 15000000, diff --git a/config/test.json b/config/test.json index 9ee70da1159..4c38f6618bd 100644 --- a/config/test.json +++ b/config/test.json @@ -69,6 +69,7 @@ "TLDRAW": { "SOCKET_PORT": 3346, "PING_TIMEOUT": 1, + "FINALIZE_DELAY": 1, "GC_ENABLED": true, "DB_COMPRESS_THRESHOLD": 400, "MAX_DOCUMENT_SIZE": 15000000,