Skip to content

Commit

Permalink
fix race conditions in plug communication
Browse files Browse the repository at this point in the history
  • Loading branch information
huumn committed Jun 17, 2023
1 parent 2eb2101 commit 3df4a61
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 106 deletions.
26 changes: 18 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Deno.serve({
port: CONFIG.port,
hostname: CONFIG.bind,
reusePort: true,
}, (req) => {
}, async (req) => {
if (req.headers.get('upgrade')?.toLowerCase() !== 'websocket') {
const headers = {
'Access-Control-Allow-Origin': '*',
Expand All @@ -40,20 +40,24 @@ Deno.serve({
}
// nip 11
if (req.headers.get('accept') === 'application/nostr+json') {
return Response.json(conf.nip11, { status: 200, headers })
return Response.json(CONFIG.nip11, { status: 200, headers })
}
return new Response(null, { status: 404, headers })
}

const id = crypto.randomUUID()
const headers = Object.fromEntries(req.headers.entries())
try {
await plugsAction('connect', { id, headers })
} catch (e) {
return new Response(e.message, { status: 403 })
}

const { socket: ws, response: res } = Deno.upgradeWebSocket(req)
ws.onopen = async () => {
ws.booger = { id, headers }
ws.onopen = () => {
try {
ws.booger = {
id: crypto.randomUUID(),
headers: Object.fromEntries(req.headers.entries()),
}
addSocket(ws)
await plugsAction('connect', ws.booger)
} catch (e) {
sendNotice(ws, e.message)
delSocket(ws)
Expand Down Expand Up @@ -138,6 +142,9 @@ function delSocket(ws) {
}

function handleError(ws, error) {
// if socket closed, assume that's the source of the error
if (ws.readyState !== 1) return

console.error(error)
plugsAction('error', ws.booger, {
error: {
Expand All @@ -152,6 +159,9 @@ function handleError(ws, error) {
}

function sendNotice(ws, notice) {
// if socket closed, assume that's the source of the error
if (ws.readyState !== 1) return

console.info('notice', notice)
plugsAction('notice', ws.booger, { notice })
.catch(console.error)
Expand Down
86 changes: 54 additions & 32 deletions plugs.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export async function plugsInit() {
type: 'module',
name: basename(p.path, '.js') || basename(p.path, '.ts'),
})
await plugIn(worker, p.path)
await plugInit(worker, p.path)
}
} catch (e) {
if (
Expand All @@ -87,12 +87,44 @@ export async function plugsInit() {
name: basename(builtin, '.js') ||
basename(builtin, '.ts'),
})
await plugIn(worker, 'builtin ' + basename(builtin, '.js'))
await plugInit(worker, 'builtin ' + basename(builtin, '.js'))
}
}

async function plugIn(worker, name) {
return await new Promise((resolve, reject) => {
// provide async req<->resp messaging to plugs
// every message sent to a plug expecting a response gets a unique msgId
// plugs include the msgId in their response
const msgPromises = new Map() // msgId -> { resolve, reject }

let msgIdSeq = 0
function nextMsgId() {
msgIdSeq = msgIdSeq === Number.MAX_SAFE_INTEGER ? 0 : msgIdSeq + 1
return msgIdSeq
}

export async function plugsAction(action, conn, data) {
// if the action isn't something that can be rejected, resolve immediately
if (['eose', 'disconnect', 'error', 'notice', 'unsub'].includes(action)) {
plugs[action].forEach((worker) =>
worker.postMessage({ action, conn, data })
)
return
}

await Promise.all(
plugs[action].map((worker) =>
new Promise((resolve, reject) => {
const msgId = nextMsgId()
msgPromises.set(msgId, { resolve, reject })
worker.postMessage({ msgId, action, conn, data })
})
),
)
}

async function plugInit(worker, name) {
// register the plug
await new Promise((resolve, reject) => {
console.log(`plug registering ${name}...`)

setTimeout(() =>
Expand All @@ -103,44 +135,34 @@ async function plugIn(worker, name) {
), 5000)

worker.onmessage = ({ data }) => {
console.log(
`plug registered ${name} for actions: ${data.join(', ')}`,
)
for (const action of data) {
if (!Object.keys(plugs).includes(action.toLowerCase())) {
console.error(
`plug ${name} registered for unknown action ${action}`,
)
if (Object.keys(plugs).includes(action.toLowerCase())) {
plugs[action.toLowerCase()].push(worker)
} else {
console.error(`plug ${name} registered for unknown action ${action}`)
Deno.exit(1)
}
plugs[action.toLowerCase()].push(worker)
}
console.log(`plug registered ${name} for actions: ${data.join(', ')}`)
resolve()
}
worker.onerror = reject
worker.postMessage('getactions')
})
}

export async function plugsAction(action, conn, data) {
const result = await Promise.allSettled(plugs[action].map((worker) => {
return new Promise((resolve, reject) => {
worker.onmessage = ({ data }) => {
if (!data.accept) {
reject(new Error(data.reason))
}
resolve()
}
worker.onerror = reject
worker.postMessage({ action, conn, data })
// if the action isn't something that can be rejected, resolve immediately
if (['eose', 'disconnect', 'error', 'notice', 'unsub'].includes(action)) {
// listen for messages from the plug
worker.onmessage = ({ data }) => {
if (data.msgId && msgPromises.has(data.msgId)) {
const { resolve, reject } = msgPromises.get(data.msgId)
if (data.accept) {
resolve()
} else {
reject(new Error(data.reason))
}
})
}))

result.forEach((r) => {
if (r.status === 'rejected') throw r.reason
})
msgPromises.delete(data.msgId)
} else {
console.log(`plug ${name} unexpectedly emitted data: ${data}`)
}
}
worker.onerror = console.error
}
29 changes: 21 additions & 8 deletions plugs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ message from booger in the form:

```jsonc
{
msgId: Number, // msgId to include in response if responding
action: String, // e.g. 'connect'
conn: {
id: UUID // unique id to this connection
id: String // unique id to this connection
headers: Object // http headers as a json object
},
action: String, // e.g. 'connect'
data: Object // depends on the action and are documented further down
}
```
Expand All @@ -77,6 +78,7 @@ Responses from these actions must take the form:

```jsonc
{
msgId: Number, // the msgId of the action message we're responding to
accept: Boolean, // true to accept, false if booger should prevent
reason: String // reason for rejection if accept is false, undefined otherwise
// TODO: we'll probably add a replyRaw to send replies directly to clients
Expand Down Expand Up @@ -147,12 +149,17 @@ self.onmessage = ({ data }) => {
return
}
const { msgId } = data
if (data.action === 'event' && data.data.event.kind === 6) {
self.postMessage({ accept: false, reason: 'blocked: kind 6 not allowed' })
self.postMessage({
msgId,
accept: false,
reason: 'blocked: kind 6 not allowed',
})
return
}
self.postMessage({ accept: true })
self.postMessage({ msgId, accept: true })
}
```

Expand All @@ -165,20 +172,26 @@ self.onmessage = ({ data }) => {
return
}
const { msgId } = data
if (data.action === 'event' && data.data.event.kind === 6) {
self.postMessage({ accept: false, reason: 'blocked: kind 6 not allowed' })
self.postMessage({
msgId,
accept: false,
reason: 'blocked: kind 6 not allowed',
})
return
}
if (data.action === 'sub' && data.data.filters.length > 100) {
self.postMessage({
msgId,
accept: false,
reason: 'blocked: >100 filters not allowed',
})
return
}
self.postMessage({ accept: true })
self.postMessage({ msgId, accept: true })
}
```

Expand All @@ -191,10 +204,10 @@ self.onmessage = ({ data }) => {
self.postMessage(['sub', 'unsub'])
return
}
if (data.action === 'sub') {
const { msgId } = data
timers.set(data.data.subId, Date.now())
self.postMessage({ accept: true })
self.postMessage({ msgId, accept: true })
return
}
Expand Down
59 changes: 29 additions & 30 deletions plugs/builtin/limits/limits.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ export async function pgInit() {
self.onmessage = async ({ data }) => {
if (data === 'getactions') {
await pgInit()

self.postMessage(['event', 'sub', 'unsub', 'connect', 'disconnect'])
return
}

const { action, conn, data: dat } = data
const { msgId, action, conn, data: dat } = data
let res
try {
switch (action) {
Expand All @@ -43,8 +44,8 @@ self.onmessage = async ({ data }) => {
if (res?.at(0)?.count >= LIMITS.maxConnections) {
throw new Error('blocked: too many connections')
}
self.postMessage({ accept: true })
return
self.postMessage({ msgId, accept: true })
break
case 'sub':
res = await pg`
SELECT count(*) as sub_count, sum(filter_count) as filter_count
Expand All @@ -57,47 +58,48 @@ self.onmessage = async ({ data }) => {
throw new Error('blocked: too many filters')
}

self.postMessage({ accept: true })
self.postMessage({ msgId, accept: true })

await pg`INSERT INTO subs (ip, conn_id, nostr_sub_id, filter_count) VALUES
(${conn.ip}, ${conn.id}, ${dat.subId}, ${dat.filters.length})`
break
case 'event': {
// delete content older than interval
// insert the new event if below the count and not a duplicate
const code = LIMITS.maxEvents.duplicateContentIgnoreLen &&
LIMITS.maxEvents.duplicateContentIgnoreLen <
dat.event.content?.length
? hashCode(dat.event.content)
: null
case 'event':
{
// delete content older than interval
// insert the new event if below the count and not a duplicate
const code = LIMITS.maxEvents.duplicateContentIgnoreLen &&
LIMITS.maxEvents.duplicateContentIgnoreLen <
dat.event.content?.length
? hashCode(dat.event.content)
: null

res = await pg`
res = await pg`
SELECT count(*) as event_count,
count(*) FILTER (WHERE content_hash_code = ${code}) as dup_count
FROM events
WHERE (conn_id = ${conn.id} OR ip IS NOT DISTINCT FROM ${conn.ip})
AND created_at > (NOW() AT TIME ZONE 'UTC')
- MAKE_INTERVAL(secs => ${LIMITS.maxEvents.interval})`
if (res[0].event_count >= LIMITS.maxEvents.count) {
throw new Error('blocked: too many events')
}
if (res[0].event_count >= LIMITS.maxEvents.count) {
throw new Error('blocked: too many events')
}

res = await pg.begin((pg) => [
pg`DELETE FROM events
res = await pg.begin((pg) => [
pg`DELETE FROM events
WHERE created_at < (NOW() AT TIME ZONE 'UTC')
- MAKE_INTERVAL(secs => ${LIMITS.maxEvents.interval})`,
pg`INSERT INTO events (ip, conn_id, content_hash_code, kind)
pg`INSERT INTO events (ip, conn_id, content_hash_code, kind)
VALUES (${conn.ip}, ${conn.id}, ${code}, ${dat.event.kind})
ON CONFLICT (content_hash_code) DO NOTHING
RETURNING id`,
])
if (!res?.at(1)?.length) {
throw new Error('blocked: duplicate content')
}
])
if (!res?.at(1)?.length) {
throw new Error('blocked: duplicate content')
}

self.postMessage({ accept: true })
return
}
self.postMessage({ msgId, accept: true })
}
break
case 'disconnect':
// decrement the count accounting for the sentinel value
// if count < 0, delete the connection
Expand All @@ -114,9 +116,6 @@ self.onmessage = async ({ data }) => {
}
} catch (e) {
console.error(e)
self.postMessage({ accept: false, reason: e.message })
return
self.postMessage({ msgId, accept: false, reason: e.message })
}

return
}
Loading

0 comments on commit 3df4a61

Please sign in to comment.