diff --git a/.github/workflows/plugins.yml b/.github/workflows/plugins.yml index 7502e256749..3afc8417621 100644 --- a/.github/workflows/plugins.yml +++ b/.github/workflows/plugins.yml @@ -798,6 +798,15 @@ jobs: - uses: actions/checkout@v4 - uses: ./.github/actions/plugins/test + protobufjs: + runs-on: ubuntu-latest + env: + PLUGINS: protobufjs + DD_DATA_STREAMS_ENABLED: true + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/plugins/test-and-upstream + q: runs-on: ubuntu-latest env: diff --git a/docs/API.md b/docs/API.md index 68cdc3747cb..713c17032cf 100644 --- a/docs/API.md +++ b/docs/API.md @@ -87,6 +87,7 @@ tracer.use('pg', {
+
@@ -142,6 +143,7 @@ tracer.use('pg', { * [pg](./interfaces/export_.plugins.pg.html) * [promise](./interfaces/export_.plugins.promise.html) * [promise-js](./interfaces/export_.plugins.promise_js.html) +* [protobufjs](./interfaces/export_.plugins.protobufjs.html) * [q](./interfaces/export_.plugins.q.html) * [redis](./interfaces/export_.plugins.redis.html) * [restify](./interfaces/export_.plugins.restify.html) diff --git a/docs/add-redirects.sh b/docs/add-redirects.sh index fd0590a934a..732c8a83607 100755 --- a/docs/add-redirects.sh +++ b/docs/add-redirects.sh @@ -55,6 +55,7 @@ declare -a plugins=( "pg" "promise" "promise_js" + "protobufjs" "q" "redis" "restify" diff --git a/docs/test.ts b/docs/test.ts index e37177e0898..c45d0f3d515 100644 --- a/docs/test.ts +++ b/docs/test.ts @@ -362,6 +362,7 @@ tracer.use('playwright'); tracer.use('pg'); tracer.use('pg', { service: params => `${params.host}-${params.database}` }); tracer.use('pino'); +tracer.use('protobufjs'); tracer.use('redis'); tracer.use('redis', redisOptions); tracer.use('restify'); diff --git a/index.d.ts b/index.d.ts index 55a67cc8f8b..86da17620d6 100644 --- a/index.d.ts +++ b/index.d.ts @@ -190,6 +190,7 @@ interface Plugins { "playwright": tracer.plugins.playwright; "pg": tracer.plugins.pg; "pino": tracer.plugins.pino; + "protobufjs": tracer.plugins.protobufjs; "redis": tracer.plugins.redis; "restify": tracer.plugins.restify; "rhea": tracer.plugins.rhea; @@ -1731,6 +1732,12 @@ declare namespace tracer { * on the tracer. */ interface pino extends Integration {} + + /** + * This plugin automatically patches the [protobufjs](https://protobufjs.github.io/protobuf.js/) + * to collect protobuf message schemas when Datastreams Monitoring is enabled. + */ + interface protobufjs extends Integration {} /** * This plugin automatically instruments the diff --git a/packages/datadog-instrumentations/src/helpers/hooks.js b/packages/datadog-instrumentations/src/helpers/hooks.js index 284e4ed5950..eb2cbcb794c 100644 --- a/packages/datadog-instrumentations/src/helpers/hooks.js +++ b/packages/datadog-instrumentations/src/helpers/hooks.js @@ -100,6 +100,7 @@ module.exports = { playwright: () => require('../playwright'), 'promise-js': () => require('../promise-js'), promise: () => require('../promise'), + protobufjs: () => require('../protobufjs'), q: () => require('../q'), qs: () => require('../qs'), redis: () => require('../redis'), diff --git a/packages/datadog-instrumentations/src/protobufjs.js b/packages/datadog-instrumentations/src/protobufjs.js new file mode 100644 index 00000000000..79cbb4ee3a1 --- /dev/null +++ b/packages/datadog-instrumentations/src/protobufjs.js @@ -0,0 +1,127 @@ +const shimmer = require('../../datadog-shimmer') +const { addHook } = require('./helpers/instrument') + +const dc = require('dc-polyfill') +const serializeChannel = dc.channel('apm:protobufjs:serialize-start') +const deserializeChannel = dc.channel('apm:protobufjs:deserialize-end') + +function wrapSerialization (messageClass) { + if (messageClass?.encode) { + shimmer.wrap(messageClass, 'encode', original => function () { + if (!serializeChannel.hasSubscribers) { + return original.apply(this, arguments) + } + serializeChannel.publish({ messageClass: this }) + return original.apply(this, arguments) + }) + } +} + +function wrapDeserialization (messageClass) { + if (messageClass?.decode) { + shimmer.wrap(messageClass, 'decode', original => function () { + if (!deserializeChannel.hasSubscribers) { + return original.apply(this, arguments) + } + const result = original.apply(this, arguments) + deserializeChannel.publish({ messageClass: result }) + return result + }) + } +} + +function wrapSetup (messageClass) { + if (messageClass?.setup) { + shimmer.wrap(messageClass, 'setup', original => function () { + const result = original.apply(this, arguments) + + wrapSerialization(messageClass) + wrapDeserialization(messageClass) + + return result + }) + } +} + +function wrapProtobufClasses (root) { + if (!root) { + return + } + + if (root.decode) { + wrapSetup(root) + } + + if (root.nestedArray) { + for (const subRoot of root.nestedArray) { + wrapProtobufClasses(subRoot) + } + } +} + +function wrapReflection (protobuf) { + const reflectionMethods = [ + { + target: protobuf.Root, + name: 'fromJSON' + }, + { + target: protobuf.Type.prototype, + name: 'fromObject' + } + ] + + reflectionMethods.forEach(method => { + shimmer.wrap(method.target, method.name, original => function () { + const result = original.apply(this, arguments) + if (result.nested) { + for (const type in result.nested) { + wrapSetup(result.nested[type]) + } + } + if (result.$type) { + wrapSetup(result.$type) + } + return result + }) + }) +} + +function isPromise (obj) { + return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function' +} + +addHook({ + name: 'protobufjs', + versions: ['>=6.8.0'] +}, protobuf => { + shimmer.wrap(protobuf.Root.prototype, 'load', original => function () { + const result = original.apply(this, arguments) + if (isPromise(result)) { + return result.then(root => { + wrapProtobufClasses(root) + return root + }) + } else { + // If result is not a promise, directly wrap the protobuf classes + wrapProtobufClasses(result) + return result + } + }) + + shimmer.wrap(protobuf.Root.prototype, 'loadSync', original => function () { + const root = original.apply(this, arguments) + wrapProtobufClasses(root) + return root + }) + + shimmer.wrap(protobuf, 'Type', Original => function () { + const typeInstance = new Original(...arguments) + wrapSetup(typeInstance) + return typeInstance + }) + + wrapReflection(protobuf) + + return protobuf +}) diff --git a/packages/datadog-plugin-protobufjs/src/index.js b/packages/datadog-plugin-protobufjs/src/index.js new file mode 100644 index 00000000000..800c3d9e3cb --- /dev/null +++ b/packages/datadog-plugin-protobufjs/src/index.js @@ -0,0 +1,14 @@ +const SchemaPlugin = require('../../dd-trace/src/plugins/schema') +const SchemaExtractor = require('./schema_iterator') + +class ProtobufjsPlugin extends SchemaPlugin { + static get id () { + return 'protobufjs' + } + + static get schemaExtractor () { + return SchemaExtractor + } +} + +module.exports = ProtobufjsPlugin diff --git a/packages/datadog-plugin-protobufjs/src/schema_iterator.js b/packages/datadog-plugin-protobufjs/src/schema_iterator.js new file mode 100644 index 00000000000..a06f7eb313a --- /dev/null +++ b/packages/datadog-plugin-protobufjs/src/schema_iterator.js @@ -0,0 +1,183 @@ +const PROTOBUF = 'protobuf' +const { + SCHEMA_DEFINITION, + SCHEMA_ID, + SCHEMA_NAME, + SCHEMA_OPERATION, + SCHEMA_WEIGHT, + SCHEMA_TYPE +} = require('../../dd-trace/src/constants') +const { + SchemaBuilder +} = require('../../dd-trace/src/datastreams/schemas/schema_builder') + +class SchemaExtractor { + constructor (schema) { + this.schema = schema + } + + static getTypeAndFormat (type) { + const typeFormatMapping = { + int32: ['integer', 'int32'], + int64: ['integer', 'int64'], + uint32: ['integer', 'uint32'], + uint64: ['integer', 'uint64'], + sint32: ['integer', 'sint32'], + sint64: ['integer', 'sint64'], + fixed32: ['integer', 'fixed32'], + fixed64: ['integer', 'fixed64'], + sfixed32: ['integer', 'sfixed32'], + sfixed64: ['integer', 'sfixed64'], + float: ['number', 'float'], + double: ['number', 'double'], + bool: ['boolean', null], + string: ['string', null], + bytes: ['string', 'byte'], + Enum: ['enum', null], + Type: ['type', null], + map: ['map', null], + repeated: ['array', null] + } + + return typeFormatMapping[type] || ['string', null] + } + + static extractProperty (field, schemaName, fieldName, builder, depth) { + let array = false + let description + let ref + let enumValues + + const resolvedType = field.resolvedType ? field.resolvedType.constructor.name : field.type + + const isRepeatedField = field.rule === 'repeated' + + let typeFormat = this.getTypeAndFormat(isRepeatedField ? 'repeated' : resolvedType) + let type = typeFormat[0] + let format = typeFormat[1] + + if (type === 'array') { + array = true + typeFormat = this.getTypeAndFormat(resolvedType) + type = typeFormat[0] + format = typeFormat[1] + } + + if (type === 'type') { + format = null + ref = `#/components/schemas/${removeLeadingPeriod(field.resolvedType.fullName)}` + // keep a reference to the original builder iterator since when we recurse this reference will get reset to + // deeper schemas + const originalSchemaExtractor = builder.iterator + if (!this.extractSchema(field.resolvedType, builder, depth, this)) { + return false + } + type = 'object' + builder.iterator = originalSchemaExtractor + } else if (type === 'enum') { + enumValues = [] + let i = 0 + while (field.resolvedType.valuesById[i]) { + enumValues.push(field.resolvedType.valuesById[i]) + i += 1 + } + } + return builder.addProperty(schemaName, fieldName, array, type, description, ref, format, enumValues) + } + + static extractSchema (schema, builder, depth, extractor) { + depth += 1 + const schemaName = removeLeadingPeriod(schema.resolvedType ? schema.resolvedType.fullName : schema.fullName) + if (extractor) { + // if we already have a defined extractor, this is a nested schema. create a new extractor for the nested + // schema, ensure it is added to our schema builder's cache, and replace the builders iterator with our + // nested schema iterator / extractor. Once complete, add the new schema to our builder's schemas. + const nestedSchemaExtractor = new SchemaExtractor(schema) + builder.iterator = nestedSchemaExtractor + const nestedSchema = SchemaBuilder.getSchema(schemaName, nestedSchemaExtractor, builder) + for (const nestedSubSchemaName in nestedSchema.components.schemas) { + if (nestedSchema.components.schemas.hasOwnProperty(nestedSubSchemaName)) { + builder.schema.components.schemas[nestedSubSchemaName] = nestedSchema.components.schemas[nestedSubSchemaName] + } + } + return true + } else { + if (!builder.shouldExtractSchema(schemaName, depth)) { + return false + } + try { + for (const field of schema.fieldsArray) { + if (!this.extractProperty(field, schemaName, field.name, builder, depth)) { + return false + } + } + } catch (error) { + return false + } + return true + } + } + + static extractSchemas (descriptor, dataStreamsProcessor) { + const schemaName = removeLeadingPeriod( + descriptor.resolvedType ? descriptor.resolvedType.fullName : descriptor.fullName + ) + return dataStreamsProcessor.getSchema(schemaName, new SchemaExtractor(descriptor)) + } + + iterateOverSchema (builder) { + this.constructor.extractSchema(this.schema, builder, 0) + } + + static attachSchemaOnSpan (args, span, operation, tracer) { + const { messageClass } = args + const descriptor = messageClass.$type ?? messageClass + + if (!descriptor || !span) { + return + } + + if (span.context()._tags[SCHEMA_TYPE] && operation === 'serialization') { + // we have already added a schema to this span, this call is an encode of nested schema types + return + } + + span.setTag(SCHEMA_TYPE, PROTOBUF) + span.setTag(SCHEMA_NAME, removeLeadingPeriod(descriptor.fullName)) + span.setTag(SCHEMA_OPERATION, operation) + + if (!tracer._dataStreamsProcessor.canSampleSchema(operation)) { + return + } + + // if the span is unsampled, do not sample the schema + if (!tracer._prioritySampler.isSampled(span)) { + return + } + + const weight = tracer._dataStreamsProcessor.trySampleSchema(operation) + if (weight === 0) { + return + } + + const schemaData = SchemaBuilder.getSchemaDefinition( + this.extractSchemas(descriptor, tracer._dataStreamsProcessor) + ) + + span.setTag(SCHEMA_DEFINITION, schemaData.definition) + span.setTag(SCHEMA_WEIGHT, weight) + span.setTag(SCHEMA_ID, schemaData.id) + } +} + +function removeLeadingPeriod (str) { + // Check if the first character is a period + if (str.charAt(0) === '.') { + // Remove the first character + return str.slice(1) + } + // Return the original string if the first character is not a period + return str +} + +module.exports = SchemaExtractor diff --git a/packages/datadog-plugin-protobufjs/test/helpers.js b/packages/datadog-plugin-protobufjs/test/helpers.js new file mode 100644 index 00000000000..d91be2e496b --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/helpers.js @@ -0,0 +1,104 @@ +async function loadMessage (protobuf, messageTypeName) { + if (messageTypeName === 'OtherMessage') { + const root = await protobuf.load('packages/datadog-plugin-protobufjs/test/schemas/other_message.proto') + const OtherMessage = root.lookupType('OtherMessage') + const message = OtherMessage.create({ + name: ['Alice'], + age: 30 + }) + return { + OtherMessage: { + type: OtherMessage, + instance: message + } + } + } else if (messageTypeName === 'MyMessage') { + const messageProto = await protobuf.load('packages/datadog-plugin-protobufjs/test/schemas/message.proto') + const otherMessageProto = await protobuf.load( + 'packages/datadog-plugin-protobufjs/test/schemas/other_message.proto' + ) + const Status = messageProto.lookupEnum('Status') + const MyMessage = messageProto.lookupType('MyMessage') + const OtherMessage = otherMessageProto.lookupType('OtherMessage') + const message = MyMessage.create({ + id: '123', + value: 'example_value', + status: Status.values.ACTIVE, + otherMessage: [ + OtherMessage.create({ name: ['Alice'], age: 30 }), + OtherMessage.create({ name: ['Bob'], age: 25 }) + ] + }) + return { + OtherMessage: { + type: OtherMessage, + instance: null + }, + MyMessage: { + type: MyMessage, + instance: message + } + } + } else if (messageTypeName === 'MainMessage') { + const root = await protobuf.load('packages/datadog-plugin-protobufjs/test/schemas/all_types.proto') + + const Status = root.lookupEnum('example.Status') + const Scalars = root.lookupType('example.Scalars') + const NestedMessage = root.lookupType('example.NestedMessage') + const ComplexMessage = root.lookupType('example.ComplexMessage') + const MainMessage = root.lookupType('example.MainMessage') + + // Create instances of the messages + const scalarsInstance = Scalars.create({ + int32Field: 42, + int64Field: 123456789012345, + uint32Field: 123, + uint64Field: 123456789012345, + sint32Field: -42, + sint64Field: -123456789012345, + fixed32Field: 42, + fixed64Field: 123456789012345, + sfixed32Field: -42, + sfixed64Field: -123456789012345, + floatField: 3.14, + doubleField: 2.718281828459, + boolField: true, + stringField: 'Hello, world!', + bytesField: Buffer.from('bytes data') + }) + + const nestedMessageInstance = NestedMessage.create({ + id: 'nested_id_123', + scalars: scalarsInstance + }) + + const complexMessageInstance = ComplexMessage.create({ + repeatedField: ['item1', 'item2', 'item3'], + mapField: { + key1: scalarsInstance, + key2: Scalars.create({ + int32Field: 24, + stringField: 'Another string' + }) + } + }) + + const mainMessageInstance = MainMessage.create({ + status: Status.values.ACTIVE, + scalars: scalarsInstance, + nested: nestedMessageInstance, + complex: complexMessageInstance + }) + + return { + MainMessage: { + type: MainMessage, + instance: mainMessageInstance + } + } + } +} + +module.exports = { + loadMessage +} diff --git a/packages/datadog-plugin-protobufjs/test/index.spec.js b/packages/datadog-plugin-protobufjs/test/index.spec.js new file mode 100644 index 00000000000..30e95687bac --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/index.spec.js @@ -0,0 +1,352 @@ +'use strict' + +const fs = require('fs') +const { expect } = require('chai') +const agent = require('../../dd-trace/test/plugins/agent') +const path = require('path') +const { + SCHEMA_DEFINITION, + SCHEMA_ID, + SCHEMA_NAME, + SCHEMA_OPERATION, + SCHEMA_WEIGHT, + SCHEMA_TYPE +} = require('../../dd-trace/src/constants') +const sinon = require('sinon') +const { loadMessage } = require('./helpers') +const { SchemaBuilder } = require('../../dd-trace/src/datastreams/schemas/schema_builder') + +const schemas = JSON.parse(fs.readFileSync(path.join(__dirname, 'schemas/expected_schemas.json'), 'utf8')) +const MESSAGE_SCHEMA_DEF = schemas.MESSAGE_SCHEMA_DEF +const OTHER_MESSAGE_SCHEMA_DEF = schemas.OTHER_MESSAGE_SCHEMA_DEF +const ALL_TYPES_MESSAGE_SCHEMA_DEF = schemas.ALL_TYPES_MESSAGE_SCHEMA_DEF + +const MESSAGE_SCHEMA_ID = '666607144722735562' +const OTHER_MESSAGE_SCHEMA_ID = '2691489402935632768' +const ALL_TYPES_MESSAGE_SCHEMA_ID = '15890948796193489151' + +function compareJson (expected, span) { + const actual = JSON.parse(span.context()._tags[SCHEMA_DEFINITION]) + return JSON.stringify(actual) === JSON.stringify(expected) +} + +describe('Plugin', () => { + describe('protobufjs', function () { + let tracer + let protobuf + let dateNowStub + let mockTime = 0 + + withVersions('protobufjs', ['protobufjs'], (version) => { + before(() => { + tracer = require('../../dd-trace').init() + // reset sampled schemas + if (tracer._dataStreamsProcessor?._schemaSamplers) { + tracer._dataStreamsProcessor._schemaSamplers = [] + } + }) + + describe('without configuration', () => { + before(() => { + dateNowStub = sinon.stub(Date, 'now').callsFake(() => { + const returnValue = mockTime + mockTime += 50000 // Increment by 50000 ms to ensure each DSM schema is sampled + return returnValue + }) + const cache = SchemaBuilder.getCache() + cache.clear() + return agent.load('protobufjs').then(() => { + protobuf = require(`../../../versions/protobufjs@${version}`).get() + }) + }) + + after(() => { + dateNowStub.restore() + return agent.close({ ritmReset: false }) + }) + + it('should serialize basic schema correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'OtherMessage') + + tracer.trace('other_message.serialize', span => { + loadedMessages.OtherMessage.type.encode(loadedMessages.OtherMessage.instance).finish() + + expect(span._name).to.equal('other_message.serialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should load using a callback instead of promise', async () => { + const loadedMessages = loadMessage(protobuf, 'OtherMessage', () => { + tracer.trace('other_message.serialize', span => { + loadedMessages.OtherMessage.type.encode(loadedMessages.OtherMessage.instance).finish() + + expect(span._name).to.equal('other_message.serialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + }) + + it('should serialize complex schema correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'MyMessage') + + tracer.trace('message_pb2.serialize', span => { + loadedMessages.MyMessage.type.encode(loadedMessages.MyMessage.instance).finish() + + expect(span._name).to.equal('message_pb2.serialize') + + expect(compareJson(MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'MyMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should serialize schema with all types correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'MainMessage') + + tracer.trace('all_types.serialize', span => { + loadedMessages.MainMessage.type.encode(loadedMessages.MainMessage.instance).finish() + + expect(span._name).to.equal('all_types.serialize') + + expect(compareJson(ALL_TYPES_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'example.MainMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, ALL_TYPES_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should deserialize basic schema correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'OtherMessage') + + const bytes = loadedMessages.OtherMessage.type.encode(loadedMessages.OtherMessage.instance).finish() + + tracer.trace('other_message.deserialize', span => { + loadedMessages.OtherMessage.type.decode(bytes) + + expect(span._name).to.equal('other_message.deserialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should deserialize complex schema correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'MyMessage') + + const bytes = loadedMessages.MyMessage.type.encode(loadedMessages.MyMessage.instance).finish() + + tracer.trace('my_message.deserialize', span => { + loadedMessages.MyMessage.type.decode(bytes) + + expect(span._name).to.equal('my_message.deserialize') + + expect(compareJson(MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'MyMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should deserialize all types schema correctly', async () => { + const loadedMessages = await loadMessage(protobuf, 'MainMessage') + + const bytes = loadedMessages.MainMessage.type.encode(loadedMessages.MainMessage.instance).finish() + + tracer.trace('all_types.deserialize', span => { + loadedMessages.MainMessage.type.decode(bytes) + + expect(span._name).to.equal('all_types.deserialize') + + expect(compareJson(ALL_TYPES_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'example.MainMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, ALL_TYPES_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should wrap encode and decode for fromObject', async () => { + const root = await protobuf.load('packages/datadog-plugin-protobufjs/test/schemas/other_message.proto') + const OtherMessage = root.lookupType('OtherMessage') + const messageObject = { + name: ['Alice'], + age: 30 + } + const message = OtherMessage.fromObject(messageObject) + + const bytes = OtherMessage.encode(message).finish() + + tracer.trace('other_message.deserialize', span => { + OtherMessage.decode(bytes) + + expect(span._name).to.equal('other_message.deserialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should wrap decodeDelimited', async () => { + const root = await protobuf.load('packages/datadog-plugin-protobufjs/test/schemas/other_message.proto') + const OtherMessage = root.lookupType('OtherMessage') + const message = OtherMessage.create({ + name: ['Alice'], + age: 30 + }) + + const bytes = OtherMessage.encodeDelimited(message).finish() + + tracer.trace('other_message.deserialize', span => { + OtherMessage.decodeDelimited(bytes) + + expect(span._name).to.equal('other_message.deserialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should load using direct type creation', () => { + const OtherMessage = new protobuf.Type('OtherMessage') + .add(new protobuf.Field('name', 1, 'string', 'repeated')) + .add(new protobuf.Field('age', 2, 'int32')) + + const message = OtherMessage.create({ + name: ['Alice'], + age: 30 + }) + + const bytes = OtherMessage.encodeDelimited(message).finish() + + tracer.trace('other_message.deserialize', span => { + OtherMessage.decodeDelimited(bytes) + + expect(span._name).to.equal('other_message.deserialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + it('should load using JSON descriptors', () => { + const jsonDescriptor = require('./schemas/other_message_proto.json') + const root = protobuf.Root.fromJSON(jsonDescriptor) + const OtherMessage = root.lookupType('OtherMessage') + + const message = OtherMessage.create({ + name: ['Alice'], + age: 30 + }) + + const bytes = OtherMessage.encodeDelimited(message).finish() + + tracer.trace('other_message.deserialize', span => { + OtherMessage.decodeDelimited(bytes) + + expect(span._name).to.equal('other_message.deserialize') + + expect(compareJson(OTHER_MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'OtherMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'deserialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, OTHER_MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + }) + }) + + describe('during schema sampling', function () { + let cacheSetSpy + let cacheGetSpy + + beforeEach(() => { + const cache = SchemaBuilder.getCache() + cache.clear() + cacheSetSpy = sinon.spy(cache, 'set') + cacheGetSpy = sinon.spy(cache, 'get') + }) + + afterEach(() => { + cacheSetSpy.restore() + cacheGetSpy.restore() + }) + + it('should use the schema cache and not re-extract an already sampled schema', async () => { + const loadedMessages = await loadMessage(protobuf, 'MyMessage') + + tracer.trace('message_pb2.serialize', span => { + loadedMessages.MyMessage.type.encode(loadedMessages.MyMessage.instance).finish() + + expect(span._name).to.equal('message_pb2.serialize') + + expect(compareJson(MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'MyMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + + // we sampled 1 schema with 1 subschema, so the constructor should've only been called twice + expect(cacheSetSpy.callCount).to.equal(2) + expect(cacheGetSpy.callCount).to.equal(2) + }) + + tracer.trace('message_pb2.serialize', span => { + loadedMessages.MyMessage.type.encode(loadedMessages.MyMessage.instance).finish() + + expect(span._name).to.equal('message_pb2.serialize') + + expect(compareJson(MESSAGE_SCHEMA_DEF, span)).to.equal(true) + expect(span.context()._tags).to.have.property(SCHEMA_TYPE, 'protobuf') + expect(span.context()._tags).to.have.property(SCHEMA_NAME, 'MyMessage') + expect(span.context()._tags).to.have.property(SCHEMA_OPERATION, 'serialization') + expect(span.context()._tags).to.have.property(SCHEMA_ID, MESSAGE_SCHEMA_ID) + expect(span.context()._tags).to.have.property(SCHEMA_WEIGHT, 1) + + // ensure schema was sampled and returned via the cache, so no extra cache set + // calls were needed, only gets + expect(cacheSetSpy.callCount).to.equal(2) + expect(cacheGetSpy.callCount).to.equal(3) + }) + }) + }) + }) + }) + }) +}) diff --git a/packages/datadog-plugin-protobufjs/test/schemas/all_types.proto b/packages/datadog-plugin-protobufjs/test/schemas/all_types.proto new file mode 100644 index 00000000000..6cfc3b3ee3d --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/schemas/all_types.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; + +package example; + +// Enum definition +enum Status { + UNKNOWN = 0; + ACTIVE = 1; + INACTIVE = 2; +} + +// Message with various number types and other scalar types +message Scalars { + int32 int32Field = 1; + int64 int64Field = 2; + uint32 uint32Field = 3; + uint64 uint64Field = 4; + sint32 sint32Field = 5; + sint64 sint64Field = 6; + fixed32 fixed32Field = 7; + fixed64 fixed64Field = 8; + sfixed32 sfixed32Field = 9; + sfixed64 sfixed64Field = 10; + float floatField = 11; + double doubleField = 12; + bool boolField = 13; + string stringField = 14; + bytes bytesField = 15; +} + +// Nested message definition +message NestedMessage { + string id = 1; + Scalars scalars = 2; +} + +// Message demonstrating the use of repeated fields and maps +message ComplexMessage { + repeated string repeatedField = 1; + map mapField = 2; +} + +// Main message that uses all the above elements +message MainMessage { + Status status = 1; + Scalars scalars = 2; + NestedMessage nested = 3; + ComplexMessage complex = 4; +} \ No newline at end of file diff --git a/packages/datadog-plugin-protobufjs/test/schemas/expected_schemas.json b/packages/datadog-plugin-protobufjs/test/schemas/expected_schemas.json new file mode 100644 index 00000000000..1825013519d --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/schemas/expected_schemas.json @@ -0,0 +1,195 @@ +{ + "MESSAGE_SCHEMA_DEF":{ + "openapi":"3.0.0", + "components":{ + "schemas":{ + "MyMessage":{ + "type":"object", + "properties":{ + "id":{ + "type":"string" + }, + "value":{ + "type":"string" + }, + "otherMessage":{ + "type":"array", + "items":{ + "type":"object", + "$ref":"#/components/schemas/OtherMessage" + } + }, + "status":{ + "type":"enum", + "enum":[ + "UNKNOWN", + "ACTIVE", + "INACTIVE", + "DELETED" + ] + } + } + }, + "OtherMessage":{ + "type":"object", + "properties":{ + "name":{ + "type":"array", + "items":{ + "type":"string" + } + }, + "age":{ + "type":"integer", + "format":"int32" + } + } + } + } + } + }, + "OTHER_MESSAGE_SCHEMA_DEF":{ + "openapi":"3.0.0", + "components":{ + "schemas":{ + "OtherMessage":{ + "type":"object", + "properties":{ + "name":{ + "type":"array", + "items":{ + "type":"string" + } + }, + "age":{ + "type":"integer", + "format":"int32" + } + } + } + } + } + }, + "ALL_TYPES_MESSAGE_SCHEMA_DEF":{ + "openapi":"3.0.0", + "components":{ + "schemas":{ + "example.MainMessage":{ + "type":"object", + "properties":{ + "status":{ + "type":"enum", + "enum":[ + "UNKNOWN", + "ACTIVE", + "INACTIVE" + ] + }, + "scalars":{ + "type":"object", + "$ref":"#/components/schemas/example.Scalars" + }, + "nested":{ + "type":"object", + "$ref":"#/components/schemas/example.NestedMessage" + }, + "complex":{ + "type":"object", + "$ref":"#/components/schemas/example.ComplexMessage" + } + } + }, + "example.Scalars":{ + "type":"object", + "properties":{ + "int32Field":{ + "type":"integer", + "format":"int32" + }, + "int64Field":{ + "type":"integer", + "format":"int64" + }, + "uint32Field":{ + "type":"integer", + "format":"uint32" + }, + "uint64Field":{ + "type":"integer", + "format":"uint64" + }, + "sint32Field":{ + "type":"integer", + "format":"sint32" + }, + "sint64Field":{ + "type":"integer", + "format":"sint64" + }, + "fixed32Field":{ + "type":"integer", + "format":"fixed32" + }, + "fixed64Field":{ + "type":"integer", + "format":"fixed64" + }, + "sfixed32Field":{ + "type":"integer", + "format":"sfixed32" + }, + "sfixed64Field":{ + "type":"integer", + "format":"sfixed64" + }, + "floatField":{ + "type":"number", + "format":"float" + }, + "doubleField":{ + "type":"number", + "format":"double" + }, + "boolField":{ + "type":"boolean" + }, + "stringField":{ + "type":"string" + }, + "bytesField":{ + "type":"string", + "format":"byte" + } + } + }, + "example.NestedMessage":{ + "type":"object", + "properties":{ + "id":{ + "type":"string" + }, + "scalars":{ + "type":"object", + "$ref":"#/components/schemas/example.Scalars" + } + } + }, + "example.ComplexMessage":{ + "type":"object", + "properties":{ + "repeatedField":{ + "type":"array", + "items":{ + "type":"string" + } + }, + "mapField":{ + "type":"object", + "$ref":"#/components/schemas/example.Scalars" + } + } + } + } + } + } +} \ No newline at end of file diff --git a/packages/datadog-plugin-protobufjs/test/schemas/message.proto b/packages/datadog-plugin-protobufjs/test/schemas/message.proto new file mode 100644 index 00000000000..6fd1c65fe06 --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/schemas/message.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +import "other_message.proto"; + +enum Status { + UNKNOWN = 0; + ACTIVE = 1; + INACTIVE = 2; + DELETED = 3; +} + +message MyMessage { + string id = 1; + string value = 2; + repeated OtherMessage otherMessage = 3; + Status status = 4; +} \ No newline at end of file diff --git a/packages/datadog-plugin-protobufjs/test/schemas/other_message.proto b/packages/datadog-plugin-protobufjs/test/schemas/other_message.proto new file mode 100644 index 00000000000..dbd6f368d7d --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/schemas/other_message.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message OtherMessage { + repeated string name = 1; + int32 age = 2; +} \ No newline at end of file diff --git a/packages/datadog-plugin-protobufjs/test/schemas/other_message_proto.json b/packages/datadog-plugin-protobufjs/test/schemas/other_message_proto.json new file mode 100644 index 00000000000..5a682ec89ca --- /dev/null +++ b/packages/datadog-plugin-protobufjs/test/schemas/other_message_proto.json @@ -0,0 +1,17 @@ +{ + "nested": { + "OtherMessage": { + "fields": { + "name": { + "rule": "repeated", + "type": "string", + "id": 1 + }, + "age": { + "type": "int32", + "id": 2 + } + } + } + } + } \ No newline at end of file diff --git a/packages/dd-trace/src/datastreams/schemas/schema_builder.js b/packages/dd-trace/src/datastreams/schemas/schema_builder.js index a65863d4d87..092f5b45101 100644 --- a/packages/dd-trace/src/datastreams/schemas/schema_builder.js +++ b/packages/dd-trace/src/datastreams/schemas/schema_builder.js @@ -4,13 +4,36 @@ const { Schema } = require('./schema') const maxDepth = 10 const maxProperties = 1000 -const CACHE = new LRUCache({ max: 32 }) +const CACHE = new LRUCache({ max: 256 }) class SchemaBuilder { constructor (iterator) { this.schema = new OpenApiSchema() this.iterator = iterator - this.proerties = 0 + this.properties = 0 + } + + static getCache () { + return CACHE + } + + static getSchemaDefinition (schema) { + const noNones = convertToJsonCompatible(schema) + const definition = jsonStringify(noNones) + const id = fnv64(Buffer.from(definition, 'utf-8')).toString() + return new Schema(definition, id) + } + + static getSchema (schemaName, iterator, builder) { + if (!CACHE.has(schemaName)) { + CACHE.set(schemaName, (builder ?? new SchemaBuilder(iterator)).build()) + } + return CACHE.get(schemaName) + } + + build () { + this.iterator.iterateOverSchema(this) + return this.schema } addProperty (schemaName, fieldName, isArray, type, description, ref, format, enumValues) { @@ -26,14 +49,6 @@ class SchemaBuilder { return true } - build () { - this.iterator.iterateOverSchema(this) - const noNones = convertToJsonCompatible(this.schema) - const definition = jsonStringify(noNones) - const id = fnv64(Buffer.from(definition, 'utf-8')).toString() - return new Schema(definition, id) - } - shouldExtractSchema (schemaName, depth) { if (depth > maxDepth) { return false @@ -44,13 +59,6 @@ class SchemaBuilder { this.schema.components.schemas[schemaName] = new OpenApiSchema.SCHEMA() return true } - - static getSchema (schemaName, iterator) { - if (!CACHE.has(schemaName)) { - CACHE.set(schemaName, new SchemaBuilder(iterator).build()) - } - return CACHE.get(schemaName) - } } class OpenApiSchema { diff --git a/packages/dd-trace/src/plugins/index.js b/packages/dd-trace/src/plugins/index.js index 06325724b71..2e949fff7e2 100644 --- a/packages/dd-trace/src/plugins/index.js +++ b/packages/dd-trace/src/plugins/index.js @@ -77,6 +77,7 @@ module.exports = { get pino () { return require('../../../datadog-plugin-pino/src') }, get 'pino-pretty' () { return require('../../../datadog-plugin-pino/src') }, get playwright () { return require('../../../datadog-plugin-playwright/src') }, + get protobufjs () { return require('../../../datadog-plugin-protobufjs/src') }, get redis () { return require('../../../datadog-plugin-redis/src') }, get restify () { return require('../../../datadog-plugin-restify/src') }, get rhea () { return require('../../../datadog-plugin-rhea/src') }, diff --git a/packages/dd-trace/src/plugins/schema.js b/packages/dd-trace/src/plugins/schema.js new file mode 100644 index 00000000000..675ba6a715f --- /dev/null +++ b/packages/dd-trace/src/plugins/schema.js @@ -0,0 +1,35 @@ +'use strict' + +const Plugin = require('./plugin') + +const SERIALIZATION = 'serialization' +const DESERIALIZATION = 'deserialization' + +class SchemaPlugin extends Plugin { + constructor (...args) { + super(...args) + + this.addSub(`apm:${this.constructor.id}:serialize-start`, this.handleSerializeStart.bind(this)) + this.addSub(`apm:${this.constructor.id}:deserialize-end`, this.handleDeserializeFinish.bind(this)) + } + + handleSerializeStart (args) { + const activeSpan = this.tracer.scope().active() + if (activeSpan && this.config.dsmEnabled) { + this.constructor.schemaExtractor.attachSchemaOnSpan( + args, activeSpan, SERIALIZATION, this.tracer + ) + } + } + + handleDeserializeFinish (args) { + const activeSpan = this.tracer.scope().active() + if (activeSpan && this.config.dsmEnabled) { + this.constructor.schemaExtractor.attachSchemaOnSpan( + args, activeSpan, DESERIALIZATION, this.tracer + ) + } + } +} + +module.exports = SchemaPlugin diff --git a/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js b/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js index db602ef83aa..134724b593a 100644 --- a/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js +++ b/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js @@ -24,7 +24,7 @@ describe('SchemaBuilder', () => { const shouldExtractAddress = builder.shouldExtractSchema('address', 1) const shouldExtractPerson2 = builder.shouldExtractSchema('person', 0) const shouldExtractTooDeep = builder.shouldExtractSchema('city', 11) - const schema = builder.build() + const schema = SchemaBuilder.getSchemaDefinition(builder.build()) const expectedSchema = { components: {