Skip to content

Commit

Permalink
Merge pull request #135 from cloudflare/restructure-connection-checks
Browse files Browse the repository at this point in the history
Restructure connection checks to look for heartbeat timestamps
  • Loading branch information
third774 authored Oct 25, 2024
2 parents d40dba2 + 630ea66 commit 79fcf2f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
57 changes: 37 additions & 20 deletions app/durableObjects/ChatRoom.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import { getDb, Meetings } from 'schema'
import { log } from '~/utils/logging'

const alarmInterval = 30_000
const alarmInterval = 15_000

/**
* The ChatRoom Durable Object Class
Expand Down Expand Up @@ -88,6 +88,7 @@ export class ChatRoom extends Server<Env> {

// store the user's data in storage
await this.ctx.storage.put(`session-${connection.id}`, user)
await this.ctx.storage.put(`heartbeat-${connection.id}`, Date.now())
await this.trackPeakUserCount()
await this.broadcastRoomState()
const meetingId = await this.getMeetingId()
Expand Down Expand Up @@ -232,6 +233,13 @@ export class ChatRoom extends Server<Env> {
`Failed to delete session session-${connection.id} on userLeft`
)
})
await this.ctx.storage
.delete(`heartbeat-${connection.id}`)
.catch(() => {
console.warn(
`Failed to delete session session-heartbeat-${connection.id} on userLeft`
)
})
log({ eventName: 'userLeft', meetingId, connectionId: connection.id })

await this.broadcastRoomState()
Expand Down Expand Up @@ -304,6 +312,10 @@ export class ChatRoom extends Server<Env> {
)
break
}
case 'heartbeat': {
await this.ctx.storage.put(`heartbeat-${connection.id}`, Date.now())
break
}
default: {
assertNever(data)
break
Expand Down Expand Up @@ -367,33 +379,38 @@ export class ChatRoom extends Server<Env> {
async cleanupOldConnections() {
const meetingId = await this.getMeetingId()
if (!meetingId) log({ eventName: 'meetingIdNotFoundInCleanup' })
const websockets = this.ctx.getWebSockets()
const now = Date.now()
const users = await this.getUsers()
let removedUsers = 0
const connections = [...this.getConnections()]
log({
eventName: 'cleaningUpConnections',
meetingId,
connectionsFound: connections.length,
websocketsFound: websockets.length,
websocketStatuses: websockets.map((w) => w.readyState),
})
const sessionsToCleanUp = await this.getUsers()

connections.forEach((connection) =>
sessionsToCleanUp.delete(`session-${connection.id}`)
)

sessionsToCleanUp.forEach((user) => {
log({ eventName: 'userTimedOut', connectionId: user.id, meetingId })
})
for (const [key, user] of users) {
const connectionId = key.replace('session-', '')
const heartbeat = await this.ctx.storage.get<number>(
`heartbeat-${connectionId}`
)
if (heartbeat === undefined || heartbeat + alarmInterval < now) {
removedUsers++
await this.ctx.storage.delete(key).catch(() => {
console.warn(
`Failed to delete session ${key} in cleanupOldConnections`
)
})

await this.ctx.storage
.delete([...sessionsToCleanUp.keys()])
.catch(() => console.warn('Failed to clean up orphaned sessions'))
const connection = connections.find((c) => c.id === connectionId)
if (connection) {
connection.close(1011)
}
log({ eventName: 'userTimedOut', connectionId: user.id, meetingId })
}
}

const activeUserCount = (await this.getUsers()).size

if (meetingId && activeUserCount === 0) {
this.endMeeting(meetingId)
} else if (removedUsers > 0) {
this.broadcastRoomState()
}

return activeUserCount
Expand Down
20 changes: 10 additions & 10 deletions app/hooks/useRoom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ export default function useRoom({
}
}, [websocket])

// // setup a simple ping pong
// useEffect(() => {
// const interval = setInterval(() => {
// websocket.send(
// JSON.stringify({ type: 'partyserver-ping' } satisfies ClientMessage)
// )
// }, 5000)
//
// return () => clearInterval(interval)
// }, [websocket])
// setup a heartbeat
useEffect(() => {
const interval = setInterval(() => {
websocket.send(
JSON.stringify({ type: 'heartbeat' } satisfies ClientMessage)
)
}, 5_000)

return () => clearInterval(interval)
}, [websocket])

const identity = useMemo(
() => roomState.users.find((u) => u.id === websocket.id),
Expand Down
3 changes: 3 additions & 0 deletions app/types/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ export type ClientMessage =
| {
type: 'partyserver-ping'
}
| {
type: 'heartbeat'
}

0 comments on commit 79fcf2f

Please sign in to comment.