Skip to content

Commit

Permalink
Merge pull request #20 from devzero-inc/collect-process-children
Browse files Browse the repository at this point in the history
Collect additional processes and command metrics
  • Loading branch information
Tzvonimir authored Oct 28, 2024
2 parents 060bdd7 + 0ef726a commit ebd3744
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 40 deletions.
21 changes: 15 additions & 6 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (c *Collector) handleSocketCollection(con net.Conn) error {

c.logger.Debug().Msgf("Received: %s", string(buf[:n]))

if len(parts) != 5 {
if len(parts) != 7 {
c.logger.Error().Msg("Invalid command format")
return fmt.Errorf("invalid command format")
}
Expand Down Expand Up @@ -315,12 +315,18 @@ func (c *Collector) handleStartCommand(parts []string) error {

c.logger.Debug().Msgf("Parsing command: %s", parts[0])

repo, err := util.GetRepoNameFromConfig(parts[2])
if err != nil {
c.logger.Error().Err(err).Msg("Failed to get repository name")
}

command := Command{
Category: ParseCommand(parts[1]),
Command: parts[1],
Directory: parts[2],
User: parts[3],
StartTime: time.Now().UnixMilli(), // TODO: there are some issues with sending time through shell because of ms support on MAC, explore more
Category: ParseCommand(parts[1]),
Command: parts[1],
Directory: parts[2],
User: parts[3],
StartTime: time.Now().UnixMilli(), // TODO: there are some issues with sending time through shell because of ms support on MAC, explore more
Repository: repo,
}

c.collectionConfig.ongoingCommands[parts[4]] = command
Expand All @@ -342,7 +348,10 @@ func (c *Collector) handleEndCommand(parts []string) error {
if command, exists := c.collectionConfig.ongoingCommands[parts[4]]; exists {
command.EndTime = time.Now().UnixMilli()
command.ExecutionTime = command.EndTime - command.StartTime
command.Result = parts[5]
command.Status = parts[6]

c.logger.Debug().Msgf("Command: %+v", command)
if err := InsertCommand(command); err != nil {
c.logger.Error().Err(err).Msg("Failed to insert command")
return err
Expand Down
10 changes: 8 additions & 2 deletions collector/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Command struct {
ExecutionTime int64 `json:"execution_time" db:"execution_time"`
StartTime int64 `json:"start_time" db:"start_time"`
EndTime int64 `json:"end_time" db:"end_time"`
Status string `json:"status" db:"status"`
Result string `json:"result" db:"result"`
Repository string `json:"repository" db:"repository"`
}

// GetCommandById fetches a command by its ID
Expand Down Expand Up @@ -87,8 +90,8 @@ func DeleteCommandsByDays(days int) error {

// InsertCommand inserts a command into the database
func InsertCommand(command Command) error {
query := `INSERT INTO commands (category, command, user, directory, execution_time, start_time, end_time)
VALUES (:category, :command, :user, :directory, :execution_time, :start_time, :end_time)`
query := `INSERT INTO commands (category, command, user, directory, execution_time, start_time, end_time, status, result, repository)
VALUES (:category, :command, :user, :directory, :execution_time, :start_time, :end_time, :status, :result, :repository)`

_, err := database.DB.NamedExec(query, command)

Expand Down Expand Up @@ -132,5 +135,8 @@ func MapCommandToProto(command Command) *gen.Command {
ExecutionTime: command.ExecutionTime,
StartTime: command.StartTime,
EndTime: command.EndTime,
Status: command.Status,
Result: command.Result,
Repository: command.Repository,
}
}
6 changes: 5 additions & 1 deletion database/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func createProcessesTable() {
CREATE TABLE IF NOT EXISTS processes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pid INTEGER NOT NULL,
ppid INTEGER,
name TEXT NOT NULL,
status TEXT,
created_time INTEGER,
Expand Down Expand Up @@ -69,7 +70,10 @@ func createCommandsTable() {
directory TEXT,
execution_time INTEGER,
start_time INTEGER,
end_time INTEGER
end_time INTEGER,
status TEXT,
result TEXT,
repository TEXT
);`

_, err := DB.Exec(createCommandsTableSQL)
Expand Down
Binary file added descriptor.bin
Binary file not shown.
23 changes: 23 additions & 0 deletions install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
LDA_VERSION=${LDA_VERSION:-v0.0.9}
OS=${OS:-$(uname | tr '[:upper:]' '[:lower:]')}
ARCH=${ARCH:-$(uname -m)}

# Handle architecture translation if not already set
if [[ "$ARCH" == "x86_64" ]]; then
ARCH="amd64"
elif [[ "$ARCH" == "arm64" || "$ARCH" == "aarch64" ]]; then
ARCH="arm64"
fi

# Check if wget is available, otherwise fall back to curl
if command -v wget &> /dev/null; then
downloader="wget -O lda-$OS-$ARCH.tar.gz"
else
downloader="curl -L -o lda-$OS-$ARCH.tar.gz"
fi

# Download, unzip, and move binary in one go
$downloader https://github.com/devzero-inc/local-developer-analytics/releases/download/$LDA_VERSION/lda-$OS-$ARCH.tar.gz && \
tar -xvf lda-$OS-$ARCH.tar.gz && \
sudo mv lda /usr/local/bin/lda && \
rm lda-$OS-$ARCH.tar.gz
6 changes: 4 additions & 2 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (f *Factory) Create(pType string) (SystemProcess, error) {
type Process struct {
Id int64 `json:"id" db:"id"`
PID int64 `json:"pid" db:"pid"`
PPID int64 `json:"ppid" db:"ppid"`
Name string `json:"name" db:"name"`
// R: Running; S: Sleep; T: Stop; I: Idle; Z: Zombie; W: Wait; L: Lock;
Status string `json:"status" db:"status"`
Expand Down Expand Up @@ -136,8 +137,8 @@ func DeleteProcessesByDays(days int) error {

// InsertProcesses inserts multiple processes into the database in bulk
func InsertProcesses(processes []Process) error {
query := `INSERT INTO processes (pid, name, status, created_time, stored_time, os, platform, platform_family, cpu_usage, memory_usage)
VALUES (:pid, :name, :status, :created_time, :stored_time, :os, :platform, :platform_family, :cpu_usage, :memory_usage)`
query := `INSERT INTO processes (pid, name, status, created_time, stored_time, os, platform, platform_family, cpu_usage, memory_usage, ppid)
VALUES (:pid, :name, :status, :created_time, :stored_time, :os, :platform, :platform_family, :cpu_usage, :memory_usage, :ppid)`

// Begin a transaction
tx, err := database.DB.Beginx()
Expand Down Expand Up @@ -170,6 +171,7 @@ func MapProcessToProto(process Process) *gen.Process {
return &gen.Process{
Id: process.Id,
Pid: process.PID,
Ppid: process.PPID,
Name: process.Name,
Status: process.Status,
CreatedTime: process.CreatedTime,
Expand Down
31 changes: 19 additions & 12 deletions process/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func NewPs(logger zerolog.Logger) *Ps {
func (p *Ps) Collect() ([]Process, error) {
p.logger.Debug().Msg("Collecting process")

cmd := exec.Command("ps", "axo", "pid,pcpu,pmem,lstart,comm")
// Adjust the command to include PPID
cmd := exec.Command("ps", "axo", "pid,ppid,pcpu,pmem,lstart,comm")

var out bytes.Buffer
cmd.Stdout = &out
Expand All @@ -39,41 +40,47 @@ func (p *Ps) Collect() ([]Process, error) {
}

scanner := bufio.NewScanner(&out)
scanner.Scan() // Skipping the header line
scanner.Scan() // Skip the header line

var processInfo []Process

for scanner.Scan() {
line := scanner.Text()
fields := strings.Fields(line)

// Parsing the first three fields as PID, CPU and MEM
// Parse PID, PPID, CPU and MEM usage
pid, _ := strconv.ParseInt(fields[0], 10, 64)
cpuUsage, _ := strconv.ParseFloat(fields[1], 64)
memUsage, _ := strconv.ParseFloat(fields[2], 64)
ppid, _ := strconv.ParseInt(fields[1], 10, 64)
cpuUsage, _ := strconv.ParseFloat(fields[2], 64)
memUsage, _ := strconv.ParseFloat(fields[3], 64)

// Reconstruct lstart from fields 3 to 7
lstart := strings.Join(fields[3:8], " ")
// Parse the start time
lstart := strings.Join(fields[4:9], " ")
const lstartLayout = "Mon Jan 2 15:04:05 2006"
startTime, err := time.Parse(lstartLayout, lstart)
if err != nil {
p.logger.Err(err).Msg("Error parsing start time")
continue
}

// The command name is the rest, starting from field 8
// This assumes that the command name is the last field and can contain spaces
name := strings.Join(fields[8:], " ")
// Command name might contain spaces, so we join remaining fields
name := strings.Join(fields[9:], " ")

processInfo = append(processInfo, Process{
// Create the Process instance
process := Process{
PID: pid,
PPID: ppid,
Name: path.Base(name),
CPUUsage: cpuUsage,
MemoryUsage: memUsage,
CreatedTime: startTime.UnixMilli(),
StoredTime: time.Now().UnixMilli(),
OS: runtime.GOOS,
Platform: runtime.GOOS,
})
}

// Append to the list of processes
processInfo = append(processInfo, process)
}

if err := scanner.Err(); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions process/psutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@ func (p *Psutil) Collect() ([]Process, error) {
continue
}

ppid, err := proc.Ppid()
if err != nil {
p.logger.Err(err).Msg("Error retrieving parent PID")
}

processInfo = append(processInfo, Process{
PID: int64(proc.Pid),
PPID: int64(ppid),
Name: name,
Status: status,
CreatedTime: createTime,
Expand Down
4 changes: 4 additions & 0 deletions proto/api/v1/collector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ message Command {
int64 execution_time = 6; // Execution time of the command in milliseconds.
int64 start_time = 7; // Start time of the command execution (Unix timestamp).
int64 end_time = 8; // End time of the command execution (Unix timestamp).
string result = 9; // Result of executed command => success/failure
string status = 10; // Status of executed command
string repository = 11; // Repository is repository where commands are executed
}

// Define a message representing a process, including its metadata and resource usage.
Expand All @@ -41,6 +44,7 @@ message Process {
string platform_family = 9; // More detailed platform family information.
double cpu_usage = 10; // CPU usage percentage by the process.
double memory_usage = 11; // Memory usage by the process in megabytes.
int64 ppid = 12; // Parent process ID.
}

// Requests to send collections of commands and processes.
Expand Down
15 changes: 10 additions & 5 deletions shell/scripts/bash.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ generate_uuid() {
}

preexec_invoke_exec() {
# Mimic preexec functionality using DEBUG trap
# Avoid running preexec_invoke_exec for PROMPT_COMMAND
if [[ "$BASH_COMMAND" != "$PROMPT_COMMAND" ]]; then
export UUID=$(generate_uuid)
Expand All @@ -14,12 +13,18 @@ preexec_invoke_exec() {
}
trap 'preexec_invoke_exec' DEBUG

# Mimic precmd functionality using PROMPT_COMMAND
precmd_invoke_cmd() {
# Send an end execution message
{{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID"
local exit_status=$?
local result="success"

if [[ $exit_status -ne 0 ]]; then
result="failure"
fi

# Send an end execution message with the result and exit status
{{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID" "$result" "$exit_status"
}

# Update PROMPT_COMMAND to invoke precmd_invoke_cmd
# Append precmd_invoke_cmd to existing PROMPT_COMMAND to preserve other functionalities
# Append precmd_invoke_cmd to PROMPT_COMMAND to run after each command
PROMPT_COMMAND="${PROMPT_COMMAND:+$PROMPT_COMMAND; }precmd_invoke_cmd"
10 changes: 6 additions & 4 deletions shell/scripts/collector.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# $3 - Working directory
# $4 - User who executed the command
# $5 - Unique identifier
# $6 - Command result (success/failure)
# $7 - Exit status

# UNIX socket path
SOCKET_PATH="{{.SocketPath}}"
Expand Down Expand Up @@ -43,8 +45,8 @@ send_via_perl() {
close(\$sock);" 2>/dev/null
}

# Construct the log message
LOG_MESSAGE="$1|$2|$3|$4|$5"
# Construct the log message including result and exit status
LOG_MESSAGE="$1|$2|$3|$4|$5|$6|$7"

# Send the log message to the Go application via UNIX socket
if command_exists nc && nc_supports_U; then
Expand All @@ -56,7 +58,7 @@ elif command_exists python; then
elif command_exists perl; then
send_via_perl
else
# TODO: Implement direct command sending
echo "Neither nc, socat, python or perl are available on this system."
# Output error if no suitable command is available
echo "Neither nc, socat, python or perl are available on this system." >&2
exit 1
fi
19 changes: 13 additions & 6 deletions shell/scripts/fish.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ function generate_uuid
end

function fish_preexec --on-event fish_preexec
set -gx LAST_COMMAND $argv[1]
set -gx UUID (generate_uuid)
# Send a start execution message
{{.CommandScriptPath}} "start" "$LAST_COMMAND" "$PWD" "$USER" "$UUID"
set -gx LAST_COMMAND $argv[1]
set -gx UUID (generate_uuid)
# Send a start execution message
{{.CommandScriptPath}} "start" "$LAST_COMMAND" "$PWD" "$USER" "$UUID"
end

function fish_postexec --on-event fish_postexec
# Send an end execution message
{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID"
set -l exit_status $status
set -l result "success"

if test $exit_status -ne 0
set result "failure"
end

# Send an end execution message with result and exit status
{{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID" $result $exit_status
end
11 changes: 9 additions & 2 deletions shell/scripts/zsh.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ preexec() {
}

precmd() {
# Send an end execution message
{{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID"
local exit_status=$?
local result="success"

if [[ $exit_status -ne 0 ]]; then
result="failure"
fi

# Send an end execution message with result and exit status
{{.CommandScriptPath}} "end" "$LAST_COMMAND" "$PWD" "$USER" "$UUID" "$result" "$exit_status"
}
Loading

0 comments on commit ebd3744

Please sign in to comment.