From fa5e98ff45f2d9e314b2f19c6bb90f5bc2fbabfa Mon Sep 17 00:00:00 2001 From: Raj Date: Fri, 13 Sep 2024 18:29:26 +0530 Subject: [PATCH 1/2] feat: use processorhelper.NewLogsProcessor for creating signozlogspipeline processors --- .../signozlogspipelineprocessor/factory.go | 26 ++++++++++++++++--- .../signozlogspipelineprocessor/processor.go | 21 ++++----------- .../stanza_consumer.go | 15 +++-------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/processor/signozlogspipelineprocessor/factory.go b/processor/signozlogspipelineprocessor/factory.go index f6fce3ee..48b885f6 100644 --- a/processor/signozlogspipelineprocessor/factory.go +++ b/processor/signozlogspipelineprocessor/factory.go @@ -5,10 +5,12 @@ package signozlogspipelineprocessor import ( "context" "errors" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -30,19 +32,35 @@ func createDefaultConfig() component.Config { } } +var processorCapabilities = consumer.Capabilities{MutatesData: true} + func createLogsProcessor( - _ context.Context, + ctx context.Context, set processor.CreateSettings, cfg component.Config, - nextConsumer consumer.Logs) (processor.Logs, error) { + nextConsumer consumer.Logs, +) (processor.Logs, error) { pCfg, ok := cfg.(*Config) if !ok { return nil, errors.New("could not initialize signozlogspipeline processor") } - if len(pCfg.BaseConfig.Operators) == 0 { return nil, errors.New("no operators were configured for signozlogspipeline processor") } - return newProcessor(pCfg, nextConsumer, set.TelemetrySettings) + proc, err := newLogsPipelineProcessor(pCfg, set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("couldn't build \"logspipeline\" processor %w", err) + } + + return processorhelper.NewLogsProcessor( + ctx, + set, + cfg, + nextConsumer, + proc.ProcessLogs, + processorhelper.WithStart(proc.Start), + processorhelper.WithShutdown(proc.Shutdown), + processorhelper.WithCapabilities(processorCapabilities), + ) } diff --git a/processor/signozlogspipelineprocessor/processor.go b/processor/signozlogspipelineprocessor/processor.go index 8f289425..056ce2f5 100644 --- a/processor/signozlogspipelineprocessor/processor.go +++ b/processor/signozlogspipelineprocessor/processor.go @@ -11,7 +11,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -19,15 +18,13 @@ import ( _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" // ensure grok parser gets registered. ) -func newProcessor( +func newLogsPipelineProcessor( processorConfig *Config, - nextConsumer consumer.Logs, telemetrySettings component.TelemetrySettings, ) (*logsPipelineProcessor, error) { sink := &stanzaToOtelConsumer{ - nextConsumer: nextConsumer, - logger: telemetrySettings.Logger, + logger: telemetrySettings.Logger, } stanzaPipeline, err := pipeline.Config{ @@ -40,7 +37,6 @@ func newProcessor( return &logsPipelineProcessor{ telemetrySettings: telemetrySettings, - nextConsumer: nextConsumer, processorConfig: processorConfig, stanzaPipeline: stanzaPipeline, @@ -49,10 +45,8 @@ func newProcessor( }, nil } -// Implements processor.Logs type logsPipelineProcessor struct { telemetrySettings component.TelemetrySettings - nextConsumer consumer.Logs processorConfig *Config stanzaPipeline *pipeline.DirectedPipeline @@ -99,12 +93,7 @@ func (p *logsPipelineProcessor) Shutdown(ctx context.Context) error { return nil } -// baseConsumer interface -func (p *logsPipelineProcessor) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (p *logsPipelineProcessor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { p.telemetrySettings.Logger.Debug( "logsPipelineProcessor received logs", zap.Any("logs", ld), @@ -118,8 +107,8 @@ func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) e } } - // Flush processed entries as a single plog.Logs to next consumer. - return p.sink.flush(ctx) + // Return processed entries as a single plog.Logs + return p.sink.flush(ctx), nil } // Helpers below have been brought in as is from stanza adapter for logstransform diff --git a/processor/signozlogspipelineprocessor/stanza_consumer.go b/processor/signozlogspipelineprocessor/stanza_consumer.go index aa3762ad..3527acfc 100644 --- a/processor/signozlogspipelineprocessor/stanza_consumer.go +++ b/processor/signozlogspipelineprocessor/stanza_consumer.go @@ -6,15 +6,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" ) // A stanza operator that consumes stanza entries and converts them to pdata.Log // for passing them on to the next otel consumer. type stanzaToOtelConsumer struct { - nextConsumer consumer.Logs - logger *zap.Logger + logger *zap.Logger // One plog.Logs can contain many log records. While the otel processor ConsumeLogs works // with one plog.Logs at a time, the stanza pipeline works with one log entry at a time. @@ -76,16 +75,10 @@ func (c *stanzaToOtelConsumer) Process(ctx context.Context, entry *entry.Entry) // `processedEntries` accumulates processed entries for a single plog.Logs until the // signozlogspipeline processor flushes these entries out - converting them back into // a single plog.Logs that gets sent to `nextConsumer.ConsumeLogs` -func (c *stanzaToOtelConsumer) flush(ctx context.Context) error { +func (c *stanzaToOtelConsumer) flush(ctx context.Context) plog.Logs { plogs := convertEntriesToPlogs(c.processedEntries) c.processedEntries = []*entry.Entry{} - - err := c.nextConsumer.ConsumeLogs(ctx, plogs) - if err != nil { - return err - } - - return nil + return plogs } func (c *stanzaToOtelConsumer) Logger() *zap.Logger { From 2287892d6830030c54ffc02ccf06ffec04e7a2f3 Mon Sep 17 00:00:00 2001 From: Raj Date: Fri, 13 Sep 2024 18:29:36 +0530 Subject: [PATCH 2/2] chore: minor cleanup --- processor/signozlogspipelineprocessor/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/signozlogspipelineprocessor/factory.go b/processor/signozlogspipelineprocessor/factory.go index 48b885f6..1c0ca21d 100644 --- a/processor/signozlogspipelineprocessor/factory.go +++ b/processor/signozlogspipelineprocessor/factory.go @@ -50,7 +50,7 @@ func createLogsProcessor( proc, err := newLogsPipelineProcessor(pCfg, set.TelemetrySettings) if err != nil { - return nil, fmt.Errorf("couldn't build \"logspipeline\" processor %w", err) + return nil, fmt.Errorf("couldn't build \"signozlogspipeline\" processor %w", err) } return processorhelper.NewLogsProcessor(