diff --git a/Makefile b/Makefile index 216f4eb6..a9f0022b 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ test-and-lint: test fmt lint .PHONY: test test: - go test -count=1 -v -race -cover ./... + go test --timeout=30s -count=1 -v -race -cover ./... .PHONY: build build: diff --git a/cmd/signozcollector/main.go b/cmd/signozcollector/main.go index d3e41947..e7609edb 100644 --- a/cmd/signozcollector/main.go +++ b/cmd/signozcollector/main.go @@ -48,22 +48,23 @@ func main() { signozcol.WrappedCollectorSettings{ ConfigPaths: []string{collectorConfig}, // TODO: Build version from git tag - Version: "0.63.0", - Desc: "SigNoz OpenTelemetry Collector", - LoggingOpts: []zap.Option{zap.WithCaller(true)}, + Version: "0.66.5", + Desc: "SigNoz OpenTelemetry Collector", + LoggingOpts: []zap.Option{zap.WithCaller(true)}, + PollInterval: 200 * time.Millisecond, }, ) svc, err := service.New(coll, logger, managerConfig, collectorConfig) if err != nil { - log.Fatalf("failed to create collector service: %v", err) + logger.Fatal("failed to create collector service:", zap.Error(err)) } ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer cancel() if err := runInteractive(ctx, logger, svc); err != nil { - log.Fatalf("failed to run service: %v", err) + logger.Fatal("failed to run service:", zap.Error(err)) } } @@ -72,8 +73,12 @@ func runInteractive(ctx context.Context, logger *zap.Logger, svc service.Service return fmt.Errorf("failed to start collector service: %w", err) } + // Wait for context done or service error select { case <-ctx.Done(): + logger.Info("Context done, shutting down...") + case err := <-svc.Error(): + logger.Error("Service error, shutting down...", zap.Error(err)) } stopTimeoutCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/opamp/client.go b/opamp/client.go index d35f60f6..791322b2 100644 --- a/opamp/client.go +++ b/opamp/client.go @@ -1,11 +1,56 @@ package opamp -import "context" +import ( + "context" + "fmt" + "time" + + "github.com/SigNoz/signoz-otel-collector/signozcol" + "go.opentelemetry.io/collector/otelcol" + "go.uber.org/zap" +) type Client interface { Start(ctx context.Context) error Stop(ctx context.Context) error - Error() error + Error() <-chan error +} + +type baseClient struct { + err chan error + stopped chan bool + coll *signozcol.WrappedCollector + logger *zap.Logger +} + +// Error returns the error channel +func (c baseClient) Error() <-chan error { + return c.err +} + +// ensureRunning checks if the collector is running +// and sends an error to the error channel if it is not +// running +// +// The error channel is used to signal the main function +// to shutdown the service +// +// The collector may stop running unexpectedly. This can +// happen if a component reports a fatal error or some other +// async error occurs +// See https://github.com/open-telemetry/opentelemetry-collector/blob/8d425480b0dd1270b408582d9e21dd644299cd7e/service/host.go#L34-L39 +func (c baseClient) ensureRunning() { + c.logger.Info("Ensuring collector is running") + for { + select { + case <-c.stopped: + return + case <-time.After(c.coll.PollInterval): + if c.coll.GetState() == otelcol.StateClosed { + c.err <- fmt.Errorf("collector stopped unexpectedly") + } + } + } } diff --git a/opamp/server_client.go b/opamp/server_client.go index 6ef3945b..e72f8e6c 100644 --- a/opamp/server_client.go +++ b/opamp/server_client.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -28,10 +29,10 @@ import ( // 4. Applying the remote configuration to the agent // 5. Sending the updated agent configuration to the Opamp server type serverClient struct { + baseClient logger *zap.Logger opampClient client.OpAMPClient configManager *agentConfigManager - collector *signozcol.WrappedCollector managerConfig AgentManagerConfig instanceId ulid.ULID } @@ -51,9 +52,14 @@ func NewServerClient(args *NewServerClientOpts) (Client, error) { configManager := NewAgentConfigManager(args.Logger) svrClient := &serverClient{ + baseClient: baseClient{ + coll: args.WrappedCollector, + err: make(chan error, 1), + stopped: make(chan bool), + logger: clientLogger, + }, logger: clientLogger, configManager: configManager, - collector: args.WrappedCollector, managerConfig: *args.Config, } @@ -147,23 +153,23 @@ func (s *serverClient) Start(ctx context.Context) error { s.logger.Error("Error while starting opamp client", zap.Error(err)) return err } - return s.collector.Run(ctx) + err = s.coll.Run(ctx) + if err != nil { + return err + } + go s.ensureRunning() + return nil } // Stop stops the Opamp client // It stops the Opamp client and disconnects from the Opamp server func (s *serverClient) Stop(ctx context.Context) error { - s.collector.Shutdown() - return s.opampClient.Stop(ctx) -} - -func (s *serverClient) Error() error { - var err error - select { - case err = <-s.collector.ErrorChan(): - default: - } - return err + s.logger.Info("Stopping OpAMP server client") + close(s.stopped) + s.coll.Shutdown() + opampErr := s.opampClient.Stop(ctx) + collErr := <-s.coll.ErrorChan() + return multierr.Combine(opampErr, collErr) } // onMessageFuncHandler is the callback function that is called when the Opamp client receives a message from the Opamp server @@ -223,14 +229,14 @@ func (s *serverClient) reload(contents []byte) error { return fmt.Errorf("failed to update config file %s: %w", collectorConfigPath, err) } - if err := s.collector.Restart(context.Background()); err != nil { + if err := s.coll.Restart(context.Background()); err != nil { if rollbackErr := rollbackFunc(); rollbackErr != nil { s.logger.Error("Failed to rollbakc the config", zap.Error(rollbackErr)) } // Restart collector with original file - if rollbackErr := s.collector.Restart(context.Background()); rollbackErr != nil { + if rollbackErr := s.coll.Restart(context.Background()); rollbackErr != nil { s.logger.Error("Collector failed for restart during rollback", zap.Error(rollbackErr)) } diff --git a/opamp/simple_client.go b/opamp/simple_client.go index 5402ac40..8fba3b2b 100644 --- a/opamp/simple_client.go +++ b/opamp/simple_client.go @@ -4,30 +4,37 @@ import ( "context" "github.com/SigNoz/signoz-otel-collector/signozcol" + "go.uber.org/zap" ) type simpleClient struct { - coll *signozcol.WrappedCollector + baseClient } -func NewSimpleClient(coll *signozcol.WrappedCollector) *simpleClient { - return &simpleClient{coll: coll} +func NewSimpleClient(coll *signozcol.WrappedCollector, logger *zap.Logger) *simpleClient { + return &simpleClient{ + baseClient: baseClient{ + coll: coll, + err: make(chan error), + stopped: make(chan bool), + logger: logger.With(zap.String("component", "simple-client")), + }, + } } func (c simpleClient) Start(ctx context.Context) error { - return c.coll.Run(ctx) + c.logger.Info("Starting simple client") + err := c.coll.Run(ctx) + if err != nil { + return err + } + go c.ensureRunning() + return nil } func (c simpleClient) Stop(ctx context.Context) error { + c.logger.Info("Stopping simple client") + close(c.stopped) c.coll.Shutdown() - return c.Error() -} - -func (c simpleClient) Error() error { - var err error - select { - case err = <-c.coll.ErrorChan(): - default: - } - return err + return <-c.coll.ErrorChan() } diff --git a/opamp/simple_client_test.go b/opamp/simple_client_test.go index 8e9a3757..e81499c2 100644 --- a/opamp/simple_client_test.go +++ b/opamp/simple_client_test.go @@ -17,7 +17,7 @@ func TestNopClientWithCollector(t *testing.T) { LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)}, }) - client := NewSimpleClient(coll) + client := NewSimpleClient(coll, zap.NewNop()) err := client.Start(context.Background()) if err != nil { @@ -42,7 +42,7 @@ func TestNopClientWithCollectorError(t *testing.T) { LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)}, }) - client := NewSimpleClient(coll) + client := NewSimpleClient(coll, zap.NewNop()) err := client.Start(context.Background()) if err == nil { @@ -67,7 +67,7 @@ func TestNopClientWithCollectorErrorRead(t *testing.T) { LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)}, }) - client := NewSimpleClient(coll) + client := NewSimpleClient(coll, zap.NewNop()) err := client.Start(context.Background()) if err == nil { @@ -77,39 +77,4 @@ func TestNopClientWithCollectorErrorRead(t *testing.T) { if coll.GetState() != otelcol.StateClosed { t.Errorf("expected collector to be in closed state") } - - err = client.Error() - if err == nil { - t.Errorf("expected error") - } -} - -func TestNopClientWithCollectorErrorChanMultipleTime(t *testing.T) { - coll := signozcol.New(signozcol.WrappedCollectorSettings{ - ConfigPaths: []string{"testdata/invalid.yaml"}, - Version: "0.0.1", - Desc: "test", - LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)}, - }) - - client := NewSimpleClient(coll) - - err := client.Start(context.Background()) - if err == nil { - t.Errorf("expected error") - } - - if coll.GetState() != otelcol.StateClosed { - t.Errorf("expected collector to be in closed state") - } - - err = client.Error() - if err == nil { - t.Errorf("expected error") - } - - err = client.Error() - if err != nil { - t.Errorf("expected no error") - } } diff --git a/service/service.go b/service/service.go index 57a5e66d..cf632221 100644 --- a/service/service.go +++ b/service/service.go @@ -19,6 +19,7 @@ import ( type Service interface { Start(ctx context.Context) error Shutdown(ctx context.Context) error + Error() <-chan error } type service struct { @@ -33,7 +34,7 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag // Running without Opamp if managerConfigPath == "" { - client = opamp.NewSimpleClient(wrappedCollector) + client = opamp.NewSimpleClient(wrappedCollector, logger) } else { managerConfig, err := opamp.ParseAgentManagerConfig(managerConfigPath) // Invalid config file @@ -47,6 +48,9 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag CollectorConfgPath: collectorConfigPath, } client, err = opamp.NewServerClient(serverClientOpts) + if err != nil { + return nil, fmt.Errorf("failed to create server client: %w", err) + } } return &service{ @@ -55,18 +59,27 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag }, err } -// Start starts the Opamp connection and collector +// Start starts the (Opamp connection and) collector func (s *service) Start(ctx context.Context) error { + s.l.Info("Starting service") if err := s.client.Start(ctx); err != nil { return fmt.Errorf("failed to start : %w", err) } + s.l.Info("Client started successfully") return nil } -// Shutdown stops the Opamp connection and collector +// Shutdown stops the (Opamp connection and) collector func (s *service) Shutdown(ctx context.Context) error { + s.l.Info("Shutting down service") if err := s.client.Stop(ctx); err != nil { return fmt.Errorf("failed to stop: %w", err) } + s.l.Info("Client stopped successfully") return nil } + +// Error returns the error channel +func (s *service) Error() <-chan error { + return s.client.Error() +} diff --git a/signozcol/collector.go b/signozcol/collector.go index e1b1a6ab..a9dbb299 100644 --- a/signozcol/collector.go +++ b/signozcol/collector.go @@ -24,31 +24,42 @@ import ( // On restart, the collector is stopped and a new instance is started. // The Opamp client implementation is responsible for restarting the collector. type WrappedCollector struct { - configPaths []string - version string - desc string - loggingOpts []zap.Option - wg sync.WaitGroup - errChan chan error - mux sync.Mutex - svc *otelcol.Collector + configPaths []string + version string + desc string + loggingOpts []zap.Option + wg sync.WaitGroup + errChan chan error + mux sync.Mutex + svc *otelcol.Collector + logger *zap.Logger + PollInterval time.Duration } type WrappedCollectorSettings struct { - ConfigPaths []string - Version string - Desc string - LoggingOpts []zap.Option + ConfigPaths []string + Version string + Desc string + LoggingOpts []zap.Option + PollInterval time.Duration + Logger *zap.Logger } +var StateUknown = otelcol.State(-1) + // New returns a new collector. func New(settings WrappedCollectorSettings) *WrappedCollector { + if settings.Logger == nil { + settings.Logger = zap.NewNop() + } return &WrappedCollector{ - configPaths: settings.ConfigPaths, - version: settings.Version, - desc: settings.Desc, - loggingOpts: settings.LoggingOpts, - errChan: make(chan error, 1), + configPaths: settings.ConfigPaths, + version: settings.Version, + desc: settings.Desc, + loggingOpts: settings.LoggingOpts, + PollInterval: settings.PollInterval, + errChan: make(chan error, 1), + logger: settings.Logger, } } @@ -65,6 +76,7 @@ func (wCol *WrappedCollector) Run(ctx context.Context) error { if err != nil { return err } + wCol.logger.Debug("Created new settings for collector", zap.Any("settings", settings)) // Create a new instance of collector to be used svc, err := otelcol.NewCollector(*settings) @@ -87,6 +99,7 @@ func (wCol *WrappedCollector) Run(ctx context.Context) error { wCol.wg.Add(1) go func() { defer wCol.wg.Done() + wCol.logger.Info("Starting collector service") err := svc.Run(ctx) // https://github.com/open-telemetry/opentelemetry-collector/blob/release/v0.66.x/service/collector.go#L124 // @@ -104,6 +117,7 @@ func (wCol *WrappedCollector) Run(ctx context.Context) error { for { state := svc.GetState() if state == otelcol.StateRunning { + wCol.logger.Info("Collector service is running") // TODO: collector may panic or exit unexpectedly, need to handle that colErrorChannel <- nil break @@ -127,6 +141,7 @@ func (wCol *WrappedCollector) Run(ctx context.Context) error { // Shutdown stops the collector. func (wCol *WrappedCollector) Shutdown() { + wCol.logger.Info("Shutting down collector service") wCol.mux.Lock() defer wCol.mux.Unlock() @@ -134,6 +149,7 @@ func (wCol *WrappedCollector) Shutdown() { wCol.svc.Shutdown() wCol.wg.Wait() wCol.svc = nil + wCol.logger.Info("Collector service is shut down") } } @@ -146,6 +162,7 @@ func (wCol *WrappedCollector) reportError(err error) { // Restart restarts the collector. func (wCol *WrappedCollector) Restart(ctx context.Context) error { + wCol.logger.Info("Restarting collector service") wCol.Shutdown() return wCol.Run(ctx) } @@ -161,7 +178,7 @@ func (wCol *WrappedCollector) GetState() otelcol.State { if wCol.svc != nil { return wCol.svc.GetState() } - return otelcol.StateClosed + return StateUknown } func newOtelColSettings(configPaths []string, version string, desc string, loggingOpts []zap.Option) (*otelcol.CollectorSettings, error) {