Skip to content

Commit

Permalink
internal/civisibility: changes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyredondo committed Nov 6, 2024
1 parent fed61eb commit 925f2c2
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 23 deletions.
13 changes: 12 additions & 1 deletion ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ package tracer
import (
"bytes"
"sync/atomic"
"time"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/constants"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
Expand All @@ -21,6 +23,7 @@ import (
// It embeds the generic payload structure and adds methods to handle CI Visibility specific data.
type ciVisibilityPayload struct {
*payload
serializationTime time.Duration
}

// push adds a new CI Visibility event to the payload buffer.
Expand All @@ -35,6 +38,10 @@ type ciVisibilityPayload struct {
// An error if encoding the event fails.
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
p.buf.Grow(event.Msgsize())
startTime := time.Now()
defer func() {
p.serializationTime += time.Since(startTime)
}()
if err := msgp.Encode(&p.buf, event); err != nil {
return err
}
Expand All @@ -50,7 +57,7 @@ func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
log.Debug("ciVisibilityPayload: creating payload instance")
return &ciVisibilityPayload{newPayload()}
return &ciVisibilityPayload{newPayload(), 0}
}

// getBuffer retrieves the complete body of the CI Visibility payload, including metadata.
Expand All @@ -65,6 +72,7 @@ func newCiVisibilityPayload() *ciVisibilityPayload {
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
startTime := time.Now()
log.Debug("ciVisibilityPayload: .getBuffer (count: %v)", p.itemCount())

// Create a buffer to read the current payload
Expand All @@ -82,6 +90,9 @@ func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
return nil, err
}

telemetry.EndpointPayloadEventsCount(telemetry.TestCycleEndpointType, float64(p.itemCount()))
telemetry.EndpointPayloadBytes(telemetry.TestCycleEndpointType, float64(encodedBuf.Len()))
telemetry.EndpointEventsSerializationMs(telemetry.TestCycleEndpointType, float64(p.serializationTime.Milliseconds()+time.Since(startTime).Milliseconds()))
return encodedBuf, nil
}

Expand Down
7 changes: 5 additions & 2 deletions ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
"bytes"
"compress/gzip"
"fmt"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"io"
"net/http"
"os"
"runtime"
"strings"
"time"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/constants"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)
Expand Down Expand Up @@ -127,7 +128,7 @@ func newCiVisibilityTransport(config *config) *ciVisibilityTransport {
//
// An io.ReadCloser for reading the response body, and an error if the operation fails.
func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error) {
ciVisibilityPayload := &ciVisibilityPayload{p}
ciVisibilityPayload := &ciVisibilityPayload{p, 0}
buffer, bufferErr := ciVisibilityPayload.getBuffer(t.config)
if bufferErr != nil {
return nil, fmt.Errorf("cannot create buffer payload: %v", bufferErr)
Expand Down Expand Up @@ -160,7 +161,9 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error)
}

log.Debug("ciVisibilityTransport: sending transport request: %v bytes", buffer.Len())
startTime := time.Now()
response, err := t.config.httpClient.Do(req)
telemetry.EndpointPayloadRequestsMs(telemetry.TestCycleEndpointType, float64(time.Since(startTime).Milliseconds()))
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"encoding/binary"
"io"
"sync/atomic"
"time"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

Expand All @@ -33,6 +35,9 @@ type coveragePayload struct {

// reader is used for reading the contents of buf.
reader *bytes.Reader

// serializationTime time to do serialization
serializationTime time.Duration
}

var _ io.Reader = (*coveragePayload)(nil)
Expand All @@ -49,6 +54,10 @@ func newCoveragePayload() *coveragePayload {
// push pushes a new item into the stream.
func (p *coveragePayload) push(testCoverageData *ciTestCoverageData) error {
p.buf.Grow(testCoverageData.Msgsize())
startTime := time.Now()
defer func() {
p.serializationTime += time.Since(startTime)
}()
if err := msgp.Encode(&p.buf, testCoverageData); err != nil {
return err
}
Expand Down Expand Up @@ -137,6 +146,7 @@ func (p *coveragePayload) Read(b []byte) (n int, err error) {
// A pointer to a bytes.Buffer containing the encoded CI Visibility coverage payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *coveragePayload) getBuffer() (*bytes.Buffer, error) {
startTime := time.Now()
log.Debug("coveragePayload: .getBuffer (count: %v)", p.itemCount())

// Create a buffer to read the current payload
Expand All @@ -157,5 +167,8 @@ func (p *coveragePayload) getBuffer() (*bytes.Buffer, error) {
return nil, err
}

telemetry.EndpointPayloadBytes(telemetry.CodeCoverageEndpointType, float64(encodedBuf.Len()))
telemetry.EndpointPayloadEventsCount(telemetry.CodeCoverageEndpointType, float64(p.itemCount()))
telemetry.EndpointEventsSerializationMs(telemetry.CodeCoverageEndpointType, float64(p.serializationTime.Milliseconds()+time.Since(startTime).Milliseconds()))
return encodedBuf, nil
}
3 changes: 3 additions & 0 deletions internal/civisibility/integrations/gotesting/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/integrations"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/integrations/gotesting/coverage"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
)

const (
Expand Down Expand Up @@ -225,9 +226,11 @@ func (ddm *M) executeInternalTest(testInfo *testingTInfo) func(*testing.T) {
session.SetTag(constants.ITRTestsSkippingCount, numOfTestsSkipped.Add(1))
checkModuleAndSuite(module, suite)
t.Skip(constants.SkippedByITRReason)
telemetry.ITRSkipped(telemetry.TestEventType)
return
} else {
test.SetTag(constants.TestForcedToRun, "true")
telemetry.ITRForcedRun(telemetry.TestEventType)
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/civisibility/integrations/manual_api_ddtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (t *tslvTest) SetTestFunc(fn *runtime.Func) {
// if the function is marked as unskippable, set the appropriate tag
if isUnskippable {
t.SetTag(constants.TestUnskippable, "true")
telemetry.ITRUnskippable(telemetry.TestEventType)
t.ctx = context.WithValue(t.ctx, constants.TestUnskippable, true)
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/civisibility/utils/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func isGitFound() bool {
// execGit executes a Git command with the given arguments.
func execGit(commandType telemetry.CommandType, args ...string) (val []byte, err error) {
if commandType != telemetry.NotSpecifiedCommandsType {
startTime := time.Now()
telemetry.GitCommand(commandType)
defer func() {
telemetry.GitCommandMs(commandType, float64(time.Since(startTime).Milliseconds()))
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
switch exitErr.ExitCode() {
Expand Down
4 changes: 4 additions & 0 deletions internal/civisibility/utils/net/coverage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"io"
"time"
)

const (
Expand Down Expand Up @@ -60,7 +61,10 @@ func (c *client) SendCoveragePayload(ciTestCovPayload io.Reader) error {
telemetry.EndpointPayloadRequests(telemetry.CodeCoverageUncompressedEndpointType)
}

startTime := time.Now()
response, responseErr := c.handler.SendRequest(request)
telemetry.EndpointPayloadRequestsMs(telemetry.CodeCoverageEndpointType, float64(time.Since(startTime).Milliseconds()))

if responseErr != nil {
telemetry.EndpointPayloadRequestsErrors(telemetry.CodeCoverageEndpointType, telemetry.NetworkErrorType)
telemetry.EndpointPayloadDropped(telemetry.CodeCoverageEndpointType)
Expand Down
16 changes: 15 additions & 1 deletion internal/civisibility/utils/net/efd_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package net

import (
"fmt"

"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
)

const (
Expand Down Expand Up @@ -62,11 +64,23 @@ func (c *client) GetEarlyFlakeDetectionData() (*EfdResponseData, error) {
},
}

response, err := c.handler.SendRequest(*c.getPostRequestConfig(efdURLPath, body))
request := c.getPostRequestConfig(efdURLPath, body)
if request.Compressed {
telemetry.EarlyFlakeDetectionRequest(telemetry.CompressedRequestCompressedType)
} else {
telemetry.EarlyFlakeDetectionRequest(telemetry.UncompressedRequestCompressedType)
}

response, err := c.handler.SendRequest(*request)
if err != nil {
telemetry.EarlyFlakeDetectionRequestErrors(telemetry.NetworkErrorType)
return nil, fmt.Errorf("sending early flake detection request: %s", err.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
telemetry.EarlyFlakeDetectionRequestErrors(telemetry.GetErrorTypeFromStatusCode(response.StatusCode))
}

var responseObject efdResponse
err = response.Unmarshal(&responseObject)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion internal/civisibility/utils/net/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Response struct {
Format string // Format of the response (json or msgpack)
StatusCode int // HTTP status code
CanUnmarshal bool // Whether the response body can be unmarshalled
Compressed bool // Whether to use gzip compression
}

// Unmarshal deserializes the response body into the provided target based on the response format.
Expand Down Expand Up @@ -269,7 +270,9 @@ func (rh *RequestHandler) internalSendRequest(config *RequestConfig, attempt int
}

// Decompress response if it is gzip compressed
compressedResponse := false
if resp.Header.Get(HeaderContentEncoding) == ContentEncodingGzip {
compressedResponse = true
responseBody, err = decompressData(responseBody)
if err != nil {
return true, nil, err
Expand All @@ -294,7 +297,7 @@ func (rh *RequestHandler) internalSendRequest(config *RequestConfig, attempt int
canUnmarshal := statusCode >= 200 && statusCode < 300

// Return the successful response with status code and unmarshal capability
return true, &Response{Body: responseBody, Format: responseFormat, StatusCode: statusCode, CanUnmarshal: canUnmarshal}, nil
return true, &Response{Body: responseBody, Format: responseFormat, StatusCode: statusCode, CanUnmarshal: canUnmarshal, Compressed: compressedResponse}, nil
}

// Helper functions for data serialization, compression, and handling multipart form data
Expand Down
24 changes: 23 additions & 1 deletion internal/civisibility/utils/net/searchcommits_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package net

import (
"fmt"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
)

const (
Expand Down Expand Up @@ -43,11 +46,30 @@ func (c *client) GetCommits(localCommits []string) ([]string, error) {
})
}

response, err := c.handler.SendRequest(*c.getPostRequestConfig(searchCommitsURLPath, body))
request := c.getPostRequestConfig(searchCommitsURLPath, body)
if request.Compressed {
telemetry.GitRequestsSearchCommits(telemetry.CompressedRequestCompressedType)
} else {
telemetry.GitRequestsSearchCommits(telemetry.UncompressedRequestCompressedType)
}

startTime := time.Now()
response, err := c.handler.SendRequest(*request)
if err != nil {
telemetry.GitRequestsSearchCommitsErrors(telemetry.NetworkErrorType)
return nil, fmt.Errorf("sending search commits request: %s", err.Error())
}

if response.Compressed {
telemetry.GitRequestsSearchCommitsMs(telemetry.CompressedResponseCompressedType, float64(time.Since(startTime).Milliseconds()))
} else {
telemetry.GitRequestsSearchCommitsMs(telemetry.UncompressedResponseCompressedType, float64(time.Since(startTime).Milliseconds()))
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
telemetry.GitRequestsSearchCommitsErrors(telemetry.GetErrorTypeFromStatusCode(response.StatusCode))
}

var responseObject searchCommits
err = response.Unmarshal(&responseObject)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/civisibility/utils/net/sendpackfiles_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package net
import (
"fmt"
"os"

"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
)

const (
Expand Down Expand Up @@ -74,13 +76,21 @@ func (c *client) SendPackFiles(commitSha string, packFiles []string) (bytes int6
Backoff: DefaultBackoff,
}

if request.Compressed {
telemetry.GitRequestsObjectsPack(telemetry.CompressedRequestCompressedType)
} else {
telemetry.GitRequestsObjectsPack(telemetry.UncompressedRequestCompressedType)
}

response, responseErr := c.handler.SendRequest(request)
if responseErr != nil {
telemetry.GitRequestsObjectsPackErrors(telemetry.NetworkErrorType)
err = fmt.Errorf("failed to send packfile request: %s", responseErr.Error())
return
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
telemetry.GitRequestsObjectsPackErrors(telemetry.GetErrorTypeFromStatusCode(response.StatusCode))
err = fmt.Errorf("unexpected response code %d: %s", response.StatusCode, string(response.Body))
}

Expand Down
31 changes: 30 additions & 1 deletion internal/civisibility/utils/net/settings_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package net

import (
"fmt"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/utils/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

Expand Down Expand Up @@ -78,11 +81,25 @@ func (c *client) GetSettings() (*SettingsResponseData, error) {
},
}

response, err := c.handler.SendRequest(*c.getPostRequestConfig(settingsURLPath, body))
request := c.getPostRequestConfig(settingsURLPath, body)
if request.Compressed {
telemetry.GitRequestsSettings(telemetry.CompressedRequestCompressedType)
} else {
telemetry.GitRequestsSettings(telemetry.UncompressedRequestCompressedType)
}

startTime := time.Now()
response, err := c.handler.SendRequest(*request)
telemetry.GitRequestsSettingsMs(float64(time.Since(startTime).Milliseconds()))
if err != nil {
telemetry.GitRequestsSettingsErrors(telemetry.NetworkErrorType)
return nil, fmt.Errorf("sending get settings request: %s", err.Error())
}

if response.StatusCode < 200 || response.StatusCode >= 300 {
telemetry.GitRequestsSettingsErrors(telemetry.GetErrorTypeFromStatusCode(response.StatusCode))
}

var responseObject settingsResponse
err = response.Unmarshal(&responseObject)
if err != nil {
Expand All @@ -92,5 +109,17 @@ func (c *client) GetSettings() (*SettingsResponseData, error) {
if log.DebugEnabled() {
log.Debug("civisibility.settings: %s", string(response.Body))
}

var settingsResponseType telemetry.SettingsResponseType
if responseObject.Data.Attributes.CodeCoverage {
settingsResponseType = append(settingsResponseType, telemetry.CoverageEnabledSettingsResponseType...)
}
if responseObject.Data.Attributes.TestsSkipping {
settingsResponseType = append(settingsResponseType, telemetry.ItrSkipEnabledSettingsResponseType...)
}
if responseObject.Data.Attributes.EarlyFlakeDetection.Enabled {
settingsResponseType = append(settingsResponseType, telemetry.EfdEnabledSettingsResponseType...)
}
telemetry.GitRequestsSettingsResponse(settingsResponseType)
return &responseObject.Data.Attributes, nil
}
Loading

0 comments on commit 925f2c2

Please sign in to comment.