Skip to content

Commit

Permalink
Wire up the components with main (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Sep 26, 2023
1 parent e251846 commit 24cf1da
Show file tree
Hide file tree
Showing 16 changed files with 402 additions and 190 deletions.
128 changes: 110 additions & 18 deletions cmd/signozcollector/main.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,134 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/SigNoz/signoz-otel-collector/components"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol"
"github.com/SigNoz/signoz-otel-collector/constants"
signozcolFeatureGate "github.com/SigNoz/signoz-otel-collector/featuregate"
"github.com/SigNoz/signoz-otel-collector/service"
"github.com/SigNoz/signoz-otel-collector/signozcol"
flag "github.com/spf13/pflag"
otelcolFeatureGate "go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func main() {

factories, err := components.Components()
// Command line flags
f := flag.NewFlagSet("Collector CLI Options", flag.ExitOnError)

f.Usage = func() {
fmt.Println(f.FlagUsages())
os.Exit(0)
}

f.String("config", "", "File path for the collector configuration")
f.String("manager-config", "", "File path for the agent manager configuration")
f.String("copy-path", "/etc/otel/signozcol-config.yaml", "File path for the copied collector configuration")
f.Var(signozcolFeatureGate.NewFlag(otelcolFeatureGate.GlobalRegistry()), "feature-gates",
"Comma-delimited list of feature gate identifiers. Prefix with '-' to disable the feature. '+' or no prefix will enable the feature.")
err := f.Parse(os.Args[1:])
if err != nil {
log.Fatalf("failed to build default components: %v", err)
log.Fatalf("Failed to parse args %v", err)
}

info := component.BuildInfo{
Command: "signoz-otel-collector",
Description: "SigNoz OTEL Collector",
Version: "latest",
logger, err := initZapLog()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}

params := otelcol.CollectorSettings{
Factories: factories,
BuildInfo: info,
collectorConfig, _ := f.GetString("config")
managerConfig, _ := f.GetString("manager-config")
copyPath, _ := f.GetString("copy-path")
if managerConfig != "" {
if err := copyConfigFile(collectorConfig, copyPath); err != nil {
logger.Fatal("Failed to copy config file %v", zap.Error(err))
}
collectorConfig = copyPath
}

if err := run(params); err != nil {
log.Fatal(err)
ctx := context.Background()

coll := signozcol.New(
signozcol.WrappedCollectorSettings{
ConfigPaths: []string{collectorConfig},
Version: constants.Version,
Desc: constants.Desc,
LoggingOpts: []zap.Option{zap.WithCaller(true)},
PollInterval: 200 * time.Millisecond,
},
)

svc, err := service.New(coll, logger, managerConfig, collectorConfig)
if err != nil {
logger.Fatal("failed to create collector service:", zap.Error(err))
}

ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if err := runInteractive(ctx, logger, svc); err != nil {
logger.Fatal("failed to run service:", zap.Error(err))
}
}

func runInteractive(params otelcol.CollectorSettings) error {
cmd := otelcol.NewCommand(params)
err := cmd.Execute()
func runInteractive(ctx context.Context, logger *zap.Logger, svc service.Service) error {
if err := svc.Start(ctx); err != nil {
return fmt.Errorf("failed to start collector service: %w", err)
}

// Wait for context done or service error
select {
case <-ctx.Done():
logger.Info("Context done, shutting down...")
case err := <-svc.Error():
logger.Error("Service error, shutting down...", zap.Error(err))
}

stopTimeoutCtx, stopCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer stopCancel()

if err := svc.Shutdown(stopTimeoutCtx); err != nil {
return fmt.Errorf("failed to stop service: %w", err)
}

return nil
}

func initZapLog() (*zap.Logger, error) {
config := zap.NewProductionConfig()
config.EncoderConfig.EncodeLevel = zapcore.LowercaseLevelEncoder
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, err := config.Build()
return logger, err
}

func copyConfigFile(configPath string, copyPath string) error {
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("config file %s does not exist", configPath)
}

return copy(configPath, copyPath)
}

func copy(src, dest string) error {
data, err := os.ReadFile(src)
if err != nil {
return fmt.Errorf("failed to read source file %s: %w", src, err)
}

err = os.WriteFile(dest, data, 0600)
if err != nil {
return fmt.Errorf("application run finished with error: %w", err)
return fmt.Errorf("failed to write to dest file %s: %w", dest, err)
}

return nil
Expand Down
7 changes: 0 additions & 7 deletions cmd/signozcollector/main_others.go

This file was deleted.

1 change: 1 addition & 0 deletions config/default-manager-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
server_endpoint: ws://127.0.0.1:4320/v1/opamp
5 changes: 5 additions & 0 deletions constants/os_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package constants

import "os"

// Version is the current version of the collector.
// This is set at build time.
var Version = "dev"
var Desc = "SigNoz OpenTelemetry Collector"

// AllowLbExporterConfig enables lb exporter capability in the collector instance
var SupportLbExporterConfig = GetOrDefaultEnv("SUPPORT_LB_EXPORTER_CONFIG", "1")

Expand Down
58 changes: 58 additions & 0 deletions featuregate/flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package featuregate

import (
"strings"

flag "github.com/spf13/pflag"
"go.opentelemetry.io/collector/featuregate"

"go.uber.org/multierr"
)

// NewFlag returns a flag.Value that directly applies feature gate statuses to a Registry.
func NewFlag(reg *featuregate.Registry) flag.Value {
return &flagValue{reg: reg}
}

// flagValue implements the flag.Value interface and directly applies feature gate statuses to a Registry.
type flagValue struct {
reg *featuregate.Registry
}

func (f *flagValue) String() string {
var ids []string
f.reg.VisitAll(func(g *featuregate.Gate) {
id := g.ID()
if !g.IsEnabled() {
id = "-" + id
}
ids = append(ids, id)
})
return strings.Join(ids, ",")
}

func (f *flagValue) Set(s string) error {
if s == "" {
return nil
}

var errs error
ids := strings.Split(s, ",")
for i := range ids {
id := ids[i]
val := true
switch id[0] {
case '-':
id = id[1:]
val = false
case '+':
id = id[1:]
}
errs = multierr.Append(errs, f.reg.Set(id, val))
}
return errs
}

func (f *flagValue) Type() string {
return "featuregate"
}
50 changes: 48 additions & 2 deletions opamp/client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
package opamp

import "context"
import (
"context"
"fmt"
"time"

"github.com/SigNoz/signoz-otel-collector/signozcol"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"
)

type Client interface {
Start(ctx context.Context) error

Stop(ctx context.Context) error

Error() error
Error() <-chan error
}

type baseClient struct {
err chan error
stopped chan bool
coll *signozcol.WrappedCollector
logger *zap.Logger
}

// Error returns the error channel
func (c baseClient) Error() <-chan error {
return c.err
}

// ensureRunning checks if the collector is running
// and sends an error to the error channel if it is not
// running
//
// The error channel is used to signal the main function
// to shutdown the service
//
// The collector may stop running unexpectedly. This can
// happen if a component reports a fatal error or some other
// async error occurs
// See https://github.com/open-telemetry/opentelemetry-collector/blob/8d425480b0dd1270b408582d9e21dd644299cd7e/service/host.go#L34-L39
func (c baseClient) ensureRunning() {
c.logger.Info("Ensuring collector is running")
for {
select {
case <-c.stopped:
c.logger.Info("Collector is stopped")
return
case <-time.After(c.coll.PollInterval):
if c.coll.GetState() == otelcol.StateClosed {
c.err <- fmt.Errorf("collector stopped unexpectedly")
}
}
}
}
35 changes: 26 additions & 9 deletions opamp/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,28 @@ var k = koanf.New("::")
// 2. Reloading the agent configuration when the file changes
// 3. Providing the current agent configuration to the Opamp client
type agentConfigManager struct {
agentConfig *remoteControlledConfig
logger *zap.Logger
agentConfig *remoteControlledConfig
logger *zap.Logger
initialConfigReceived bool
}

type reloadFunc func([]byte) error

type remoteControlledConfig struct {
path string

reloader reloadFunc

currentHash []byte
path string // path to the agent config file
reloader reloadFunc // function to reload the agent config
currentHash []byte // hash of the current agent config, used to determine if the config has changed
logger *zap.Logger
}

func NewDynamicConfig(configPath string, reloader reloadFunc) (*remoteControlledConfig, error) {
func NewDynamicConfig(configPath string, reloader reloadFunc, logger *zap.Logger) (*remoteControlledConfig, error) {
if logger == nil {
logger = zap.NewNop()
}
remoteControlledConfig := &remoteControlledConfig{
path: configPath,
reloader: reloader,
logger: logger.Named("dynamic-config"),
}

err := remoteControlledConfig.UpsertInstanceID()
Expand Down Expand Up @@ -82,6 +86,7 @@ service:
if err := os.WriteFile(m.path, bytes, 0644); err != nil {
return fmt.Errorf("failed to write config file %s: %w", m.path, err)
}
m.logger.Info("Added instance id to config file", zap.String("instance_id", instanceID))
return nil
}

Expand All @@ -96,6 +101,9 @@ func (m *remoteControlledConfig) UpdateCurrentHash() error {
}

func NewAgentConfigManager(logger *zap.Logger) *agentConfigManager {
if logger == nil {
logger = zap.NewNop()
}
return &agentConfigManager{
logger: logger.Named("agent-config-manager"),
}
Expand Down Expand Up @@ -137,12 +145,14 @@ func (a *agentConfigManager) Apply(remoteConfig *protobufs.AgentRemoteConfig) (b
remoteConfigMap := remoteConfig.GetConfig().GetConfigMap()

if remoteConfigMap == nil {
a.logger.Debug("No remote config received")
return false, nil
}

remoteCollectorConfig, ok := remoteConfigMap[collectorConfigKey]

if !ok {
a.logger.Info("No remote collector config found with key", zap.String("key", collectorConfigKey))
return false, nil
}

Expand All @@ -153,10 +163,13 @@ func (a *agentConfigManager) Apply(remoteConfig *protobufs.AgentRemoteConfig) (b
func (a *agentConfigManager) applyRemoteConfig(currentConfig *remoteControlledConfig, newContents []byte) (changed bool, err error) {
newConfigHash := fileHash(newContents)

if bytes.Equal(currentConfig.currentHash, newConfigHash) {
// Always reload the config if this is the first config received.
if a.initialConfigReceived && bytes.Equal(currentConfig.currentHash, newConfigHash) {
a.logger.Info("Config has not changed")
return false, nil
}

a.logger.Info("Config has changed, reloading", zap.String("path", currentConfig.path))
err = currentConfig.reloader(newContents)
if err != nil {
return false, fmt.Errorf("failed to reload config: %s: %w", currentConfig.path, err)
Expand All @@ -168,5 +181,9 @@ func (a *agentConfigManager) applyRemoteConfig(currentConfig *remoteControlledCo
return true, err
}

if !a.initialConfigReceived {
a.initialConfigReceived = true
}

return true, nil
}
Loading

0 comments on commit 24cf1da

Please sign in to comment.