Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire up the components with main #59

Merged
merged 39 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ad0228c
Add wrapper collector component
srikanthccv Dec 21, 2022
3dc7cb7
fix shutdown
srikanthccv Dec 21, 2022
75b7edb
Add simple client implementation
srikanthccv Dec 21, 2022
8ef991b
Add interfaces/types with documentation
srikanthccv Dec 22, 2022
8b92e37
merge init
srikanthccv Dec 23, 2022
dd0b0a5
Merge branch 'wrapped-collector' into simple-client
srikanthccv Dec 23, 2022
a1802fd
Add config manager
srikanthccv Dec 23, 2022
f8a65da
Add server client implementation
srikanthccv Dec 24, 2022
8e38599
Add service implementation
srikanthccv Dec 24, 2022
10d0b83
Wire up the components with main
srikanthccv Dec 24, 2022
fcce474
more error handling
srikanthccv Dec 25, 2022
44c4380
Update service/service.go
srikanthccv Dec 26, 2022
7daf1b7
Merge branch 'main' into init
srikanthccv Dec 28, 2022
58bab0e
Merge branch 'init' into wrapped-collector
srikanthccv Dec 28, 2022
6fe1b3d
Add restart test
srikanthccv Dec 28, 2022
7fb5495
Merge branch 'wrapped-collector' into simple-client
srikanthccv Dec 28, 2022
f21ca80
Fix TODO item in simpleClient
srikanthccv Dec 28, 2022
db8daed
Merge branch 'simple-client' into config-manager
srikanthccv Dec 28, 2022
2b1d6dd
Add default ID when not provided
srikanthccv Dec 28, 2022
f9e91a6
Resolve conflicts
srikanthccv Dec 28, 2022
3e9b61c
Satisfy the interface
srikanthccv Dec 28, 2022
dc85816
Merge branch 'server-client' into service
srikanthccv Dec 28, 2022
d70a54d
Remove unnecessary fields
srikanthccv Dec 28, 2022
d35e2de
Merge branch 'service' into glue-up
srikanthccv Dec 28, 2022
cea18d7
Merge branch main into glue-up
srikanthccv Mar 6, 2023
165585e
Merge branch 'main' into glue-up
srikanthccv Mar 15, 2023
24c55bc
fix: handle asynchronous errors
srikanthccv Mar 20, 2023
27b85d1
Resolve conflicts
srikanthccv Aug 3, 2023
98e3eda
resolve conflicts
srikanthccv Aug 3, 2023
96c841c
more conflicts
srikanthccv Aug 3, 2023
eaa2a76
Add otelcol state
srikanthccv Aug 3, 2023
e6ba0f9
WIP: copy the initial config and wait for first server response
srikanthccv Aug 3, 2023
629a57b
Update tests
srikanthccv Aug 3, 2023
3daa0cc
Reload on initial config always
srikanthccv Aug 4, 2023
e347f52
Merge branch 'main' into glue-up
srikanthccv Aug 7, 2023
94a81f6
Address review comments
srikanthccv Aug 7, 2023
d1a5da2
Cleanup
srikanthccv Aug 7, 2023
aa78b88
Merge branch 'main' into glue-up
srikanthccv Sep 2, 2023
145e9c3
Merge branch 'main' into glue-up
srikanthccv Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 110 additions & 18 deletions cmd/signozcollector/main.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,134 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/SigNoz/signoz-otel-collector/components"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol"
"github.com/SigNoz/signoz-otel-collector/constants"
signozcolFeatureGate "github.com/SigNoz/signoz-otel-collector/featuregate"
"github.com/SigNoz/signoz-otel-collector/service"
"github.com/SigNoz/signoz-otel-collector/signozcol"
flag "github.com/spf13/pflag"
otelcolFeatureGate "go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func main() {

factories, err := components.Components()
// Command line flags
f := flag.NewFlagSet("Collector CLI Options", flag.ExitOnError)

f.Usage = func() {
fmt.Println(f.FlagUsages())
os.Exit(0)
}

f.String("config", "", "File path for the collector configuration")
f.String("manager-config", "", "File path for the agent manager configuration")
f.String("copy-path", "/etc/otel/signozcol-config.yaml", "File path for the copied collector configuration")
f.Var(signozcolFeatureGate.NewFlag(otelcolFeatureGate.GlobalRegistry()), "feature-gates",
"Comma-delimited list of feature gate identifiers. Prefix with '-' to disable the feature. '+' or no prefix will enable the feature.")
err := f.Parse(os.Args[1:])
if err != nil {
log.Fatalf("failed to build default components: %v", err)
log.Fatalf("Failed to parse args %v", err)
}

info := component.BuildInfo{
Command: "signoz-otel-collector",
Description: "SigNoz OTEL Collector",
Version: "latest",
logger, err := initZapLog()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}

