Skip to content

Commit

Permalink
Merge pull request #82 from cloudflare/use-storage-for-sessions
Browse files Browse the repository at this point in the history
Use `this.ctx.storage` to store session data
  • Loading branch information
third774 authored Aug 15, 2024
2 parents daa921d + b202213 commit 28041a0
Showing 1 changed file with 104 additions and 62 deletions.
166 changes: 104 additions & 62 deletions app/durableObjects/ChatRoom.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export class ChatRoom extends Server<Env> {
hibernate: true,
}

// a small typesafe wrapper around connection.send
sendMessage<M extends ServerMessage>(connection: Connection, message: M) {
connection.send(JSON.stringify(message))
}

async onStart(): Promise<void> {
// TODO: make this a part of partyserver
this.ctx.setWebSocketAutoResponse(
Expand All @@ -34,21 +39,22 @@ export class ChatRoom extends Server<Env> {
)
)

if (!(await this.ctx.storage.getAlarm())) {
// start the alarm to broadcast state every 30 seconds
this.ctx.storage.setAlarm(Date.now() + 30000)
}

// cleaning out storage used by older versions of this code
// this.ctx.storage.delete('sessions').catch(() => {
// console.warn('Failed to delete old sessions')
// })
await this.ctx.storage.delete('sessions').catch(() => {
console.warn('Failed to delete old sessions storage')
})
// We can remove this line later
}
async onConnect(
connection: Connection<User>,
ctx: ConnectionContext
): Promise<void> {
// let's start the periodic alarm if it's not already started
if (!(await this.ctx.storage.getAlarm())) {
// start the alarm to broadcast state every 30 seconds
this.ctx.storage.setAlarm(Date.now() + 30000)
}

const username = await getUsername(ctx.request)
assertNonNullable(username)

Expand All @@ -65,37 +71,42 @@ export class ChatRoom extends Server<Env> {
},
}

connection.setState(user)
this.broadcastState()
}
// store the user's data in storage
await this.ctx.storage.put(`session-${connection.id}`, user)

sendMessage<M extends ServerMessage>(connection: Connection, message: M) {
connection.send(JSON.stringify(message))
await this.broadcastRoomState()
}

broadcastState() {
async broadcastRoomState() {
let didSomeoneQuit = false
const roomStateMessage = {
const roomState = {
type: 'roomState',
state: {
users: [...this.getConnections<User>()]
.map((connection) => connection.state)
.filter((x) => !!x),
users: [
...(
await this.ctx.storage.list<User>({
prefix: 'session-',
})
).values(),
],
},
} satisfies ServerMessage

for (const connection of this.getConnections<User>()) {
const roomStateMessage = JSON.stringify(roomState)

for (const connection of this.getConnections()) {
try {
connection.send(JSON.stringify(roomStateMessage))
connection.send(roomStateMessage)
} catch (err) {
connection.close(1011, 'Failed to broadcast state')
await this.ctx.storage.delete(`session-${connection.id}`)
didSomeoneQuit = true
}
}

if (didSomeoneQuit) {
// broadcast again to remove the user who quit
this.broadcastState()
await this.broadcastRoomState()
}
}

Expand All @@ -112,70 +123,89 @@ export class ChatRoom extends Server<Env> {
let data: ClientMessage = JSON.parse(message)

switch (data.type) {
case 'userLeft':
case 'userLeft': {
connection.close(1000, 'User left')
this.broadcastState()
await this.ctx.storage
.delete(`session-${connection.id}`)
.catch(() => {
console.warn(
`Failed to delete session session-${connection.id} on userLeft`
)
})
await this.broadcastRoomState()
break
case 'userUpdate':
connection.setState(data.user)
this.broadcastState()
}
case 'userUpdate': {
this.ctx.storage.put(`session-${connection.id}`, data.user)
await this.broadcastRoomState()
break
case 'directMessage':
}
case 'directMessage': {
const { to, message } = data
const fromUser = await this.ctx.storage.get<User>(
`session-${connection.id}`
)

for (const otherConnection of this.getConnections<User>()) {
if (otherConnection.id === to) {
this.sendMessage(otherConnection, {
type: 'directMessage',
from: connection.state!.name,
from: fromUser!.name,
message,
})
break
}
}
console.warn(
`User with id "${to}" not found, cannot send DM from "${connection.state!.name}"`
`User with id "${to}" not found, cannot send DM from "${fromUser!.name}"`
)
break

case 'muteUser':
{
let mutedUser = false
for (const otherConnection of this.getConnections<User>()) {
if (otherConnection.id === data.id) {
otherConnection.setState({
...otherConnection.state!,
tracks: {
...otherConnection.state!.tracks,
audioEnabled: false,
},
})
this.sendMessage(otherConnection, {
type: 'muteMic',
})

this.broadcastState()
mutedUser = true
break
}
}
if (!mutedUser) {
console.warn(
`User with id "${data.id}" not found, cannot mute user from "${connection.state!.name}"`
}
case 'muteUser': {
const user = await this.ctx.storage.get<User>(
`session-${connection.id}`
)
let mutedUser = false
for (const otherConnection of this.getConnections<User>()) {
if (otherConnection.id === data.id) {
const otherUser = await this.ctx.storage.get<User>(
`session-${data.id}`
)
await this.ctx.storage.put(`session-${data.id}`, {
...otherUser!,
tracks: {
...otherUser!.tracks,
audioEnabled: false,
},
})
this.sendMessage(otherConnection, {
type: 'muteMic',
})

await this.broadcastRoomState()
mutedUser = true
break
}
}

if (!mutedUser) {
console.warn(
`User with id "${data.id}" not found, cannot mute user from "${user!.name}"`
)
}
break
case 'partyserver-ping':
}

case 'partyserver-ping': {
// do nothing, this should never be received
console.warn(
"Received partyserver-ping from client. You shouldn't be seeing this message. Did you forget to enable hibernation?"
)
break
default:
}
default: {
assertNever(data)
break
}
}
} catch (err) {
assertError(err)
Expand Down Expand Up @@ -205,10 +235,22 @@ export class ChatRoom extends Server<Env> {
// this.broadcastState()
}

alarm(): void | Promise<void> {
// technically we don't need to broadcast state on an alarm,
// but let's keep it for a while and see if it's useful
this.broadcastState()
this.ctx.storage.setAlarm(Date.now() + 30000)
async cleanupOldConnections() {
const connections = [...this.getConnections()]
const sessionsToCleanUp = await this.ctx.storage.list<User>()
connections.forEach((connection) =>
sessionsToCleanUp.delete(`session-${connection.id}`)
)
return this.ctx.storage
.delete([...sessionsToCleanUp.keys()])
.catch(() => console.warn('Failed to clean up orphaned sessions'))
}

async alarm(): Promise<void> {
await this.cleanupOldConnections()
await this.broadcastRoomState()
if ([...this.getConnections()].length !== 0) {
this.ctx.storage.setAlarm(Date.now() + 30000)
}
}
}

0 comments on commit 28041a0

Please sign in to comment.