Skip to content

Commit

Permalink
Update lda process metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Tzvonimir committed Oct 11, 2024
1 parent 3a5340f commit a5467ab
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 20 deletions.
77 changes: 61 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)

// Config holds configuration for the client connection.
Expand All @@ -26,19 +28,37 @@ type Client struct {
conn *grpc.ClientConn
logger *zerolog.Logger
timeout time.Duration
config Config
}

// NewClient creates a new client and returns a pointer to it and an error
// NewClient creates a new client with connection management and returns a pointer to it and an error
func NewClient(config Config) (*Client, error) {
client := &Client{
logger: &logging.Log,
timeout: time.Duration(config.Timeout) * time.Second,
config: config,
}

// Establish the initial connection
err := client.connect()
if err != nil {
return nil, err
}

return client, nil
}

// connect handles connection establishment and configuration
func (c *Client) connect() error {
var opts []grpc.DialOption

// Setup connection security based on config
creds := grpc.WithTransportCredentials(insecure.NewCredentials())
if config.SecureConnection {
if config.CertFile != "" {
tlsFromFile, err := credentials.NewClientTLSFromFile(config.CertFile, "")
if c.config.SecureConnection {
if c.config.CertFile != "" {
tlsFromFile, err := credentials.NewClientTLSFromFile(c.config.CertFile, "")
if err != nil {
return nil, fmt.Errorf("failed to create TLS credentials: %w", err)
return fmt.Errorf("failed to create TLS credentials: %w", err)
}
creds = grpc.WithTransportCredentials(tlsFromFile)
} else {
Expand All @@ -47,28 +67,49 @@ func NewClient(config Config) (*Client, error) {
}
opts = append(opts, creds)

conn, err := grpc.Dial(config.Address, opts...)
// Adding keepalive parameters to manage connection health
keepAliveParams := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second, // Ping the server every 10 seconds to keep the connection alive
Timeout: 5 * time.Second, // Wait 5 seconds for a pong before closing the connection
PermitWithoutStream: true, // Send pings even without active RPCs
})
opts = append(opts, keepAliveParams)

// Dial the server
conn, err := grpc.Dial(c.config.Address, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to server: %w", err)
return fmt.Errorf("failed to connect to server: %w", err)
}

// Set a default timeout of 60 seconds if not provided
if config.Timeout == 0 {
config.Timeout = 60
}
// Set the connection on the client
c.conn = conn
return nil
}

client := &Client{
conn: conn,
logger: &logging.Log,
timeout: time.Duration(config.Timeout) * time.Second,
// Reconnect attempts to reconnect if the connection is down
func (c *Client) Reconnect() error {
if c.conn != nil {
c.Close()
}
return c.connect()
}

return client, nil
// CheckAndReconnect checks connection health and reconnects if necessary
func (c *Client) CheckAndReconnect() error {
if c.conn.GetState() == connectivity.TransientFailure || c.conn.GetState() == connectivity.Shutdown {
c.logger.Warn().Msg("Connection lost. Attempting to reconnect...")
return c.Reconnect()
}
return nil
}

// SendCommands sends a list of commands to the server
func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error {

if err := c.CheckAndReconnect(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}

client := gen.NewCollectorServiceClient(c.conn)

req := &gen.SendCommandsRequest{
Expand All @@ -90,6 +131,10 @@ func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error {
// SendProcesses sends a list of processes to the server
func (c *Client) SendProcesses(processes []*gen.Process, auth *gen.Auth) error {

if err := c.CheckAndReconnect(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}

client := gen.NewCollectorServiceClient(c.conn)

req := &gen.SendProcessesRequest{
Expand Down
1 change: 1 addition & 0 deletions cmd/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func collect(cmd *cobra.Command, _ []string) error {
UserID: config.AppConfig.UserID,
TeamID: config.AppConfig.TeamID,
WorkspaceID: config.AppConfig.WorkspaceID,
UserEmail: config.AppConfig.UserEmail,
}

if autoCredentials {
Expand Down
1 change: 1 addition & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type AuthConfig struct {
TeamID string
UserID string
WorkspaceID string
UserEmail string
}

// collectionConfig contains the configuration for the collection process
Expand Down
4 changes: 4 additions & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ secure_connection = true
# Default: (empty)
# user_id = ""

# Specifies the user identifier that will be used to make the collection of data for that user
# Default: (empty)
# user_email = ""

# Specifies the user identifier that will be used to make the collection of data for that workspace
# Default: (empty)
# workspace_id = ""
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Config struct {
TeamID string `mapstructure:"team_id"`
// UserID is the user identifier for the workspace
UserID string `mapstructure:"user_id"`
// UserEmail is the user identifier for the workspace
UserEmail string `mapstructure:"user_email"`
// WorkspaceID is the workspace identifier
WorkspaceID string `mapstructure:"workspace_id"`
}
Expand Down
1 change: 1 addition & 0 deletions proto/api/v1/collector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message Auth {
string user_id = 1; // Unique identifer for user that is processing the data
string team_id = 2; // Unique identifier for users team
optional string workspace_id = 3; // Unique identifier of the Workspace that is running the request
string user_email = 4; // Unique identifier of user that is processing the data
}

// Define a message representing a command, including its metadata and timing information.
Expand Down
30 changes: 26 additions & 4 deletions user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,11 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) {
devzeroTeamFile = "DEVZERO_TEAM_ID"
devzeroUserFile = "DEVZERO_USER_ID"
devzeroWorkspaceFile = "DEVZERO_WORKSPACE_ID"
devzeroEmailFile = "DEVZERO_WORKSPACE_OWNER_EMAIL"
)

userId := ""
userEmail := ""
teamId := ""
workspaceId := ""

Expand All @@ -297,6 +299,14 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) {
}
}

emailPath := filepath.Join(devzeroConfigPath, devzeroEmailFile)
if util.FileExists(emailPath) {
data, err := os.ReadFile(emailPath)
if err == nil && len(data) > 0 {
userEmail = string(data)
}
}

workspacePath := filepath.Join(devzeroConfigPath, devzeroWorkspaceFile)
if util.FileExists(workspacePath) {
data, err := os.ReadFile(workspacePath)
Expand All @@ -309,18 +319,21 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) {
UserID: userId,
TeamID: teamId,
WorkspaceID: workspaceId,
UserEmail: userEmail,
}, nil
}

// ReadDZCliConfig reads the DevZero workspace configuration
func ReadDZCliConfig(path string) (collector.AuthConfig, error) {
const (
localUserFile = "user_id.txt"
localTeamFile = "team_id.txt"
localUserFile = "user_id.txt"
localTeamFile = "team_id.txt"
localEmailFile = "user_email.txt"
)

userId := ""
teamId := ""
userEmail := ""

localUserPath := filepath.Join(path, localUserFile)
if util.FileExists(localUserPath) {
Expand All @@ -338,8 +351,17 @@ func ReadDZCliConfig(path string) (collector.AuthConfig, error) {
}
}

localEmailPath := filepath.Join(path, localEmailFile)
if util.FileExists(localEmailPath) {
data, err := os.ReadFile(localEmailPath)
if err == nil && len(data) > 0 {
userEmail = string(data)
}
}

return collector.AuthConfig{
UserID: userId,
TeamID: teamId,
UserID: userId,
TeamID: teamId,
UserEmail: userEmail,
}, nil
}

0 comments on commit a5467ab

Please sign in to comment.