Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust state logic and prune implementation #894

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion cmd/edenClean.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command {
PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity),
Run: func(cmd *cobra.Command, args []string) {
if err := openEVEC.EdenClean(*configName, configDist, vmName, currentContext); err != nil {
log.Fatalf("Setup eden failed: %s", err)
log.Fatalf("Clean eden failed: %s", err)
}
},
}
Expand Down Expand Up @@ -54,3 +54,21 @@ func newCleanCmd(configName, verbosity *string) *cobra.Command {

return cleanCmd
}

func newPruneCmd(configName, verbosity *string) *cobra.Command {
cfg := &openevec.EdenSetupArgs{}

var pruneCmd = &cobra.Command{
Use: "prune",
Short: "prune stored objects from the controller. Please save them before.",
Long: `Prune stored objects from the controller. Please save them before.`,
PersistentPreRunE: preRunViperLoadFunction(cfg, configName, verbosity),
Run: func(cmd *cobra.Command, args []string) {
if err := openEVEC.EdenPrune(); err != nil {
log.Fatalf("Prune eden failed: %s", err)
}
},
}

return pruneCmd
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewEdenCommand() *cobra.Command {
newCleanCmd(&configName, &verbosity),
newConfigCmd(&configName, &verbosity),
newSdnCmd(&configName, &verbosity),
newPruneCmd(&configName, &verbosity),
},
},
{
Expand Down
65 changes: 65 additions & 0 deletions pkg/controller/adam/adam.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adam

import (
"context"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -10,6 +11,7 @@
"strings"
"time"

"github.com/go-redis/redis/v9"
"github.com/lf-edge/eden/pkg/controller/cachers"
"github.com/lf-edge/eden/pkg/controller/eapps"
"github.com/lf-edge/eden/pkg/controller/eflowlog"
Expand Down Expand Up @@ -405,3 +407,66 @@
}
return &globalOptions, nil
}

func (adam *Ctx) cleanRedisStream(stream string) error {
addr, password, databaseID, err := parseRedisURL(adam.AdamRedisURLEden)
if err != nil {
return err
}
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,

Check failure on line 418 in pkg/controller/adam/adam.go

View workflow job for this annotation

GitHub Actions / yetus

detsecrets:5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8:Secret Keyword
DB: databaseID,
MaxRetries: defaults.DefaultRepeatCount,
MinRetryBackoff: defaults.DefaultRepeatTimeout / 2,
MaxRetryBackoff: defaults.DefaultRepeatTimeout * 2,
})
n, err := client.XTrimMaxLenApprox(context.TODO(), stream, 0, 0).Result()
log.Debugf("XTrimMaxLenApprox(%s): %d", stream, n)
return err
}

