Skip to content

Commit

Permalink
feat: httpreceiver updated
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Oct 16, 2023
1 parent e72df52 commit c8b43e2
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.6.0
github.com/knadh/koanf v1.5.0
Expand Down Expand Up @@ -269,7 +270,6 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.10.0 // indirect
github.com/gophercloud/gophercloud v1.2.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/grobie/gomemcache v0.0.0-20180201122607-1f779c573665 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect
Expand Down
19 changes: 17 additions & 2 deletions receiver/httpreceiver/bodyparser/default.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
package bodyparser

import "go.opentelemetry.io/collector/pdata/plog"
import (
"strings"

"go.opentelemetry.io/collector/pdata/plog"
)

type Default struct {
}

func (l *Default) Parse(body []byte, records plog.LogRecordSlice) {
func (l *Default) Parse(body []byte) plog.Logs {
// split by newline and return
// TODO: add configuration for multiline
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
data := string(body)
loglines := strings.Split(data, "\n")
for _, log := range loglines {
sl.LogRecords().AppendEmpty().Body().SetStr(log)
}
return ld
}
51 changes: 51 additions & 0 deletions receiver/httpreceiver/bodyparser/default_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bodyparser

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestDefaultParse(t *testing.T) {
t.Parallel()
d := Default{}
tests := []struct {
name string
PayLoad string
Logs func() plog.Logs
isError bool
}{
{
name: "Test 1",
PayLoad: `This is a log line`,
Logs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
sl.LogRecords().AppendEmpty().Body().SetStr("This is a log line")
return ld
},
},
{
name: "Test 2 - multiple lines",
PayLoad: "This is a log line\nThis is another log line",
Logs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
sl.LogRecords().AppendEmpty().Body().SetStr("This is a log line")
sl.LogRecords().AppendEmpty().Body().SetStr("This is another log line")
return ld
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := d.Parse([]byte(tt.PayLoad))
logs := tt.Logs()
assert.Equal(t, logs, res)
})
}
}
3 changes: 2 additions & 1 deletion receiver/httpreceiver/bodyparser/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import "go.opentelemetry.io/collector/pdata/plog"
type GCloud struct {
}

