Skip to content

Commit

Permalink
feat(trace): add ServiceName field to Span model for better trace con…
Browse files Browse the repository at this point in the history
…text

feat(repository): implement SaveSpans method to save spans to MongoDB
feat(trace_service): add ProcessSpan method to handle span processing
refactor(telemetry): update resource span conversion to return spans instead of traces
  • Loading branch information
HyunSu1768 committed Sep 25, 2024
1 parent d941ed4 commit ec65446
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 43 deletions.
1 change: 1 addition & 0 deletions internal/models/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Span struct {
Events []SpanEvent `bson:"events"`
Links []SpanLink `bson:"links"`
Status SpanStatus `bson:"status"`
ServiceName string `bson:"serviceName"`
}

type SpanEvent struct {
Expand Down
1 change: 1 addition & 0 deletions internal/repository/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ import (

type Repository interface {
SaveTraces(ctx context.Context, traces []*models.Trace) error
SaveSpans(ctx context.Context, spans *[]models.Span) error
}
33 changes: 33 additions & 0 deletions internal/repository/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,39 @@ func (r *MongoRepository) SaveTraces(ctx context.Context, traces []*models.Trace
return nil
}

func (r *MongoRepository) SaveSpans(ctx context.Context, spans *[]models.Span) error {
if len(*spans) == 0 {
return nil
}

documents := make([]interface{}, len(*spans))
for i, span := range *spans {
doc := bson.M{
"id": span.ID,
"traceId": span.TraceID,
"spanId": span.SpanID,
"parentSpanId": span.ParentSpanID,
"name": span.Name,
"kind": span.Kind,
"startTimeUnixNano": span.StartTimeUnixNano,
"endTimeUnixNano": span.EndTimeUnixNano,
"attributes": convertAttributes(span.Attributes),
"events": convertEvents(span.Events),
"links": convertLinks(span.Links),
"status": convertStatus(span.Status),
}
documents[i] = doc
}

result, err := r.traceCollection.InsertMany(ctx, documents)
if err != nil {
return fmt.Errorf("failed to insert spans: %v", err)
}

log.Printf("Successfully inserted %d spans", len(result.InsertedIDs))
return nil
}

func convertSpans(spans []models.Span) []bson.M {
result := make([]bson.M, len(spans))
for i, span := range spans {
Expand Down
13 changes: 6 additions & 7 deletions internal/server/trace_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ func NewTraceServer(traceService *service.TraceService) *TraceServer {

func (s *TraceServer) Export(ctx context.Context, req *collectorpb.ExportTraceServiceRequest) (*collectorpb.ExportTraceServiceResponse, error) {
for _, resourceSpans := range req.ResourceSpans {
traces := telemetry.ConvertResourceSpansToTraces(resourceSpans)
if traces != nil {
err := s.traceService.ProcessTrace(ctx, traces)
if err != nil {
fmt.Sprintf("Failed to process trace: %v", err)
return nil, status.Errorf(codes.Internal, "Failed to process trace: %v", err)
}
spans := telemetry.ConvertResourceSpansToSpans(resourceSpans)

err := s.traceService.ProcessSpan(ctx, &spans)
if err != nil {
fmt.Printf("Failed to process span: %v\n", err)
return nil, status.Errorf(codes.Internal, "Failed to process span: %v", err)
}
}

Expand Down
4 changes: 4 additions & 0 deletions internal/service/trace_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func NewTraceService(repo repository.Repository) *TraceService {
func (s *TraceService) ProcessTrace(ctx context.Context, traces []*models.Trace) error {
return s.repo.SaveTraces(ctx, traces)
}

func (s *TraceService) ProcessSpan(ctx context.Context, spans *[]models.Span) error {
return s.repo.SaveSpans(ctx, spans)
}
52 changes: 16 additions & 36 deletions pkg/telemetry/otel_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,33 @@ package telemetry

import (
"encoding/hex"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
"math"
"otel-trace-reciever/internal/models"

commonpb "go.opentelemetry.io/proto/otlp/common/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)

func ConvertResourceSpansToTraces(resourceSpans *tracepb.ResourceSpans) []*models.Trace {
traceMap := make(map[string]*models.Trace)
serviceName := getServiceName(resourceSpans.Resource)
func ConvertResourceSpansToSpans(resourceSpans *tracepb.ResourceSpans) []models.Span {
var spans []models.Span
serviceNamePtr := getServiceName(resourceSpans.Resource)
serviceName := ""
if serviceNamePtr != nil {
serviceName = *serviceNamePtr
}

for _, scopeSpans := range resourceSpans.ScopeSpans {
for _, span := range scopeSpans.Spans {
convertedSpan := convertSpan(span)
traceID := convertedSpan.TraceID

if trace, exists := traceMap[traceID]; exists {
trace.Spans = append(trace.Spans, convertedSpan)
updateTraceDuration(trace, convertedSpan)
} else {
traceMap[traceID] = &models.Trace{
TraceID: traceID,
Spans: []models.Span{convertedSpan},
ServiceName: serviceName,
DateNano: convertedSpan.StartTimeUnixNano,
DurationNano: convertedSpan.EndTimeUnixNano - convertedSpan.StartTimeUnixNano,
}
}
convertedSpan := convertSpan(span, serviceName)
spans = append(spans, convertedSpan)
}
}

var traces []*models.Trace
for _, trace := range traceMap {
traces = append(traces, trace)
}

return traces
return spans
}

func updateTraceDuration(trace *models.Trace, span models.Span) {
if span.StartTimeUnixNano < trace.DateNano {
trace.DateNano = span.StartTimeUnixNano
}
if span.EndTimeUnixNano-trace.DateNano > trace.DurationNano {
trace.DurationNano = span.EndTimeUnixNano - trace.DateNano
}
}

func convertSpan(pbSpan *tracepb.Span) models.Span {
func convertSpan(pbSpan *tracepb.Span, serviceName string) models.Span {
return models.Span{
ID: hex.EncodeToString(pbSpan.SpanId),
TraceID: hex.EncodeToString(pbSpan.TraceId),
Expand All @@ -65,6 +42,7 @@ func convertSpan(pbSpan *tracepb.Span) models.Span {
Events: convertEvents(pbSpan.Events),
Links: convertLinks(pbSpan.Links),
Status: convertStatus(pbSpan.Status),
ServiceName: serviceName, // 개별 서비스 이름 할당
}
}

Expand Down Expand Up @@ -99,6 +77,8 @@ func convertAttributesPb(attrs []*commonpb.KeyValue) map[string]interface{} {
attributes[attr.Key] = convertArrayValue(v.ArrayValue)
case *commonpb.AnyValue_KvlistValue:
attributes[attr.Key] = convertKeyValueList(v.KvlistValue)
default:
attributes[attr.Key] = nil
}
}
return attributes
Expand Down Expand Up @@ -181,4 +161,4 @@ func getServiceName(resource *resourcepb.Resource) *string {
}
}
return nil
}
}

0 comments on commit ec65446

Please sign in to comment.