Skip to content

Commit

Permalink
WIP: copy the initial config and wait for first server response
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Aug 3, 2023
1 parent eaa2a76 commit e6ba0f9
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
47 changes: 39 additions & 8 deletions cmd/signozcollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"go.uber.org/zap/zapcore"
)

var Version = "dev"
var Desc = "SigNoz OpenTelemetry Collector"

const copyPath = "/etc/signozcol-config.yaml"

func main() {

// Command line flags
Expand All @@ -33,23 +38,25 @@ func main() {
log.Fatalf("Failed to parse args %v", err)
}

collectorConfig, _ := f.GetString("config")
managerConfig, _ := f.GetString("manager-config")

// logger, err := zap.NewProduction()
logger, err := initZapLog()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}

collectorConfig, _ := f.GetString("config")
if err := copyConfigFile(collectorConfig); err != nil {
logger.Fatal("Failed to copy config file %v", zap.Error(err))
}
managerConfig, _ := f.GetString("manager-config")
fmt.Println("managerConfig", managerConfig)

ctx := context.Background()

coll := signozcol.New(
signozcol.WrappedCollectorSettings{
ConfigPaths: []string{collectorConfig},
// TODO: Build version from git tag
Version: "0.66.5",
Desc: "SigNoz OpenTelemetry Collector",
ConfigPaths: []string{copyPath},
Version: Version,
Desc: Desc,
LoggingOpts: []zap.Option{zap.WithCaller(true)},
PollInterval: 200 * time.Millisecond,
},
Expand Down Expand Up @@ -99,3 +106,27 @@ func initZapLog() (*zap.Logger, error) {
logger, err := config.Build()
return logger, err
}

func copyConfigFile(configPath string) error {
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("config file %s does not exist", configPath)
}

copy(configPath, copyPath)
return nil
}

func copy(src, dest string) error {
data, err := os.ReadFile(src)
if err != nil {
return fmt.Errorf("failed to read source file %s: %w", src, err)
}

err = os.WriteFile(dest, data, 0600)
if err != nil {
return fmt.Errorf("failed to write to dest file %s: %w", dest, err)
}

return nil
}
23 changes: 14 additions & 9 deletions opamp/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (
// 5. Sending the updated agent configuration to the Opamp server
type serverClient struct {
baseClient
logger *zap.Logger
opampClient client.OpAMPClient
configManager *agentConfigManager
managerConfig AgentManagerConfig
instanceId ulid.ULID
logger *zap.Logger
opampClient client.OpAMPClient
configManager *agentConfigManager
managerConfig AgentManagerConfig
instanceId ulid.ULID
receivedInitialConfig bool
}

type NewServerClientOpts struct {
Expand Down Expand Up @@ -153,14 +154,17 @@ func (s *serverClient) Start(ctx context.Context) error {
s.logger.Error("Error while starting opamp client", zap.Error(err))
return err
}
err = s.coll.Run(ctx)
if err != nil {
return err
}
s.waitForInitialRemoteConfig()
go s.ensureRunning()
return nil
}

func (s *serverClient) waitForInitialRemoteConfig() {
for !s.receivedInitialConfig {
time.Sleep(1 * time.Second)
}
}

// Stop stops the Opamp client
// It stops the Opamp client and disconnects from the Opamp server
func (s *serverClient) Stop(ctx context.Context) error {
Expand All @@ -175,6 +179,7 @@ func (s *serverClient) Stop(ctx context.Context) error {
// onMessageFuncHandler is the callback function that is called when the Opamp client receives a message from the Opamp server
func (s *serverClient) onMessageFuncHandler(ctx context.Context, msg *types.MessageData) {
if msg.RemoteConfig != nil {
s.receivedInitialConfig = true
if err := s.onRemoteConfigHandler(ctx, msg.RemoteConfig); err != nil {
s.logger.Error("error while onRemoteConfigHandler", zap.Error(err))
}
Expand Down
3 changes: 3 additions & 0 deletions signozcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func (wCol *WrappedCollector) Shutdown() {
wCol.wg.Wait()
wCol.svc = nil
wCol.logger.Info("Collector service is shut down")
} else {
wCol.logger.Info("Collector service is not running")
wCol.errChan <- nil
}
}

Expand Down

0 comments on commit e6ba0f9

Please sign in to comment.