Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle asynchronous errors #116

Merged
merged 4 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ test-and-lint: test fmt lint

.PHONY: test
test:
go test -count=1 -v -race -cover ./...
go test --timeout=30s -count=1 -v -race -cover ./...

.PHONY: build
build:
Expand Down
15 changes: 10 additions & 5 deletions cmd/signozcollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,23 @@ func main() {
signozcol.WrappedCollectorSettings{
ConfigPaths: []string{collectorConfig},
// TODO: Build version from git tag
Version: "0.63.0",
Desc: "SigNoz OpenTelemetry Collector",
LoggingOpts: []zap.Option{zap.WithCaller(true)},
Version: "0.66.5",
Desc: "SigNoz OpenTelemetry Collector",
LoggingOpts: []zap.Option{zap.WithCaller(true)},
PollInterval: 200 * time.Millisecond,
},
)

svc, err := service.New(coll, logger, managerConfig, collectorConfig)
if err != nil {
log.Fatalf("failed to create collector service: %v", err)
logger.Fatal("failed to create collector service:", zap.Error(err))
}

ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if err := runInteractive(ctx, logger, svc); err != nil {
log.Fatalf("failed to run service: %v", err)
logger.Fatal("failed to run service:", zap.Error(err))
}
}

Expand All @@ -72,8 +73,12 @@ func runInteractive(ctx context.Context, logger *zap.Logger, svc service.Service
return fmt.Errorf("failed to start collector service: %w", err)
}

// Wait for context done or service error
select {
case <-ctx.Done():
logger.Info("Context done, shutting down...")
case err := <-svc.Error():
logger.Error("Service error, shutting down...", zap.Error(err))
}

stopTimeoutCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
49 changes: 47 additions & 2 deletions opamp/client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,56 @@
package opamp

import "context"
import (
"context"
"fmt"
"time"

"github.com/SigNoz/signoz-otel-collector/signozcol"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"
)

type Client interface {
Start(ctx context.Context) error

Stop(ctx context.Context) error

Error() error
Error() <-chan error
}

type baseClient struct {
err chan error
stopped chan bool
coll *signozcol.WrappedCollector
logger *zap.Logger
}

// Error returns the error channel
func (c baseClient) Error() <-chan error {
return c.err
}

// ensureRunning checks if the collector is running
// and sends an error to the error channel if it is not
// running
//
// The error channel is used to signal the main function
// to shutdown the service
//
// The collector may stop running unexpectedly. This can
// happen if a component reports a fatal error or some other
// async error occurs
// See https://github.com/open-telemetry/opentelemetry-collector/blob/8d425480b0dd1270b408582d9e21dd644299cd7e/service/host.go#L34-L39
func (c baseClient) ensureRunning() {
c.logger.Info("Ensuring collector is running")
for {
select {
case <-c.stopped:
return
case <-time.After(c.coll.PollInterval):
if c.coll.GetState() == otelcol.StateClosed {
c.err <- fmt.Errorf("collector stopped unexpectedly")
}
}
}
}
38 changes: 22 additions & 16 deletions opamp/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand All @@ -28,10 +29,10 @@ import (
// 4. Applying the remote configuration to the agent
// 5. Sending the updated agent configuration to the Opamp server
type serverClient struct {
baseClient
logger *zap.Logger
opampClient client.OpAMPClient
configManager *agentConfigManager
collector *signozcol.WrappedCollector
managerConfig AgentManagerConfig
instanceId ulid.ULID
}
Expand All @@ -51,9 +52,14 @@ func NewServerClient(args *NewServerClientOpts) (Client, error) {
configManager := NewAgentConfigManager(args.Logger)

svrClient := &serverClient{
baseClient: baseClient{
coll: args.WrappedCollector,
err: make(chan error, 1),
stopped: make(chan bool),
logger: clientLogger,
},
logger: clientLogger,
configManager: configManager,
collector: args.WrappedCollector,
managerConfig: *args.Config,
}

Expand Down Expand Up @@ -147,23 +153,23 @@ func (s *serverClient) Start(ctx context.Context) error {
s.logger.Error("Error while starting opamp client", zap.Error(err))
return err
}
return s.collector.Run(ctx)
err = s.coll.Run(ctx)
if err != nil {
return err
}
go s.ensureRunning()
return nil
}

// Stop stops the Opamp client
// It stops the Opamp client and disconnects from the Opamp server
func (s *serverClient) Stop(ctx context.Context) error {
s.collector.Shutdown()
return s.opampClient.Stop(ctx)
}

func (s *serverClient) Error() error {
var err error
select {
case err = <-s.collector.ErrorChan():
default:
}
return err
s.logger.Info("Stopping OpAMP server client")
close(s.stopped)
s.coll.Shutdown()
opampErr := s.opampClient.Stop(ctx)
collErr := <-s.coll.ErrorChan()
return multierr.Combine(opampErr, collErr)
}

// onMessageFuncHandler is the callback function that is called when the Opamp client receives a message from the Opamp server
Expand Down Expand Up @@ -223,14 +229,14 @@ func (s *serverClient) reload(contents []byte) error {
return fmt.Errorf("failed to update config file %s: %w", collectorConfigPath, err)
}

if err := s.collector.Restart(context.Background()); err != nil {
if err := s.coll.Restart(context.Background()); err != nil {

if rollbackErr := rollbackFunc(); rollbackErr != nil {
s.logger.Error("Failed to rollbakc the config", zap.Error(rollbackErr))
}

// Restart collector with original file
if rollbackErr := s.collector.Restart(context.Background()); rollbackErr != nil {
if rollbackErr := s.coll.Restart(context.Background()); rollbackErr != nil {
s.logger.Error("Collector failed for restart during rollback", zap.Error(rollbackErr))
}

Expand Down
35 changes: 21 additions & 14 deletions opamp/simple_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,37 @@ import (
"context"

"github.com/SigNoz/signoz-otel-collector/signozcol"
"go.uber.org/zap"
)

type simpleClient struct {
coll *signozcol.WrappedCollector
baseClient
}

func NewSimpleClient(coll *signozcol.WrappedCollector) *simpleClient {
return &simpleClient{coll: coll}
func NewSimpleClient(coll *signozcol.WrappedCollector, logger *zap.Logger) *simpleClient {
return &simpleClient{
baseClient: baseClient{
coll: coll,
err: make(chan error),
stopped: make(chan bool),
logger: logger.With(zap.String("component", "simple-client")),
},
}
}

func (c simpleClient) Start(ctx context.Context) error {
return c.coll.Run(ctx)
c.logger.Info("Starting simple client")
err := c.coll.Run(ctx)
if err != nil {
return err
}
go c.ensureRunning()
return nil
}

func (c simpleClient) Stop(ctx context.Context) error {
c.logger.Info("Stopping simple client")
close(c.stopped)
c.coll.Shutdown()
return c.Error()
}

func (c simpleClient) Error() error {
var err error
select {
case err = <-c.coll.ErrorChan():
default:
}
return err
return <-c.coll.ErrorChan()
}
41 changes: 3 additions & 38 deletions opamp/simple_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestNopClientWithCollector(t *testing.T) {
LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)},
})

client := NewSimpleClient(coll)
client := NewSimpleClient(coll, zap.NewNop())

err := client.Start(context.Background())
if err != nil {
Expand All @@ -42,7 +42,7 @@ func TestNopClientWithCollectorError(t *testing.T) {
LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)},
})

