Skip to content

Commit

Permalink
[MLOB-1942] fix(llmobs): auto-annotations for wrapped functions happe…
Browse files Browse the repository at this point in the history
…n after manual annotations (#4960)

* auto-annotation done before span finish

* error cases

* callback scoped consistently to apm

* make clearer
  • Loading branch information
sabrenner authored and rochdev committed Jan 8, 2025
1 parent 82df743 commit b739299
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 26 deletions.
116 changes: 90 additions & 26 deletions packages/dd-trace/src/llmobs/sdk.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'use strict'

const { SPAN_KIND, OUTPUT_VALUE } = require('./constants/tags')
const { SPAN_KIND, OUTPUT_VALUE, INPUT_VALUE } = require('./constants/tags')

const {
getFunctionArguments,
validateKind
} = require('./util')
const { isTrue } = require('../util')
const { isTrue, isError } = require('../util')

const { storage } = require('./storage')

Expand Down Expand Up @@ -134,29 +134,63 @@ class LLMObs extends NoopLLMObs {

function wrapped () {
const span = llmobs._tracer.scope().active()

const result = llmobs._activate(span, { kind, options: llmobsOptions }, () => {
if (!['llm', 'embedding'].includes(kind)) {
llmobs.annotate(span, { inputData: getFunctionArguments(fn, arguments) })
const fnArgs = arguments

const lastArgId = fnArgs.length - 1
const cb = fnArgs[lastArgId]
const hasCallback = typeof cb === 'function'

if (hasCallback) {
const scopeBoundCb = llmobs._bind(cb)
fnArgs[lastArgId] = function () {
// it is standard practice to follow the callback signature (err, result)
// however, we try to parse the arguments to determine if the first argument is an error
// if it is not, and is not undefined, we will use that for the output value
const maybeError = arguments[0]
const maybeResult = arguments[1]

llmobs._autoAnnotate(
span,
kind,
getFunctionArguments(fn, fnArgs),
isError(maybeError) || maybeError == null ? maybeResult : maybeError
)

return scopeBoundCb.apply(this, arguments)
}
}

return fn.apply(this, arguments)
})
try {
const result = llmobs._activate(span, { kind, options: llmobsOptions }, () => fn.apply(this, fnArgs))

if (result && typeof result.then === 'function') {
return result.then(
value => {
if (!hasCallback) {
llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs), value)
}
return value
},
err => {
llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs))
throw err
}
)
}

if (result && typeof result.then === 'function') {
return result.then(value => {
if (value && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) {
llmobs.annotate(span, { outputData: value })
}
return value
})
}
// it is possible to return a value and have a callback
// however, since the span finishes when the callback is called, it is possible that
// the callback is called before the function returns (although unlikely)
// we do not want to throw for "annotating a finished span" in this case
if (!hasCallback) {
llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs), result)
}

if (result && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) {
llmobs.annotate(span, { outputData: result })
return result
} catch (e) {
llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs))
throw e
}

return result
}

return this._tracer.wrap(name, spanOptions, wrapped)
Expand Down Expand Up @@ -333,20 +367,34 @@ class LLMObs extends NoopLLMObs {
flushCh.publish()
}