params := otelcol.CollectorSettings{
Factories: factories,
BuildInfo: info,
collectorConfig, _ := f.GetString("config")
managerConfig, _ := f.GetString("manager-config")
copyPath, _ := f.GetString("copy-path")
if managerConfig != "" {
if err := copyConfigFile(collectorConfig, copyPath); err != nil {
logger.Fatal("Failed to copy config file %v", zap.Error(err))
}
collectorConfig = copyPath
}

if err := run(params); err != nil {
log.Fatal(err)
ctx := context.Background()

coll := signozcol.New(
signozcol.WrappedCollectorSettings{
ConfigPaths: []string{collectorConfig},
Version: constants.Version,
Desc: constants.Desc,
LoggingOpts: []zap.Option{zap.WithCaller(true)},
PollInterval: 200 * time.Millisecond,
},
)

svc, err := service.New(coll, logger, managerConfig, collectorConfig)
if err != nil {
logger.Fatal("failed to create collector service:", zap.Error(err))
}

ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if err := runInteractive(ctx, logger, svc); err != nil {
logger.Fatal("failed to run service:", zap.Error(err))
}
}

func runInteractive(params otelcol.CollectorSettings) error {
cmd := otelcol.NewCommand(params)
err := cmd.Execute()
func runInteractive(ctx context.Context, logger *zap.Logger, svc service.Service) error {
if err := svc.Start(ctx); err != nil {
return fmt.Errorf("failed to start collector service: %w", err)
}

// Wait for context done or service error
select {
case <-ctx.Done():
logger.Info("Context done, shutting down...")
case err := <-svc.Error():
logger.Error("Service error, shutting down...", zap.Error(err))
}

stopTimeoutCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer stopCancel()

if err := svc.Shutdown(stopTimeoutCtx); err != nil {
return fmt.Errorf("failed to stop service: %w", err)
}

return nil
}

func initZapLog() (*zap.Logger, error) {
config := zap.NewProductionConfig()
config.EncoderConfig.EncodeLevel = zapcore.LowercaseLevelEncoder
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, err := config.Build()
return logger, err
}

func copyConfigFile(configPath string, copyPath string) error {
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("config file %s does not exist", configPath)
}

return copy(configPath, copyPath)
}

func copy(src, dest string) error {
data, err := os.ReadFile(src)
if err != nil {
return fmt.Errorf("failed to read source file %s: %w", src, err)
}

err = os.WriteFile(dest, data, 0600)
if err != nil {
return fmt.Errorf("application run finished with error: %w", err)
return fmt.Errorf("failed to write to dest file %s: %w", dest, err)
}

return nil
Expand Down
7 changes: 0 additions & 7 deletions cmd/signozcollector/main_others.go

This file was deleted.

1 change: 1 addition & 0 deletions config/default-manager-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
server_endpoint: ws://127.0.0.1:4320/v1/opamp
5 changes: 5 additions & 0 deletions constants/os_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package constants

import "os"

// Version is the current version of the collector.
// This is set at build time.
var Version = "dev"
var Desc = "SigNoz OpenTelemetry Collector"

// AllowLbExporterConfig enables lb exporter capability in the collector instance
var SupportLbExporterConfig = GetOrDefaultEnv("SUPPORT_LB_EXPORTER_CONFIG", "1")

Expand Down
58 changes: 58 additions & 0 deletions featuregate/flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package featuregate

import (
"strings"

flag "github.com/spf13/pflag"
"go.opentelemetry.io/collector/featuregate"

"go.uber.org/multierr"
)

// NewFlag returns a flag.Value that directly applies feature gate statuses to a Registry.
func NewFlag(reg *featuregate.Registry) flag.Value {
return &flagValue{reg: reg}
}

// flagValue implements the flag.Value interface and directly applies feature gate statuses to a Registry.
type flagValue struct {
reg *featuregate.Registry
}

func (f *flagValue) String() string {
var ids []string
f.reg.VisitAll(func(g *featuregate.Gate) {
id := g.ID()
if !g.IsEnabled() {
id = "-" + id
}
ids = append(ids, id)
})
return strings.Join(ids, ",")
}

func (f *flagValue) Set(s string) error {
if s == "" {
return nil
}

var errs error
ids := strings.Split(s, ",")
for i := range ids {
id := ids[i]
val := true
switch id[0] {
case '-':
id = id[1:]
val = false
case '+':
id = id[1:]
}
errs = multierr.Append(errs, f.reg.Set(id, val))
}
return errs
}

func (f *flagValue) Type() string {
return "featuregate"
}
50 changes: 48 additions & 2 deletions opamp/client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
package opamp

import "context"
import (
"context"
"fmt"
"time"

"github.com/SigNoz/signoz-otel-collector/signozcol"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"
)

type Client interface {
Start(ctx context.Context) error

Stop(ctx context.Context) error

Error() error
Error() <-chan error
}

type baseClient struct {
err chan error
stopped chan bool
coll *signozcol.WrappedCollector
logger *zap.Logger
}

// Error returns the error channel
func (c baseClient) Error() <-chan error {
return c.err
}

// ensureRunning checks if the collector is running
// and sends an error to the error channel if it is not
// running
//
// The error channel is used to signal the main function
// to shutdown the service
//
// The collector may stop running unexpectedly. This can
// happen if a component reports a fatal error or some other
// async error occurs
// See https://github.com/open-telemetry/opentelemetry-collector/blob/8d425480b0dd1270b408582d9e21dd644299cd7e/service/host.go#L34-L39
func (c baseClient) ensureRunning() {
c.logger.Info("Ensuring collector is running")
for {
select {
case <-c.stopped:
c.logger.Info("Collector is stopped")
return
case <-time.After(c.coll.PollInterval):
if c.coll.GetState() == otelcol.StateClosed {
c.err <- fmt.Errorf("collector stopped unexpectedly")
}
}
}
}
35 changes: 26 additions & 9 deletions opamp/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,28 @@ var k = koanf.New("::")
// 2. Reloading the agent configuration when the file changes
// 3. Providing the current agent configuration to the Opamp client
type agentConfigManager struct {
agentConfig *remoteControlledConfig
logger *zap.Logger
agentConfig *remoteControlledConfig
logger *zap.Logger
initialConfigReceived bool
}

type reloadFunc func([]byte) error

type remoteControlledConfig struct {
path string

reloader reloadFunc

currentHash []byte
path string // path to the agent config file
reloader reloadFunc // function to reload the agent config
currentHash []byte // hash of the current agent config, used to determine if the config has changed
logger *zap.Logger
}

func NewDynamicConfig(configPath string, reloader reloadFunc) (*remoteControlledConfig, error) {
func NewDynamicConfig(configPath string, reloader reloadFunc, logger *zap.Logger) (*remoteControlledConfig, error) {
if logger == nil {
logger = zap.NewNop()
}
remoteControlledConfig := &remoteControlledConfig{
path: configPath,
reloader: reloader,
logger: logger.Named("dynamic-config"),
}

err := remoteControlledConfig.UpsertInstanceID()
Expand Down Expand Up @@ -82,6 +86,7 @@ service:
if err := os.WriteFile(m.path, bytes, 0644); err != nil {
return fmt.Errorf("failed to write config file %s: %w", m.path, err)
}
m.logger.Info("Added instance id to config file", zap.String("instance_id", instanceID))
return nil
}

Expand All @@ -96,6 +101,9 @@ func (m *remoteControlledConfig) UpdateCurrentHash() error {
}

func NewAgentConfigManager(logger *zap.Logger) *agentConfigManager {
if logger == nil {
logger = zap.NewNop()
}
return &agentConfigManager{
logger: logger.Named("agent-config-manager"),
}
Expand Down Expand Up @@ -137,12 +145,14 @@ func (a *agentConfigManager) Apply(remoteConfig *protobufs.AgentRemoteConfig) (b
remoteConfigMap := remoteConfig.GetConfig().GetConfigMap()

if remoteConfigMap == nil {
a.logger.Debug("No remote config received")
return false, nil
}

remoteCollectorConfig, ok := remoteConfigMap[collectorConfigKey]

if !ok {
a.logger.Info("No remote collector config found with key", zap.String("key", collectorConfigKey))
return false, nil
}

Expand All @@ -153,10 +163,13 @@ func (a *agentConfigManager) Apply(remoteConfig *protobufs.AgentRemoteConfig) (b
func (a *agentConfigManager) applyRemoteConfig(currentConfig *remoteControlledConfig, newContents []byte) (changed bool, err error) {
newConfigHash := fileHash(newContents)

if bytes.Equal(currentConfig.currentHash, newConfigHash) {
// Always reload the config if this is the first config received.
if a.initialConfigReceived && bytes.Equal(currentConfig.currentHash, newConfigHash) {
a.logger.Info("Config has not changed")
return false, nil
}

a.logger.Info("Config has changed, reloading", zap.String("path", currentConfig.path))
err = currentConfig.reloader(newContents)
if err != nil {
return false, fmt.Errorf("failed to reload config: %s: %w", currentConfig.path, err)
Expand All @@ -168,5 +181,9 @@ func (a *agentConfigManager) applyRemoteConfig(currentConfig *remoteControlledCo
return true, err
}

if !a.initialConfigReceived {
a.initialConfigReceived = true
}

return true, nil
}
Loading
Loading