diff --git a/exporter/signozkafkaexporter/kafka_exporter.go b/exporter/signozkafkaexporter/kafka_exporter.go index 16d18bd0..bfe63b4d 100644 --- a/exporter/signozkafkaexporter/kafka_exporter.go +++ b/exporter/signozkafkaexporter/kafka_exporter.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -108,6 +109,8 @@ type kafkaLogsProducer struct { } func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error { + e.normalizeLogData(&ld) + kafkaTopicPrefix, err := getKafkaTopicFromClientMetadata(client.FromContext(ctx).Metadata) if err != nil { return consumererror.NewPermanent(err) @@ -131,6 +134,34 @@ func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) er return nil } +func (e *kafkaLogsProducer) normalizeLogData(ld *plog.Logs) { + for rlIdx := 0; rlIdx < ld.ResourceLogs().Len(); rlIdx++ { + rl := ld.ResourceLogs().At(rlIdx) + + for slIdx := 0; slIdx < rl.ScopeLogs().Len(); slIdx++ { + sl := rl.ScopeLogs().At(slIdx) + + for lrIdx := 0; lrIdx < sl.LogRecords().Len(); lrIdx++ { + lr := sl.LogRecords().At(lrIdx) + + // log body is always expected to be string in SigNoz + if lr.Body().Type() != pcommon.ValueTypeStr { + var strBody string + if lr.Body().Type() == pcommon.ValueTypeBytes { + strBody = string(lr.Body().Bytes().AsRaw()) + } else { + strBody = lr.Body().AsString() + } + + lr.Body().SetStr(strBody) + + } + + } + } + } +} + func (e *kafkaLogsProducer) Close(context.Context) error { return e.producer.Close() } diff --git a/exporter/signozkafkaexporter/kafka_exporter_test.go b/exporter/signozkafkaexporter/kafka_exporter_test.go index c3cc0057..96975bb8 100644 --- a/exporter/signozkafkaexporter/kafka_exporter_test.go +++ b/exporter/signozkafkaexporter/kafka_exporter_test.go @@ -6,6 +6,7 @@ package signozkafkaexporter import ( "context" "fmt" + "reflect" "testing" "github.com/Shopify/sarama" @@ -21,6 +22,7 @@ import ( "go.uber.org/zap" "github.com/SigNoz/signoz-otel-collector/internal/coreinternal/testdata" + "github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver" ) func TestNewExporter_err_version(t *testing.T) { @@ -242,6 +244,55 @@ func TestLogsDataPusher(t *testing.T) { require.NoError(t, err) } +func TestLogBodyBytesGetConvertedToTextString(t *testing.T) { + c := sarama.NewConfig() + producer := mocks.NewSyncProducer(t, c) + p := kafkaLogsProducer{ + producer: producer, + marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding), + } + t.Cleanup(func() { + require.NoError(t, p.Close(context.Background())) + }) + + testLogCount := 5 + testBody := []byte("test log") + + logs := testdata.GenerateLogsManyLogRecordsSameResource(testLogCount) + for i := 0; i < testLogCount; i++ { + lr := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) + logBodyBytes := lr.Body().SetEmptyBytes() + logBodyBytes.Append(testBody...) + } + + producer.ExpectSendMessageWithCheckerFunctionAndSucceed(func(val []byte) error { + unmarshaler := signozkafkareceiver.NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + producedLogs, err := unmarshaler.Unmarshal(val) + if err != nil { + return err + } + + for i := 0; i < testLogCount; i++ { + lr := producedLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) + producedBody := lr.Body().AsRaw() + producedBodyStr, ok := producedBody.(string) + if !ok || producedBodyStr != string(testBody) { + return fmt.Errorf( + "unexpected log body produced: testLogIdx: %d, type %s, value: %v", + i, reflect.TypeOf(producedBody).String(), producedBody, + ) + } + } + + return nil + }) + + ctx := client.NewContext(context.Background(), client.Info{Metadata: client.NewMetadata(map[string][]string{"signoz_tenant_id": {"test_tenant_id"}})}) + err := p.logsDataPusher(ctx, logs) + + require.NoError(t, err) +} + func TestLogsDataPusher_err(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) diff --git a/receiver/signozkafkareceiver/kafka_receiver_test.go b/receiver/signozkafkareceiver/kafka_receiver_test.go index cd431357..87a2d6e8 100644 --- a/receiver/signozkafkareceiver/kafka_receiver_test.go +++ b/receiver/signozkafkareceiver/kafka_receiver_test.go @@ -652,7 +652,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -701,7 +701,7 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -746,7 +746,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -773,7 +773,7 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), diff --git a/receiver/signozkafkareceiver/pdata_unmarshaler.go b/receiver/signozkafkareceiver/pdata_unmarshaler.go index 818cc146..283bf342 100644 --- a/receiver/signozkafkareceiver/pdata_unmarshaler.go +++ b/receiver/signozkafkareceiver/pdata_unmarshaler.go @@ -22,7 +22,7 @@ func (p pdataLogsUnmarshaler) Encoding() string { return p.encoding } -func newPdataLogsUnmarshaler(unmarshaler plog.Unmarshaler, encoding string) LogsUnmarshaler { +func NewPdataLogsUnmarshaler(unmarshaler plog.Unmarshaler, encoding string) LogsUnmarshaler { return pdataLogsUnmarshaler{ Unmarshaler: unmarshaler, encoding: encoding, diff --git a/receiver/signozkafkareceiver/pdata_unmarshaler_test.go b/receiver/signozkafkareceiver/pdata_unmarshaler_test.go index d71b7ae2..3e74f9f0 100644 --- a/receiver/signozkafkareceiver/pdata_unmarshaler_test.go +++ b/receiver/signozkafkareceiver/pdata_unmarshaler_test.go @@ -23,6 +23,6 @@ func TestNewPdataMetricsUnmarshaler(t *testing.T) { } func TestNewPdataLogsUnmarshaler(t *testing.T) { - um := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, "test") + um := NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, "test") assert.Equal(t, "test", um.Encoding()) } diff --git a/receiver/signozkafkareceiver/unmarshaler.go b/receiver/signozkafkareceiver/unmarshaler.go index e1e30fac..eac5d548 100644 --- a/receiver/signozkafkareceiver/unmarshaler.go +++ b/receiver/signozkafkareceiver/unmarshaler.go @@ -72,7 +72,7 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { } func defaultLogsUnmarshalers() map[string]LogsUnmarshaler { - otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpPb := NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() return map[string]LogsUnmarshaler{