client := NewSimpleClient(coll)
client := NewSimpleClient(coll, zap.NewNop())

err := client.Start(context.Background())
if err == nil {
Expand All @@ -67,7 +67,7 @@ func TestNopClientWithCollectorErrorRead(t *testing.T) {
LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)},
})

client := NewSimpleClient(coll)
client := NewSimpleClient(coll, zap.NewNop())

err := client.Start(context.Background())
if err == nil {
Expand All @@ -77,39 +77,4 @@ func TestNopClientWithCollectorErrorRead(t *testing.T) {
if coll.GetState() != otelcol.StateClosed {
t.Errorf("expected collector to be in closed state")
}

err = client.Error()
if err == nil {
t.Errorf("expected error")
}
}

func TestNopClientWithCollectorErrorChanMultipleTime(t *testing.T) {
coll := signozcol.New(signozcol.WrappedCollectorSettings{
ConfigPaths: []string{"testdata/invalid.yaml"},
Version: "0.0.1",
Desc: "test",
LoggingOpts: []zap.Option{zap.AddStacktrace(zap.ErrorLevel)},
})

client := NewSimpleClient(coll)

err := client.Start(context.Background())
if err == nil {
t.Errorf("expected error")
}

if coll.GetState() != otelcol.StateClosed {
t.Errorf("expected collector to be in closed state")
}

err = client.Error()
if err == nil {
t.Errorf("expected error")
}

err = client.Error()
if err != nil {
t.Errorf("expected no error")
}
}
19 changes: 16 additions & 3 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type Service interface {
Start(ctx context.Context) error
Shutdown(ctx context.Context) error
Error() <-chan error
}

type service struct {
Expand All @@ -33,7 +34,7 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag

// Running without Opamp
if managerConfigPath == "" {
client = opamp.NewSimpleClient(wrappedCollector)
client = opamp.NewSimpleClient(wrappedCollector, logger)
} else {
managerConfig, err := opamp.ParseAgentManagerConfig(managerConfigPath)
// Invalid config file
Expand All @@ -47,6 +48,9 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag
CollectorConfgPath: collectorConfigPath,
}
client, err = opamp.NewServerClient(serverClientOpts)
if err != nil {
return nil, fmt.Errorf("failed to create server client: %w", err)
}
}

return &service{
Expand All @@ -55,18 +59,27 @@ func New(wrappedCollector *signozcol.WrappedCollector, logger *zap.Logger, manag
}, err
}

// Start starts the Opamp connection and collector
// Start starts the (Opamp connection and) collector
func (s *service) Start(ctx context.Context) error {
s.l.Info("Starting service")
if err := s.client.Start(ctx); err != nil {
return fmt.Errorf("failed to start : %w", err)
}
s.l.Info("Client started successfully")
return nil
}

// Shutdown stops the Opamp connection and collector
// Shutdown stops the (Opamp connection and) collector
func (s *service) Shutdown(ctx context.Context) error {
s.l.Info("Shutting down service")
if err := s.client.Stop(ctx); err != nil {
return fmt.Errorf("failed to stop: %w", err)
}
s.l.Info("Client stopped successfully")
return nil
}

// Error returns the error channel
func (s *service) Error() <-chan error {
return s.client.Error()
}
Loading
Loading