From f3af05c6a5c047516b794a1a5da57bd21ff065d3 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Sat, 14 Sep 2024 12:09:41 +0530 Subject: [PATCH] Signozlogspipelineprocessor: use processorhelper.NewLogsProcessor for creating processor (#398) Using the standard helper simplifies the implementation and should also take care of common processor observability concerns --- .../signozlogspipelineprocessor/factory.go | 26 ++++++++++++++++--- .../signozlogspipelineprocessor/processor.go | 21 ++++----------- .../stanza_consumer.go | 15 +++-------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/processor/signozlogspipelineprocessor/factory.go b/processor/signozlogspipelineprocessor/factory.go index f6fce3ee..1c0ca21d 100644 --- a/processor/signozlogspipelineprocessor/factory.go +++ b/processor/signozlogspipelineprocessor/factory.go @@ -5,10 +5,12 @@ package signozlogspipelineprocessor import ( "context" "errors" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -30,19 +32,35 @@ func createDefaultConfig() component.Config { } } +var processorCapabilities = consumer.Capabilities{MutatesData: true} + func createLogsProcessor( - _ context.Context, + ctx context.Context, set processor.CreateSettings, cfg component.Config, - nextConsumer consumer.Logs) (processor.Logs, error) { + nextConsumer consumer.Logs, +) (processor.Logs, error) { pCfg, ok := cfg.(*Config) if !ok { return nil, errors.New("could not initialize signozlogspipeline processor") } - if len(pCfg.BaseConfig.Operators) == 0 { return nil, errors.New("no operators were configured for signozlogspipeline processor") } - return newProcessor(pCfg, nextConsumer, set.TelemetrySettings) + proc, err := newLogsPipelineProcessor(pCfg, set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("couldn't build \"signozlogspipeline\" processor %w", err) + } + + return processorhelper.NewLogsProcessor( + ctx, + set, + cfg, + nextConsumer, + proc.ProcessLogs, + processorhelper.WithStart(proc.Start), + processorhelper.WithShutdown(proc.Shutdown), + processorhelper.WithCapabilities(processorCapabilities), + ) } diff --git a/processor/signozlogspipelineprocessor/processor.go b/processor/signozlogspipelineprocessor/processor.go index 8f289425..056ce2f5 100644 --- a/processor/signozlogspipelineprocessor/processor.go +++ b/processor/signozlogspipelineprocessor/processor.go @@ -11,7 +11,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -19,15 +18,13 @@ import ( _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" // ensure grok parser gets registered. ) -func newProcessor( +func newLogsPipelineProcessor( processorConfig *Config, - nextConsumer consumer.Logs, telemetrySettings component.TelemetrySettings, ) (*logsPipelineProcessor, error) { sink := &stanzaToOtelConsumer{ - nextConsumer: nextConsumer, - logger: telemetrySettings.Logger, + logger: telemetrySettings.Logger, } stanzaPipeline, err := pipeline.Config{ @@ -40,7 +37,6 @@ func newProcessor( return &logsPipelineProcessor{ telemetrySettings: telemetrySettings, - nextConsumer: nextConsumer, processorConfig: processorConfig, stanzaPipeline: stanzaPipeline, @@ -49,10 +45,8 @@ func newProcessor( }, nil } -// Implements processor.Logs type logsPipelineProcessor struct { telemetrySettings component.TelemetrySettings - nextConsumer consumer.Logs processorConfig *Config stanzaPipeline *pipeline.DirectedPipeline @@ -99,12 +93,7 @@ func (p *logsPipelineProcessor) Shutdown(ctx context.Context) error { return nil } -// baseConsumer interface -func (p *logsPipelineProcessor) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (p *logsPipelineProcessor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { p.telemetrySettings.Logger.Debug( "logsPipelineProcessor received logs", zap.Any("logs", ld), @@ -118,8 +107,8 @@ func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) e } } - // Flush processed entries as a single plog.Logs to next consumer. - return p.sink.flush(ctx) + // Return processed entries as a single plog.Logs + return p.sink.flush(ctx), nil } // Helpers below have been brought in as is from stanza adapter for logstransform diff --git a/processor/signozlogspipelineprocessor/stanza_consumer.go b/processor/signozlogspipelineprocessor/stanza_consumer.go index aa3762ad..3527acfc 100644 --- a/processor/signozlogspipelineprocessor/stanza_consumer.go +++ b/processor/signozlogspipelineprocessor/stanza_consumer.go @@ -6,15 +6,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" ) // A stanza operator that consumes stanza entries and converts them to pdata.Log // for passing them on to the next otel consumer. type stanzaToOtelConsumer struct { - nextConsumer consumer.Logs - logger *zap.Logger + logger *zap.Logger // One plog.Logs can contain many log records. While the otel processor ConsumeLogs works // with one plog.Logs at a time, the stanza pipeline works with one log entry at a time. @@ -76,16 +75,10 @@ func (c *stanzaToOtelConsumer) Process(ctx context.Context, entry *entry.Entry) // `processedEntries` accumulates processed entries for a single plog.Logs until the // signozlogspipeline processor flushes these entries out - converting them back into // a single plog.Logs that gets sent to `nextConsumer.ConsumeLogs` -func (c *stanzaToOtelConsumer) flush(ctx context.Context) error { +func (c *stanzaToOtelConsumer) flush(ctx context.Context) plog.Logs { plogs := convertEntriesToPlogs(c.processedEntries) c.processedEntries = []*entry.Entry{} - - err := c.nextConsumer.ConsumeLogs(ctx, plogs) - if err != nil { - return err - } - - return nil + return plogs } func (c *stanzaToOtelConsumer) Logger() *zap.Logger {