Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signozlogspipelineprocessor: use own copy of stanza operators #403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/knadh/koanf v1.5.0
github.com/lightstep/go-expohisto v1.0.0
github.com/oklog/ulid v1.3.1
Expand Down Expand Up @@ -168,6 +169,7 @@ require (
go.opentelemetry.io/collector/semconv v0.102.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/text v0.16.0
Expand Down Expand Up @@ -312,7 +314,6 @@ require (
github.com/imdario/mergo v0.3.16 // indirect
github.com/ionos-cloud/sdk-go/v6 v6.1.11 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jellydator/ttlcache/v3 v3.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/parser/grok/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -15,6 +16,7 @@ const operatorType = "grok_parser"

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
signozlogspipelinestanzaoperator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new grok parser config with default values
Expand Down
15 changes: 12 additions & 3 deletions processor/signozlogspipelineprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package signozlogspipelineprocessor
import (
"errors"

signozlogspipelinestanzaadapter "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
)

type Config struct {
adapter.BaseConfig `mapstructure:",squash"`
signozlogspipelinestanzaadapter.BaseConfig `mapstructure:",squash"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -21,3 +21,12 @@ func (cfg *Config) Validate() error {
}
return nil
}

func (cfg *Config) OperatorConfigs() []operator.Config {
ops := []operator.Config{}

for _, op := range cfg.BaseConfig.Operators {
ops = append(ops, operator.Config(op))
}
return ops
}
11 changes: 6 additions & 5 deletions processor/signozlogspipelineprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"

signozlogspipelinestanzaadapter "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/adapter"
signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/regex"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -24,8 +25,8 @@ func TestLoadConfig(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.Equal(t, &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
BaseConfig: signozlogspipelinestanzaadapter.BaseConfig{
Operators: []signozlogspipelinestanzaoperator.Config{
{
Builder: func() *regex.Config {
cfg := regex.NewConfig()
Expand Down
9 changes: 4 additions & 5 deletions processor/signozlogspipelineprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"errors"
"fmt"

signozlogspipelinestanzaadapter "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/adapter"
signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

func NewFactory() processor.Factory {
Expand All @@ -26,8 +25,8 @@ func NewFactory() processor.Factory {
// Note: This isn't a valid configuration (no operators would lead to no work being done)
func createDefaultConfig() component.Config {
return &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{},
BaseConfig: signozlogspipelinestanzaadapter.BaseConfig{
Operators: []signozlogspipelinestanzaoperator.Config{},
},
}
}
Expand Down
12 changes: 6 additions & 6 deletions processor/signozlogspipelineprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"context"
"testing"

signozlogspipelinestanzaadapter "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/adapter"
signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
)
Expand All @@ -27,8 +27,8 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateProcessor(t *testing.T) {
factory := NewFactory()
cfg := &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
BaseConfig: signozlogspipelinestanzaadapter.BaseConfig{
Operators: []signozlogspipelinestanzaoperator.Config{
{
Builder: func() *regex.Config {
cfg := regex.NewConfig()
Expand Down Expand Up @@ -57,8 +57,8 @@ func TestCreateProcessor(t *testing.T) {
func TestInvalidOperators(t *testing.T) {
factory := NewFactory()
cfg := &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
BaseConfig: signozlogspipelinestanzaadapter.BaseConfig{
Operators: []signozlogspipelinestanzaoperator.Config{
{
// invalid due to missing regex
Builder: regex.NewConfig(),
Expand Down
2 changes: 1 addition & 1 deletion processor/signozlogspipelineprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newLogsPipelineProcessor(
}

stanzaPipeline, err := pipeline.Config{
Operators: processorConfig.BaseConfig.Operators,
Operators: processorConfig.OperatorConfigs(),
DefaultOutput: sink,
}.Build(telemetrySettings)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions processor/signozlogspipelineprocessor/stanza/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copy of stanza components used for powering signozlogspipelineprocessor
10 changes: 10 additions & 0 deletions processor/signozlogspipelineprocessor/stanza/adapter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package signozlogspipelinestanzaadapter

import (
signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
)

type BaseConfig struct {
// Using our own version of Config allows using a dedicated registry of stanza ops for logs pipelines.
Operators []signozlogspipelinestanzaoperator.Config `mapstructure:"operators"`
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Register copies of stanza operators dedicated to signoz logs pipelines
package signozlogspipelinestanzaadapter

import (
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/add"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/copy"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/json"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/move"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/noop"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/regex"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/remove"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/router"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/severity"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/time"
_ "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/operators/trace"
)
48 changes: 48 additions & 0 deletions processor/signozlogspipelineprocessor/stanza/operator/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Mostly Brought in as is from opentelemetry-collector-contrib
// Maintaining our own copy/version of Config allows us to use our own
// registry of stanza operators used in Config.Unmarshal in this file

package signozlogspipelinestanzaoperator

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/confmap"
)

// Config is the configuration of an operator
type Config struct {
operator.Builder
}

// NewConfig wraps the builder interface in a concrete struct
func NewConfig(b operator.Builder) Config {
return Config{Builder: b}
}

func (c *Config) Unmarshal(component *confmap.Conf) error {
if !component.IsSet("type") {
return fmt.Errorf("missing required field 'type'")
}

typeInterface := component.Get("type")

typeString, ok := typeInterface.(string)
if !ok {
return fmt.Errorf("non-string type %T for field 'type'", typeInterface)
}

builderFunc, ok := SignozStanzaOperatorsRegistry.Lookup(typeString)
if !ok {
return fmt.Errorf("unsupported type '%s'", typeString)
}

builder := builderFunc()
if err := component.Unmarshal(builder, confmap.WithIgnoreUnused()); err != nil {
return fmt.Errorf("unmarshal to %s: %w", typeString, err)
}

c.Builder = builder
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Brought in as is from opentelemetry-collector-contrib

package add

import (
"fmt"
"strings"

"go.opentelemetry.io/collector/component"

signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "add"

func init() {
signozlogspipelinestanzaoperator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new add operator config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new add operator config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
}
}

// Config is the configuration of an add operator
type Config struct {
helper.TransformerConfig `mapstructure:",squash"`
Field entry.Field `mapstructure:"field"`
Value any `mapstructure:"value,omitempty"`
}

// Build will build an add operator from the supplied configuration
func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(set)
if err != nil {
return nil, err
}

addOperator := &Transformer{
TransformerOperator: transformerOperator,
Field: c.Field,
}
strVal, ok := c.Value.(string)
if !ok || !isExpr(strVal) {
addOperator.Value = c.Value
return addOperator, nil
}
exprStr := strings.TrimPrefix(strVal, "EXPR(")
exprStr = strings.TrimSuffix(exprStr, ")")

compiled, err := helper.ExprCompile(exprStr)
if err != nil {
return nil, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, err)
}

addOperator.program = compiled
return addOperator, nil
}
Loading