Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signozlogspipelineprocessor: use processorhelper.NewLogsProcessor for creating processor #398

Merged
merged 2 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions processor/signozlogspipelineprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package signozlogspipelineprocessor
import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand All @@ -30,19 +32,35 @@ func createDefaultConfig() component.Config {
}
}

var processorCapabilities = consumer.Capabilities{MutatesData: true}

func createLogsProcessor(
_ context.Context,
ctx context.Context,
set processor.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs) (processor.Logs, error) {
nextConsumer consumer.Logs,
) (processor.Logs, error) {
pCfg, ok := cfg.(*Config)
if !ok {
return nil, errors.New("could not initialize signozlogspipeline processor")
}

if len(pCfg.BaseConfig.Operators) == 0 {
return nil, errors.New("no operators were configured for signozlogspipeline processor")
}

return newProcessor(pCfg, nextConsumer, set.TelemetrySettings)
proc, err := newLogsPipelineProcessor(pCfg, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("couldn't build \"signozlogspipeline\" processor %w", err)
}

return processorhelper.NewLogsProcessor(
ctx,
set,
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithStart(proc.Start),
processorhelper.WithShutdown(proc.Shutdown),
processorhelper.WithCapabilities(processorCapabilities),
)
}
21 changes: 5 additions & 16 deletions processor/signozlogspipelineprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" // ensure grok parser gets registered.
)

func newProcessor(
func newLogsPipelineProcessor(
processorConfig *Config,
nextConsumer consumer.Logs,
telemetrySettings component.TelemetrySettings,
) (*logsPipelineProcessor, error) {

sink := &stanzaToOtelConsumer{
nextConsumer: nextConsumer,
logger: telemetrySettings.Logger,
logger: telemetrySettings.Logger,
}

stanzaPipeline, err := pipeline.Config{
Expand All @@ -40,7 +37,6 @@ func newProcessor(

return &logsPipelineProcessor{
telemetrySettings: telemetrySettings,
nextConsumer: nextConsumer,

processorConfig: processorConfig,
stanzaPipeline: stanzaPipeline,
Expand All @@ -49,10 +45,8 @@ func newProcessor(
}, nil
}

// Implements processor.Logs
type logsPipelineProcessor struct {
telemetrySettings component.TelemetrySettings
nextConsumer consumer.Logs

processorConfig *Config
stanzaPipeline *pipeline.DirectedPipeline
Expand Down Expand Up @@ -99,12 +93,7 @@ func (p *logsPipelineProcessor) Shutdown(ctx context.Context) error {
return nil
}

// baseConsumer interface
func (p *logsPipelineProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (p *logsPipelineProcessor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
p.telemetrySettings.Logger.Debug(
"logsPipelineProcessor received logs",
zap.Any("logs", ld),
Expand All @@ -118,8 +107,8 @@ func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) e
}
}

// Flush processed entries as a single plog.Logs to next consumer.
return p.sink.flush(ctx)
// Return processed entries as a single plog.Logs
return p.sink.flush(ctx), nil
}

// Helpers below have been brought in as is from stanza adapter for logstransform
Expand Down
15 changes: 4 additions & 11 deletions processor/signozlogspipelineprocessor/stanza_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// A stanza operator that consumes stanza entries and converts them to pdata.Log
// for passing them on to the next otel consumer.
type stanzaToOtelConsumer struct {
nextConsumer consumer.Logs
logger *zap.Logger
logger *zap.Logger

// One plog.Logs can contain many log records. While the otel processor ConsumeLogs works
// with one plog.Logs at a time, the stanza pipeline works with one log entry at a time.
Expand Down Expand Up @@ -76,16 +75,10 @@ func (c *stanzaToOtelConsumer) Process(ctx context.Context, entry *entry.Entry)
// `processedEntries` accumulates processed entries for a single plog.Logs until the
// signozlogspipeline processor flushes these entries out - converting them back into
// a single plog.Logs that gets sent to `nextConsumer.ConsumeLogs`
func (c *stanzaToOtelConsumer) flush(ctx context.Context) error {
func (c *stanzaToOtelConsumer) flush(ctx context.Context) plog.Logs {
plogs := convertEntriesToPlogs(c.processedEntries)
c.processedEntries = []*entry.Entry{}

err := c.nextConsumer.ConsumeLogs(ctx, plogs)
if err != nil {
return err
}

return nil
return plogs
raj-k-singh marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *stanzaToOtelConsumer) Logger() *zap.Logger {
Expand Down