Skip to content

Commit

Permalink
use docId as lock identifier for persistor
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Jan 15, 2025
1 parent 04ff4c1 commit c8bff10
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 28 deletions.
2 changes: 1 addition & 1 deletion apps/api/src/schedule/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ async function executeNotebook(
await updateAppState(ydoc, app, socketServer)
return false
},
new AppPersistor(app.id, null) // user is null when running a schedule
new AppPersistor(id, app.id, null) // user is null when running a schedule
)

if (emptyLayout) {
Expand Down
16 changes: 10 additions & 6 deletions apps/api/src/v1/workspaces/workspace/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ export default function componentsRouter(socketServer: IOServer) {
}

const component = await createReusableComponent(payload.data)
const docId = getDocId(payload.data.documentId, null)
await getYDocForUpdate(
getDocId(payload.data.documentId, null),
docId,
socketServer,
payload.data.documentId,
workspaceId,
Expand All @@ -76,7 +77,7 @@ export default function componentsRouter(socketServer: IOServer) {
onPivotTable: () => {},
})
},
new DocumentPersistor(payload.data.documentId)
new DocumentPersistor(docId, payload.data.documentId)
)
await broadcastComponent(socketServer, component)

Expand Down Expand Up @@ -260,8 +261,9 @@ async function updateReusableComponentInstanceOld(
const componentBlock = decodeComponentState(component.state)
for (const doc of docs) {
queue.add(async () => {
const docId = getDocId(doc.id, null)
await getYDocForUpdate(
getDocId(doc.id, null),
docId,
socketServer,
doc.id,
workspaceId,
Expand Down Expand Up @@ -319,7 +321,7 @@ async function updateReusableComponentInstanceOld(
}
}
},
new DocumentPersistor(doc.id)
new DocumentPersistor(docId, doc.id)
)
})
}
Expand Down Expand Up @@ -347,8 +349,10 @@ async function updateReusableComponentInstance(

Array.from(byDocument.entries()).forEach(([documentId, instances]) => {
queue.add(async () => {
const docId = getDocId(documentId, null)

await getYDocForUpdate(
getDocId(documentId, null),
docId,
socketServer,
documentId,
workspaceId,
Expand Down Expand Up @@ -387,7 +391,7 @@ async function updateReusableComponentInstance(
}
}
},
new DocumentPersistor(documentId)
new DocumentPersistor(docId, documentId)
)
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export default function publishRouter(socketServer: IOServer) {
})
setPristine(yDoc.ydoc)
},
new DocumentPersistor(documentId)
new DocumentPersistor(id, documentId)
)

await broadcastDocument(socketServer, workspaceId, documentId)
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/websocket/complete-python.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ const completPython =
suggestions,
})
},
new DocumentPersistor(documentId)
new DocumentPersistor(id, documentId)
)
} catch (err) {
logger().error(
Expand Down
18 changes: 11 additions & 7 deletions apps/api/src/yjs/v2/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ export async function duplicateDocument(
})
}
},
new DocumentPersistor(newDoc.id),
new DocumentPersistor(newId, newDoc.id),
tx
)
},
new DocumentPersistor(prevDoc.id),
new DocumentPersistor(prevId, prevDoc.id),
tx
)
}
Expand All @@ -218,15 +218,19 @@ export async function updateAppState(
select: { userId: true },
})
await Promise.all(
usersApps.map(async (userApp) =>
getYDocForUpdate(
getDocId(app.documentId, { id: app.id, userId: userApp.userId }),
usersApps.map(async (userApp) => {
const docId = getDocId(app.documentId, {
id: app.id,
userId: userApp.userId,
})
return getYDocForUpdate(
docId,
socketServer,
app.documentId,
ydoc.workspaceId,
(ydoc) => ydoc.replaceState(state),
new AppPersistor(app.id, userApp.userId)
new AppPersistor(docId, app.id, userApp.userId)
)
)
})
)
}
16 changes: 9 additions & 7 deletions apps/api/src/yjs/v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,25 @@ export function setupYJSSocketServerV2(
return
}

const docId = getDocId(document.id, {
id: yjsAppDocument.id,
userId: userId === null ? null : authUser.id,
})
ydoc = await getYDoc(
socketServer,
getDocId(document.id, {
id: yjsAppDocument.id,
userId: userId === null ? null : authUser.id,
}),
docId,
document.id,
document.workspaceId,
new AppPersistor(yjsAppDocument.id, userId)
new AppPersistor(docId, yjsAppDocument.id, userId)
)
} else {
const docId = getDocId(document.id, null)
ydoc = await getYDoc(
socketServer,
getDocId(document.id, null),
docId,
document.id,
document.workspaceId,
new DocumentPersistor(document.id)
new DocumentPersistor(docId, document.id)
)
}

Expand Down
14 changes: 9 additions & 5 deletions apps/api/src/yjs/v2/persistors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ export interface Persistor {
}

export class DocumentPersistor implements Persistor {
constructor(private readonly documentId: string) {}
constructor(
private readonly docId: string,
private readonly documentId: string
) {}

private applyUpdate(ydoc: Y.Doc, update: Buffer | Uint8Array) {
const start = Date.now()
Expand All @@ -62,7 +65,7 @@ export class DocumentPersistor implements Persistor {
}

public async load(tx?: PrismaTransaction) {
return acquireLock(`document-persistor:${this.documentId}`, async () => {
return acquireLock(`document-persistor:${this.docId}`, async () => {
try {
const ydoc = new Y.Doc()
const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({
Expand Down Expand Up @@ -115,7 +118,7 @@ export class DocumentPersistor implements Persistor {
ydoc: WSSharedDocV2,
tx?: PrismaTransaction
): Promise<void> {
return acquireLock(`document-persistor:${this.documentId}`, async () => {
return acquireLock(`document-persistor:${this.docId}`, async () => {
const yjsDoc = await this.getYjsDoc(ydoc, tx)

await (tx ?? prisma()).yjsDocument.update({
Expand Down Expand Up @@ -248,6 +251,7 @@ export class DocumentPersistor implements Persistor {

export class AppPersistor implements Persistor {
constructor(
private readonly docId: string,
private readonly yjsAppDocumentId: string,
// no user means we are manipulating the published state
private readonly userId: string | null
Expand All @@ -260,7 +264,7 @@ export class AppPersistor implements Persistor {
}

public async load(tx?: PrismaTransaction) {
return acquireLock(`app-persistor:${this.yjsAppDocumentId}`, async () => {
return acquireLock(`app-persistor:${this.docId}`, async () => {
try {
const yjsAppDoc = await (
prisma() ?? tx
Expand Down Expand Up @@ -341,7 +345,7 @@ export class AppPersistor implements Persistor {
ydoc: WSSharedDocV2,
tx?: PrismaTransaction
): Promise<void> {
await acquireLock(`app-persistor:${this.yjsAppDocumentId}`, async () => {
await acquireLock(`app-persistor:${this.docId}`, async () => {
if (this.userId) {
const userId = this.userId
const state = Buffer.from(Y.encodeStateAsUpdate(ydoc.ydoc))
Expand Down

0 comments on commit c8bff10

Please sign in to comment.