func (l *GCloud) Parse(body []byte, records plog.LogRecordSlice) {
func (l *GCloud) Parse(body []byte) plog.Logs {
return plog.Logs{}
}
83 changes: 61 additions & 22 deletions receiver/httpreceiver/bodyparser/heroku.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,73 @@ func NewHeroku() *Heroku {
names: names,
}
}
func (l *Heroku) Parse(body []byte, records plog.LogRecordSlice) {
data := string(body)
result := []map[string]interface{}{}
logStrings := octectCountingSplitter(data)
for _, logString := range logStrings {
res := l.parser.FindStringSubmatch(logString)
tresult := map[string]interface{}{}
for index, name := range l.names {
tresult[name] = res[index]
}
result = append(result, tresult)
}

records.EnsureCapacity(len(logStrings))

for _, event := range result {
lr := records.AppendEmpty()
type rAttrs struct {
priority string
version string
hostname string
appname string
procid string
}
type log struct {
timestamp string
msgid string
body string
}

attrs := lr.Attributes()
attrs.EnsureCapacity(7)
func (l *Heroku) Parse(body []byte) plog.Logs {
data := string(body)

for k, val := range event {
if k != "msg" && k != "" {
attrs.PutStr(k, val.(string))
loglines := octectCountingSplitter(data)

resdata := map[rAttrs][]log{}
for _, line := range loglines {
parsedLog := l.parser.FindStringSubmatch(line)

if len(parsedLog) != len(l.names) {
//TODO: do something here to conver that it wasn't parsed
resdata[rAttrs{}] = append(resdata[rAttrs{}], log{
body: line,
})
} else {
d := rAttrs{
priority: parsedLog[1],
version: parsedLog[2],
hostname: parsedLog[4],
appname: parsedLog[5],
procid: parsedLog[6],
}

resdata[d] = append(resdata[d], log{
// for timestamp as of now not parsing and leaving it to the user if they want to map it to actual timestamp
// can change it later if required
timestamp: parsedLog[3],
msgid: parsedLog[7],
body: parsedLog[8],
})
}
}

ld := plog.NewLogs()
for resource, logbodies := range resdata {
rl := ld.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().EnsureCapacity(5)
rl.Resource().Attributes().PutStr("priority", resource.priority)
rl.Resource().Attributes().PutStr("version", resource.version)
rl.Resource().Attributes().PutStr("hostname", resource.hostname)
rl.Resource().Attributes().PutStr("appname", resource.appname)
rl.Resource().Attributes().PutStr("procid", resource.procid)

sl := rl.ScopeLogs().AppendEmpty()
for _, log := range logbodies {
rec := sl.LogRecords().AppendEmpty()
rec.Body().SetStr(log.body)
rec.Attributes().EnsureCapacity(2)
rec.Attributes().PutStr("timestamp", log.timestamp)
rec.Attributes().PutStr("msgid", log.msgid)
}
lr.Body().SetStr(event["msg"].(string))
}
return ld

}

Expand Down
69 changes: 66 additions & 3 deletions receiver/httpreceiver/bodyparser/heroku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestOctectCountingSplitter(t *testing.T) {
t.Parallel()

// cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
// require.NoError(t, err)

tests := []struct {
name string
PayLoad string
Expand Down Expand Up @@ -68,3 +66,68 @@ func TestOctectCountingSplitter(t *testing.T) {
})
}
}

func AddDefaultResources(rl plog.ResourceLogs) {
attrs := rl.Resource().Attributes()
attrs.EnsureCapacity(5)
attrs.PutStr("priority", "190")
attrs.PutStr("version", "1")
attrs.PutStr("hostname", "host")
attrs.PutStr("appname", "app")
attrs.PutStr("procid", "otel-collector.1")
}

func TestHerokuParse(t *testing.T) {
t.Parallel()
d := NewHeroku()
tests := []struct {
name string
PayLoad string
Logs func() plog.Logs
isError bool
}{
{
name: "Test 1",
PayLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...`,
Logs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
AddDefaultResources(rl)
sl := rl.ScopeLogs().AppendEmpty()
log := sl.LogRecords().AppendEmpty()
log.Body().SetStr("2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...")
log.Attributes().PutStr("timestamp", "2023-10-12T07:25:48.393741+00:00")
log.Attributes().PutStr("msgid", "-")
return ld
},
},
{
name: "Test 2 - multiline",
PayLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...
189 <190>1 2023-10-12T07:25:48.393855+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:127 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}`,
Logs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
AddDefaultResources(rl)
sl := rl.ScopeLogs().AppendEmpty()
log := sl.LogRecords().AppendEmpty()
log.Body().SetStr("2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...")
log.Attributes().PutStr("timestamp", "2023-10-12T07:25:48.393741+00:00")
log.Attributes().PutStr("msgid", "-")
log1 := sl.LogRecords().AppendEmpty()
log1.Body().SetStr(`2023-10-12T07:25:48.393Z info service/telemetry.go:127 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}`)
log1.Attributes().PutStr("timestamp", "2023-10-12T07:25:48.393855+00:00")
log1.Attributes().PutStr("msgid", "-")
return ld
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := d.Parse([]byte(tt.PayLoad))
logs := tt.Logs()
assert.Equal(t, logs, res)
})
}
}
6 changes: 3 additions & 3 deletions receiver/httpreceiver/bodyparser/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package bodyparser
import "go.opentelemetry.io/collector/pdata/plog"

type Parser interface {
Parse(body []byte, records plog.LogRecordSlice)
Parse(body []byte) plog.Logs
}

func GetBodyParser(source string) Parser {
switch source {
case "google":
return &GCloud{}
// case "google":
// return &GCloud{}
case "heroku":
return NewHeroku()
default:
Expand Down
12 changes: 4 additions & 8 deletions receiver/httpreceiver/httpsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
)

Expand Down Expand Up @@ -185,22 +184,19 @@ func (r *httpreceiver) handleLogs(w http.ResponseWriter, req *http.Request) {
return
}

ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
r.parser.Parse(body, sl.LogRecords())
logs := r.parser.Parse(body)

err = r.logsConsumer.ConsumeLogs(ctx, ld)
err = r.logsConsumer.ConsumeLogs(ctx, logs)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

log.Printf("Received logs: %d\n", sl.LogRecords().Len())
log.Printf("Received logs: %d\n", logs.LogRecordCount)
r.obsrecv.EndMetricsOp(
ctx,
metadata.Type,
sl.LogRecords().Len(),
logs.LogRecordCount(),
err)

}

0 comments on commit c8b43e2

Please sign in to comment.