// CleanInfo removes all info messages of device from controller
func (adam *Ctx) CleanInfo(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getInfoRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanMetrics removes all metric messages of device from controller
func (adam *Ctx) CleanMetrics(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getMetricsRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanLogs removes all logs messages of device from controller
func (adam *Ctx) CleanLogs(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getLogsRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanFlowLogs removes all flow logs messages of device from controller
func (adam *Ctx) CleanFlowLogs(devUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getFlowLogRedisStream(devUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}

// CleanAppLogs removes all app logs messages of app of device from controller
func (adam *Ctx) CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error) {
if adam.AdamRemoteRedis {
stream := adam.getAppsLogsRedisStream(devUUID, appUUID)
return adam.cleanRedisStream(stream)
}
panic("implement me")
}
7 changes: 6 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
uuid "github.com/satori/go.uuid"
)

//Controller is an interface of controller
// Controller is an interface of controller
type Controller interface {
CertsGet(devUUID uuid.UUID) (out string, err error)
ConfigGet(devUUID uuid.UUID) (out string, err error)
Expand All @@ -28,6 +28,11 @@ type Controller interface {
FlowLogLastCallback(devUUID uuid.UUID, q map[string]string, handler eflowlog.HandlerFunc) (err error)
InfoChecker(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc, mode einfo.InfoCheckerMode, timeout time.Duration) (err error)
InfoLastCallback(devUUID uuid.UUID, q map[string]string, handler einfo.HandlerFunc) (err error)
CleanInfo(devUUID uuid.UUID) (err error)
CleanMetrics(devUUID uuid.UUID) (err error)
CleanLogs(devUUID uuid.UUID) (err error)
CleanFlowLogs(devUUID uuid.UUID) (err error)
CleanAppLogs(devUUID uuid.UUID, appUUID uuid.UUID) (err error)
MetricChecker(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc, mode emetric.MetricCheckerMode, timeout time.Duration) (err error)
MetricLastCallback(devUUID uuid.UUID, q map[string]string, handler emetric.HandlerFunc) (err error)
RequestLastCallback(devUUID uuid.UUID, q map[string]string, handler erequest.HandlerFunc) (err error)
Expand Down
85 changes: 57 additions & 28 deletions pkg/eve/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@
ExternalIP string
InternalPort string
ExternalPort string
Metadata string
MemoryUsed uint32
MemoryAvail uint32
CPUUsage int
Macs []string
Volumes map[string]uint32

prevCPUNS uint64
prevCPUNSTime time.Time
deleted bool
infoTime time.Time
PrevCPUNS uint64
PrevCPUNSTime time.Time
Deleted bool
InfoTime time.Time
}

func appStateHeader() string {
Expand Down Expand Up @@ -117,7 +118,7 @@
}

func (ctx *State) initApplications(ctrl controller.Cloud, dev *device.Ctx) error {
ctx.applications = make(map[string]*AppInstState)
ctx.Applications = make(map[string]*AppInstState)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to export Applications?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to store the state in file (using json functions), which requires to export fields

for _, el := range dev.GetApplicationInstances() {
app, err := ctrl.GetApplicationInstanceConfig(el)
if err != nil {
Expand All @@ -144,35 +145,56 @@
Volumes: volumes,
UUID: app.Uuidandversion.Uuid,
}
ctx.applications[app.Uuidandversion.Uuid] = appStateObj
ctx.Applications[app.Uuidandversion.Uuid] = appStateObj
}
return nil
}

func (ctx *State) applyOldStateApps(state *State) {
for stateID, stateEL := range state.Applications {
found := false
for id := range ctx.Applications {
if id != stateID {
continue
}
ctx.Applications[id] = stateEL
found = true
}
if !found {
if stateEL.Deleted {
continue
}
stateEL.AdamState = notInControllerConfig
ctx.Applications[stateID] = stateEL
}
}
}

func (ctx *State) processApplicationsByMetric(msg *metrics.ZMetricMsg) {
if appMetrics := msg.GetAm(); appMetrics != nil {
for _, appMetric := range appMetrics {
for _, el := range ctx.applications {
for _, el := range ctx.Applications {
if appMetric.AppID == el.UUID {
el.MemoryAvail = appMetric.Memory.GetAvailMem()
el.MemoryUsed = appMetric.Memory.GetUsedMem()
// if not restarted
if el.prevCPUNS < appMetric.Cpu.TotalNs {
el.CPUUsage = int(float32(appMetric.Cpu.TotalNs-el.prevCPUNS) / float32(msg.GetAtTimeStamp().AsTime().Sub(el.prevCPUNSTime).Nanoseconds()) * 100.0)
if el.PrevCPUNS < appMetric.Cpu.TotalNs {
el.CPUUsage = int(float32(appMetric.Cpu.TotalNs-el.PrevCPUNS) / float32(msg.GetAtTimeStamp().AsTime().Sub(el.PrevCPUNSTime).Nanoseconds()) * 100.0)
}
el.prevCPUNS = appMetric.Cpu.TotalNs
el.prevCPUNSTime = msg.GetAtTimeStamp().AsTime()
el.PrevCPUNS = appMetric.Cpu.TotalNs
el.PrevCPUNSTime = msg.GetAtTimeStamp().AsTime()
break
}
}
}
}
}

//nolint:cyclop
func (ctx *State) processApplicationsByInfo(im *info.ZInfoMsg) {
switch im.GetZtype() {

Check failure on line 195 in pkg/eve/applications.go

View workflow job for this annotation

GitHub Actions / yetus

golangcilint: missing cases in switch of type info.ZInfoTypes: ZInfoTypes_ZiBlobList, ZInfoTypes_ZiContentTree, ZInfoTypes_ZiEdgeview, ZInfoTypes_ZiHardware, ZInfoTypes_ZiLocation, ZInfoTypes_ZiNop (exhaustive)
case info.ZInfoTypes_ZiVolume:
for _, app := range ctx.applications {
for _, app := range ctx.Applications {
if len(app.Volumes) == 0 {
continue
}
Expand All @@ -188,16 +210,23 @@
app.EVEState = fmt.Sprintf("%s (%d%%)", info.ZSwState_DOWNLOAD_STARTED.String(), int(percent)/len(app.Volumes))
}
}
case info.ZInfoTypes_ZiAppInstMetaData:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional thanks for AppInstMetaData

for _, app := range ctx.Applications {
if im.GetAmdinfo().Uuid == app.UUID {
app.Metadata = string(im.GetAmdinfo().Data)
break
}
}
case info.ZInfoTypes_ZiApp:
appStateObj, ok := ctx.applications[im.GetAinfo().AppID]
appStateObj, ok := ctx.Applications[im.GetAinfo().AppID]
if !ok {
appStateObj = &AppInstState{
Name: im.GetAinfo().AppName,
Image: "-",
AdamState: notInControllerConfig,
UUID: im.GetAinfo().AppID,
}
ctx.applications[im.GetAinfo().AppID] = appStateObj
ctx.Applications[im.GetAinfo().AppID] = appStateObj
}
appStateObj.EVEState = im.GetAinfo().State.String()
if len(im.GetAinfo().AppErr) > 0 {
Expand Down Expand Up @@ -227,20 +256,20 @@
//check appStateObj not defined in adam
if appStateObj.AdamState != inControllerConfig {
if im.GetAinfo().AppID == appStateObj.UUID {
appStateObj.deleted = false //if in recent ZInfoTypes_ZiApp, then not deleted
appStateObj.Deleted = false //if in recent ZInfoTypes_ZiApp, then not deleted
}
}
if im.GetAinfo().State == info.ZSwState_INVALID {
appStateObj.deleted = true
appStateObj.Deleted = true
}
appStateObj.infoTime = im.AtTimeStamp.AsTime()
appStateObj.InfoTime = im.AtTimeStamp.AsTime()
case info.ZInfoTypes_ZiNetworkInstance: //try to find ips from NetworkInstances
for _, el := range im.GetNiinfo().IpAssignments {
// nothing to show if no IpAddress received
if len(el.IpAddress) == 0 {
continue
}
for _, appStateObj := range ctx.applications {
for _, appStateObj := range ctx.Applications {
for ind, mac := range appStateObj.Macs {
if mac == el.MacAddress {
appStateObj.InternalIP[ind] = el.IpAddress[0]
Expand All @@ -250,18 +279,18 @@
}
case info.ZInfoTypes_ZiDevice:
for _, el := range im.GetDinfo().AppInstances {
if _, ok := ctx.applications[el.Uuid]; !ok {
if _, ok := ctx.Applications[el.Uuid]; !ok {
appStateObj := &AppInstState{
Name: el.Name,
Image: "-",
AdamState: notInControllerConfig,
EVEState: "UNKNOWN",
UUID: el.Uuid,
}
ctx.applications[el.Uuid] = appStateObj
ctx.Applications[el.Uuid] = appStateObj
}
}
for _, appStateObj := range ctx.applications {
for _, appStateObj := range ctx.Applications {
seen := false
for _, el := range im.GetDinfo().AppInstances {
if appStateObj.UUID == el.Uuid {
Expand Down Expand Up @@ -289,12 +318,12 @@
appStateObj.ExternalIP = "127.0.0.1"
}
//check appStateObj not defined in adam
if appStateObj.AdamState != inControllerConfig && appStateObj.infoTime.Before(im.AtTimeStamp.AsTime()) {
appStateObj.deleted = true
if appStateObj.AdamState != inControllerConfig && appStateObj.InfoTime.Before(im.AtTimeStamp.AsTime()) {
appStateObj.Deleted = true
for _, el := range im.GetDinfo().AppInstances {
//if in recent ZInfoTypes_ZiDevice with timestamp after ZInfoTypes_ZiApp, than not deleted
//if in recent ZInfoTypes_ZiDevice with timestamp after ZInfoTypes_ZiApp, then not deleted
if el.Uuid == appStateObj.UUID {
appStateObj.deleted = false
appStateObj.Deleted = false
}
}
}
Expand All @@ -308,8 +337,8 @@
if _, err := fmt.Fprintln(w, appStateHeader()); err != nil {
return err
}
appStatesSlice := make([]*AppInstState, 0, len(ctx.Applications()))
appStatesSlice = append(appStatesSlice, ctx.Applications()...)
appStatesSlice := make([]*AppInstState, 0, len(ctx.NotDeletedApplications()))
appStatesSlice = append(appStatesSlice, ctx.NotDeletedApplications()...)
sort.SliceStable(appStatesSlice, func(i, j int) bool {
return appStatesSlice[i].Name < appStatesSlice[j].Name
})
Expand All @@ -322,7 +351,7 @@
}

func (ctx *State) printPodListJSON() error {
result, err := json.MarshalIndent(ctx.Applications(), "", " ")
result, err := json.MarshalIndent(ctx.NotDeletedApplications(), "", " ")
if err != nil {
return err
}
Expand Down
Loading
Loading