_autoAnnotate (span, kind, input, output) {
const annotations = {}
if (input && !['llm', 'embedding'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[INPUT_VALUE]) {
annotations.inputData = input
}

if (output && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) {
annotations.outputData = output
}

this.annotate(span, annotations)
}

_active () {
const store = storage.getStore()
return store?.span
}

_activate (span, { kind, options } = {}, fn) {
_activate (span, options, fn) {
const parent = this._active()
if (this.enabled) storage.enterWith({ span })

this._tagger.registerLLMObsSpan(span, {
...options,
parent,
kind
})
if (options) {
this._tagger.registerLLMObsSpan(span, {
...options,
parent
})
}

try {
return fn()
Expand All @@ -355,6 +403,22 @@ class LLMObs extends NoopLLMObs {
}
}

// bind function to active LLMObs span
_bind (fn) {
if (typeof fn !== 'function') return fn

const llmobs = this
const activeSpan = llmobs._active()

const bound = function () {
return llmobs._activate(activeSpan, null, () => {
return fn.apply(this, arguments)
})
}

return bound
}

_extractOptions (options) {
const {
modelName,
Expand Down
177 changes: 177 additions & 0 deletions packages/dd-trace/test/llmobs/sdk/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('sdk', () => {
let LLMObsSDK
let llmobs
let tracer
let clock

before(() => {
tracer = require('../../../../dd-trace')
Expand All @@ -43,6 +44,8 @@ describe('sdk', () => {

// remove max listener warnings, we don't care about the writer anyways
process.removeAllListeners('beforeExit')

clock = sinon.useFakeTimers()
})

afterEach(() => {
Expand Down Expand Up @@ -435,6 +438,180 @@ describe('sdk', () => {
})
})

it('does not crash for auto-annotation values that are overriden', () => {
const circular = {}
circular.circular = circular

let span
function myWorkflow (input) {
span = llmobs._active()
llmobs.annotate({
inputData: 'circular',
outputData: 'foo'
})
return ''
}

const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow)
wrappedMyWorkflow(circular)

expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'workflow',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': 'circular',
'_ml_obs.meta.output.value': 'foo'
})
})

it('only auto-annotates input on error', () => {
let span
function myTask (foo, bar) {
span = llmobs._active()
throw new Error('error')
}

const wrappedMyTask = llmobs.wrap({ kind: 'task' }, myTask)

expect(() => wrappedMyTask('foo', 'bar')).to.throw()

expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'task',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': JSON.stringify({ foo: 'foo', bar: 'bar' })
})
})

it('only auto-annotates input on error for promises', () => {
let span
function myTask (foo, bar) {
span = llmobs._active()
return Promise.reject(new Error('error'))
}

const wrappedMyTask = llmobs.wrap({ kind: 'task' }, myTask)

return wrappedMyTask('foo', 'bar')
.catch(() => {
expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'task',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': JSON.stringify({ foo: 'foo', bar: 'bar' })
})
})
})

it('auto-annotates the inputs of the callback function as the outputs for the span', () => {
let span
function myWorkflow (input, cb) {
span = llmobs._active()
setTimeout(() => {
cb(null, 'output')
}, 1000)
}

const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow)
wrappedMyWorkflow('input', (err, res) => {
expect(err).to.not.exist
expect(res).to.equal('output')
})

clock.tick(1000)

expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'workflow',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }),
'_ml_obs.meta.output.value': 'output'
})
})

it('ignores the error portion of the callback for auto-annotation', () => {
let span
function myWorkflow (input, cb) {
span = llmobs._active()
setTimeout(() => {
cb(new Error('error'), 'output')
}, 1000)
}

const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow)
wrappedMyWorkflow('input', (err, res) => {
expect(err).to.exist
expect(res).to.equal('output')
})

clock.tick(1000)

expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'workflow',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }),
'_ml_obs.meta.output.value': 'output'
})
})

it('auto-annotates the first argument of the callback as the output if it is not an error', () => {
let span
function myWorkflow (input, cb) {
span = llmobs._active()
setTimeout(() => {
cb('output', 'ignore')
}, 1000)
}

const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow)
wrappedMyWorkflow('input', (res, irrelevant) => {
expect(res).to.equal('output')
expect(irrelevant).to.equal('ignore')
})

clock.tick(1000)

expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({
'_ml_obs.meta.span.kind': 'workflow',
'_ml_obs.meta.ml_app': 'mlApp',
'_ml_obs.llmobs_parent_id': 'undefined',
'_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }),
'_ml_obs.meta.output.value': 'output'
})
})

it('maintains context consistent with the tracer', () => {
let llmSpan, workflowSpan, taskSpan

function myLlm (input, cb) {
llmSpan = llmobs._active()
setTimeout(() => {
cb(null, 'output')
}, 1000)
}
const myWrappedLlm = llmobs.wrap({ kind: 'llm' }, myLlm)

llmobs.trace({ kind: 'workflow', name: 'myWorkflow' }, _workflow => {
workflowSpan = _workflow
tracer.trace('apmOperation', () => {
myWrappedLlm('input', (err, res) => {
expect(err).to.not.exist
expect(res).to.equal('output')
llmobs.trace({ kind: 'task', name: 'afterLlmTask' }, _task => {
taskSpan = _task

const llmParentId = LLMObsTagger.tagMap.get(llmSpan)['_ml_obs.llmobs_parent_id']
expect(llmParentId).to.equal(workflowSpan.context().toSpanId())

const taskParentId = LLMObsTagger.tagMap.get(taskSpan)['_ml_obs.llmobs_parent_id']
expect(taskParentId).to.equal(workflowSpan.context().toSpanId())
})
})
})
})
})

// TODO: need span kind optional for this test
it.skip('sets the span name to "unnamed-anonymous-function" if no name is provided', () => {
let span
Expand Down

0 comments on commit b739299

Please sign in to comment.