Skip to content

Commit

Permalink
Exploit prevention SQLi in pg (#4566)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: simon-id <simon.id@datadoghq.com>
  • Loading branch information
uurien and simon-id authored Sep 11, 2024
1 parent 8490eae commit 14ebf97
Show file tree
Hide file tree
Showing 23 changed files with 1,527 additions and 410 deletions.
63 changes: 59 additions & 4 deletions packages/datadog-instrumentations/src/pg.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,59 @@ function wrapQuery (query) {
}

return asyncResource.runInAsyncScope(() => {
const abortController = new AbortController()

startCh.publish({
params: this.connectionParameters,
query: pgQuery,
processId
processId,
abortController
})

arguments[0] = pgQuery

const finish = asyncResource.bind(function (error) {
if (error) {
errorCh.publish(error)
}
finishCh.publish()
})

if (abortController.signal.aborted) {
const error = abortController.signal.reason || new Error('Aborted')

// eslint-disable-next-line max-len
// Based on: https://github.com/brianc/node-postgres/blob/54eb0fa216aaccd727765641e7d1cf5da2bc483d/packages/pg/lib/client.js#L510
const reusingQuery = typeof pgQuery.submit === 'function'
const callback = arguments[arguments.length - 1]

finish(error)

if (reusingQuery) {
if (!pgQuery.callback && typeof callback === 'function') {
pgQuery.callback = callback
}

if (pgQuery.callback) {
pgQuery.callback(error)
} else {
process.nextTick(() => {
pgQuery.emit('error', error)
})
}

return pgQuery
}

if (typeof callback === 'function') {
callback(error)

return
}

return Promise.reject(error)
}

arguments[0] = pgQuery

const retval = query.apply(this, arguments)
const queryQueue = this.queryQueue || this._queryQueue
const activeQuery = this.activeQuery || this._activeQuery
Expand Down Expand Up @@ -112,15 +150,32 @@ function wrapPoolQuery (query) {
const pgQuery = arguments[0] !== null && typeof arguments[0] === 'object' ? arguments[0] : { text: arguments[0] }

return asyncResource.runInAsyncScope(() => {
const abortController = new AbortController()

startPoolQueryCh.publish({
query: pgQuery
query: pgQuery,
abortController
})

const finish = asyncResource.bind(function () {
finishPoolQueryCh.publish()
})

const cb = arguments[arguments.length - 1]

if (abortController.signal.aborted) {
const error = abortController.signal.reason || new Error('Aborted')
finish()

if (typeof cb === 'function') {
cb(error)

return
} else {
return Promise.reject(error)
}
}

if (typeof cb === 'function') {
arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => function () {
finish()
Expand Down
244 changes: 244 additions & 0 deletions packages/datadog-instrumentations/test/pg.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
'use strict'

const agent = require('../../dd-trace/test/plugins/agent')
const dc = require('dc-polyfill')
const { assert } = require('chai')

const clients = {
pg: pg => pg.Client
}

if (process.env.PG_TEST_NATIVE === 'true') {
clients['pg.native'] = pg => pg.native.Client
}

describe('pg instrumentation', () => {
withVersions('pg', 'pg', version => {
const queryClientStartChannel = dc.channel('apm:pg:query:start')
const queryPoolStartChannel = dc.channel('datadog:pg:pool:query:start')

let pg
let Query

function abortQuery ({ abortController }) {
const error = new Error('Test')
abortController.abort(error)

if (!abortController.signal.reason) {
abortController.signal.reason = error
}
}

before(() => {
return agent.load(['pg'])
})

describe('pg.Client', () => {
Object.keys(clients).forEach(implementation => {
describe(implementation, () => {
let client

beforeEach(done => {
pg = require(`../../../versions/pg@${version}`).get()
const Client = clients[implementation](pg)
Query = Client.Query

client = new Client({
host: '127.0.0.1',
user: 'postgres',
password: 'postgres',
database: 'postgres',
application_name: 'test'
})

client.connect(err => done(err))
})

afterEach(() => {
client.end()
})

describe('abortController', () => {
afterEach(() => {
if (queryClientStartChannel.hasSubscribers) {
queryClientStartChannel.unsubscribe(abortQuery)
}
})

describe('using callback', () => {
it('Should not fail if it is not aborted', (done) => {
client.query('SELECT 1', (err) => {
done(err)
})
})

it('Should abort query', (done) => {
queryClientStartChannel.subscribe(abortQuery)

client.query('SELECT 1', (err) => {
assert.propertyVal(err, 'message', 'Test')
done()
})
})
})

describe('using promise', () => {
it('Should not fail if it is not aborted', async () => {
await client.query('SELECT 1')
})

it('Should abort query', async () => {
queryClientStartChannel.subscribe(abortQuery)

try {
await client.query('SELECT 1')
} catch (err) {
assert.propertyVal(err, 'message', 'Test')

return
}

throw new Error('Query was not aborted')
})
})

describe('using query object', () => {
describe('without callback', () => {
it('Should not fail if it is not aborted', (done) => {
const query = new Query('SELECT 1')

client.query(query)

query.on('end', () => {
done()
})
})

it('Should abort query', (done) => {
queryClientStartChannel.subscribe(abortQuery)

const query = new Query('SELECT 1')

client.query(query)

query.on('error', err => {
assert.propertyVal(err, 'message', 'Test')
done()
})

query.on('end', () => {
done(new Error('Query was not aborted'))
})
})
})

describe('with callback in query object', () => {
it('Should not fail if it is not aborted', (done) => {
const query = new Query('SELECT 1')
query.callback = (err) => {
done(err)
}

client.query(query)
})

it('Should abort query', (done) => {
queryClientStartChannel.subscribe(abortQuery)

const query = new Query('SELECT 1')
query.callback = err => {
assert.propertyVal(err, 'message', 'Test')
done()
}

client.query(query)
})
})

describe('with callback in query parameter', () => {
it('Should not fail if it is not aborted', (done) => {
const query = new Query('SELECT 1')

client.query(query, (err) => {
done(err)
})
})

it('Should abort query', (done) => {
queryClientStartChannel.subscribe(abortQuery)

const query = new Query('SELECT 1')

client.query(query, err => {
assert.propertyVal(err, 'message', 'Test')
done()
})
})
})
})
})
})
})
})

describe('pg.Pool', () => {
let pool

beforeEach(() => {
const { Pool } = require(`../../../versions/pg@${version}`).get()

pool = new Pool({
host: '127.0.0.1',
user: 'postgres',
password: 'postgres',
database: 'postgres',
application_name: 'test'
})
})

describe('abortController', () => {
afterEach(() => {
if (queryPoolStartChannel.hasSubscribers) {
queryPoolStartChannel.unsubscribe(abortQuery)
}
})

describe('using callback', () => {
it('Should not fail if it is not aborted', (done) => {
pool.query('SELECT 1', (err) => {
done(err)
})
})

it('Should abort query', (done) => {
queryPoolStartChannel.subscribe(abortQuery)

pool.query('SELECT 1', (err) => {
assert.propertyVal(err, 'message', 'Test')
done()
})
})
})

describe('using promise', () => {
it('Should not fail if it is not aborted', async () => {
await pool.query('SELECT 1')
})

it('Should abort query', async () => {
queryPoolStartChannel.subscribe(abortQuery)

try {
await pool.query('SELECT 1')
} catch (err) {
assert.propertyVal(err, 'message', 'Test')
return
}

throw new Error('Query was not aborted')
})
})
})
})
})
})
4 changes: 3 additions & 1 deletion packages/dd-trace/src/appsec/addresses.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ module.exports = {
USER_ID: 'usr.id',
WAF_CONTEXT_PROCESSOR: 'waf.context.processor',

HTTP_OUTGOING_URL: 'server.io.net.url'
HTTP_OUTGOING_URL: 'server.io.net.url',
DB_STATEMENT: 'server.db.statement',
DB_SYSTEM: 'server.db.system'
}
6 changes: 4 additions & 2 deletions packages/dd-trace/src/appsec/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module.exports = {
responseWriteHead: dc.channel('apm:http:server:response:writeHead:start'),
httpClientRequestStart: dc.channel('apm:http:client:request:start'),
responseSetHeader: dc.channel('datadog:http:server:response:set-header:start'),
setUncaughtExceptionCaptureCallbackStart: dc.channel('datadog:process:setUncaughtExceptionCaptureCallback:start')

setUncaughtExceptionCaptureCallbackStart: dc.channel('datadog:process:setUncaughtExceptionCaptureCallback:start'),
pgQueryStart: dc.channel('apm:pg:query:start'),
pgPoolQueryStart: dc.channel('datadog:pg:pool:query:start'),
wafRunFinished: dc.channel('datadog:waf:run:finish')
}
Loading

0 comments on commit 14ebf97

Please sign in to comment.