-
Notifications
You must be signed in to change notification settings - Fork 312
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add protobuf schemas support for DSM
- Loading branch information
Showing
13 changed files
with
1,076 additions
and
686 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
const shimmer = require('../../datadog-shimmer') | ||
const { channel, addHook, AsyncResource } = require('./helpers/instrument') | ||
|
||
const startSerializeCh = channel('datadog:protobuf:serialize:start') | ||
const finishSerializeCh = channel('datadog:protobuf:serialize:finish') | ||
const startDeserializeCh = channel('datadog:protobuf:deserialize:start') | ||
const finishDeserializeCh = channel('datadog:protobuf:deserialize:finish') | ||
|
||
function wrapSerialization (Class) { | ||
shimmer.wrap(Class, 'encode', original => { | ||
return function wrappedEncode (...args) { | ||
if (!startSerializeCh.hasSubscribers) { | ||
return original.apply(this, args) | ||
} | ||
|
||
const asyncResource = new AsyncResource('bound-anonymous-fn') | ||
|
||
asyncResource.runInAsyncScope(() => { | ||
startSerializeCh.publish({ message: this }) | ||
}) | ||
|
||
try { | ||
// when applying the original encode / decode functions, protobuf sets up the classes again | ||
// causing our function wrappers to dissappear, we should verify they exist and rewrap if not | ||
const wrappedDecode = this.decode | ||
const wrappedEncode = this.encode | ||
const result = original.apply(this, args) | ||
ensureMessageIsWrapped(this, wrappedEncode, wrappedDecode) | ||
|
||
if (original) { | ||
asyncResource.runInAsyncScope(() => { | ||
finishSerializeCh.publish({ message: this }) | ||
}) | ||
} | ||
return result | ||
} catch (err) { | ||
asyncResource.runInAsyncScope(() => { | ||
finishSerializeCh.publish({ message: this }) | ||
}) | ||
throw err | ||
} | ||
} | ||
}) | ||
} | ||
|
||
function ensureMessageIsWrapped (messageClass, wrappedEncode, wrappedDecode) { | ||
if (messageClass.encode !== wrappedEncode) { | ||
messageClass.encode = wrappedEncode | ||
} | ||
|
||
if (messageClass.decode !== wrappedDecode) { | ||
messageClass.decode = wrappedDecode | ||
} | ||
} | ||
|
||
function wrapDeserialization (Class) { | ||
shimmer.wrap(Class, 'decode', original => { | ||
return function wrappedDecode (...args) { | ||
if (!startDeserializeCh.hasSubscribers) { | ||
return original.apply(this, args) | ||
} | ||
|
||
const asyncResource = new AsyncResource('bound-anonymous-fn') | ||
|
||
asyncResource.runInAsyncScope(() => { | ||
startDeserializeCh.publish({ buffer: args[0] }) | ||
}) | ||
|
||
try { | ||
// when applying the original encode / decode functions, protobuf sets up the classes again | ||
// causing our function wrappers to dissappear, we should verify they exist and rewrap if not | ||
|
||
const wrappedDecode = this.decode | ||
const wrappedEncode = this.encode | ||
const result = original.apply(this, args) | ||
ensureMessageIsWrapped(this, wrappedEncode, wrappedDecode) | ||
|
||
asyncResource.runInAsyncScope(() => { | ||
finishDeserializeCh.publish({ message: result }) | ||
}) | ||
return result | ||
} catch (err) { | ||
asyncResource.runInAsyncScope(() => { | ||
finishDeserializeCh.publish({ buffer: args[0] }) | ||
}) | ||
throw err | ||
} | ||
} | ||
}) | ||
} | ||
|
||
function wrapProtobufClasses (root) { | ||
if (!root) { | ||
// pass | ||
} else if (root.decode) { | ||
wrapSerialization(root) | ||
wrapDeserialization(root) | ||
} else if (root.nestedArray) { | ||
for (const subRoot of root.nestedArray) { | ||
wrapProtobufClasses(subRoot) | ||
} | ||
} | ||
} | ||
|
||
addHook({ | ||
name: 'protobufjs', | ||
versions: ['>=6.0.0'] | ||
}, protobuf => { | ||
shimmer.wrap(protobuf.Root.prototype, 'load', original => { | ||
return function wrappedLoad (...args) { | ||
const result = original.apply(this, args) | ||
result.then(root => { | ||
wrapProtobufClasses(root) | ||
}) | ||
return result | ||
} | ||
}) | ||
|
||
shimmer.wrap(protobuf.Root.prototype, 'loadSync', original => { | ||
return function wrappedLoadSync (...args) { | ||
const root = original.apply(this, args) | ||
wrapProtobufClasses(root) | ||
return root | ||
} | ||
}) | ||
|
||
return protobuf | ||
}) |
Oops, something went wrong.