From 205ba9228037573642930d6dc4fffeb7191fdc75 Mon Sep 17 00:00:00 2001 From: Teerat Pitakrat Date: Tue, 21 Mar 2017 00:34:20 +0100 Subject: [PATCH] Add component type cpu and memory --- adm/adm.go | 4 +- adm/adm_test.go | 16 ++++---- adm/fileio_test.go | 8 ++-- adm/netio_test.go | 8 ++-- cfp/arima-r_test.go | 4 +- cfp/cfp.go | 31 ++++++++++++--- config.toml | 12 +++++- fpm/bayesnet-r.go | 7 ++++ fpm/bayesnet-r_test.go | 62 ++++++++++++++--------------- main.go | 3 +- mondat/influx-kieker-reader.go | 48 ++++++++++++++++------ mondat/influx-kieker-reader_test.go | 2 + resultio/influxresultwriter.go | 2 + resultio/influxresultwriter_test.go | 8 ++-- 14 files changed, 139 insertions(+), 76 deletions(-) diff --git a/adm/adm.go b/adm/adm.go index ce8c71f..172e30b 100644 --- a/adm/adm.go +++ b/adm/adm.go @@ -9,6 +9,7 @@ type ADM map[string]DependencyInfo type Component struct { Name string `json:"name"` Hostname string `json:"hostname"` + Type string `json:"type"` } func New() ADM { @@ -16,7 +17,8 @@ func New() ADM { } func (c *Component) UniqName() string { - name := c.Hostname + "_" + c.Name + name := c.Type + "_" + c.Hostname + "_" + c.Name + // TODO: use strings.Replacer name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, ",", "_", -1) name = strings.Replace(name, ";", "_", -1) diff --git a/adm/adm_test.go b/adm/adm_test.go index b249569..05f2c40 100644 --- a/adm/adm_test.go +++ b/adm/adm_test.go @@ -7,10 +7,10 @@ import ( func TestADMSmall(t *testing.T) { m := New() - compA := Component{"method1()", "host-1"} - compB := Component{"method2(param)", "host-2"} - compC := Component{"method3()", "host-3"} - compD := Component{"method4(param1, param2)", "host-4"} + compA := Component{"method1()", "host-1", "responsetime"} + compB := Component{"method2(param)", "host-2", "responsetime"} + compC := Component{"method3()", "host-3", "responsetime"} + compD := Component{"method4(param1, param2)", "host-4", "responsetime"} depA := DependencyInfo{compA, make([]Dependency, 2, 2)} depA.Component = compA @@ -39,10 +39,10 @@ func TestADMSmall(t *testing.T) { if len(v.Dependencies) != expected { t.Error("Expected: ", expected, " but got ", len(v.Dependencies)) } - if v.Dependencies[0].Component.UniqName() != "host_2_method2_param_" || v.Dependencies[0].Weight != 0.5 { + if v.Dependencies[0].Component.UniqName() != "responsetime_host_2_method2_param_" || v.Dependencies[0].Weight != 0.5 { t.Error("Wrong value") } - if v.Dependencies[1].Component.UniqName() != "host_3_method3__" || v.Dependencies[1].Weight != 0.5 { + if v.Dependencies[1].Component.UniqName() != "responsetime_host_3_method3__" || v.Dependencies[1].Weight != 0.5 { t.Error("Wrong value") } case compB.UniqName(): @@ -50,7 +50,7 @@ func TestADMSmall(t *testing.T) { if len(v.Dependencies) != expected { t.Error("Expected: ", expected, " but got ", len(v.Dependencies)) } - if v.Dependencies[0].Component.UniqName() != "host_4_method4_param1__param2_" || v.Dependencies[0].Weight != 1 { + if v.Dependencies[0].Component.UniqName() != "responsetime_host_4_method4_param1__param2_" || v.Dependencies[0].Weight != 1 { t.Error("Wrong value") } case compC.UniqName(): @@ -58,7 +58,7 @@ func TestADMSmall(t *testing.T) { if len(v.Dependencies) != expected { t.Error("Expected: ", expected, " but got ", len(v.Dependencies)) } - if v.Dependencies[0].Component.UniqName() != "host_4_method4_param1__param2_" || v.Dependencies[0].Weight != 1 { + if v.Dependencies[0].Component.UniqName() != "responsetime_host_4_method4_param1__param2_" || v.Dependencies[0].Weight != 1 { t.Error("Wrong value") } case compD.UniqName(): diff --git a/adm/fileio_test.go b/adm/fileio_test.go index 8f6d37c..f720912 100644 --- a/adm/fileio_test.go +++ b/adm/fileio_test.go @@ -10,10 +10,10 @@ import ( func TestReadWriteFile(t *testing.T) { // Export m := New() - compA := Component{"method1()", "host-1"} - compB := Component{"method2(param)", "host-2"} - compC := Component{"method3()", "host-3"} - compD := Component{"method4(param1, param2)", "host-4"} + compA := Component{"method1()", "host-1", "responsetime"} + compB := Component{"method2(param)", "host-2", "responsetime"} + compC := Component{"method3()", "host-3", "responsetime"} + compD := Component{"method4(param1, param2)", "host-4", "responsetime"} depA := DependencyInfo{compA, make([]Dependency, 2, 2)} depA.Component = compA diff --git a/adm/netio_test.go b/adm/netio_test.go index 0776dc6..40c7fcd 100644 --- a/adm/netio_test.go +++ b/adm/netio_test.go @@ -94,10 +94,10 @@ func TestNetReader(t *testing.T) { m := New() - compA := Component{"method1()", "host-1"} - compB := Component{"method2(param)", "host-2"} - compC := Component{"method3()", "host-3"} - compD := Component{"method4(param1, param2)", "host-4"} + compA := Component{"method1()", "host-1", "responsetime"} + compB := Component{"method2(param)", "host-2", "responsetime"} + compC := Component{"method3()", "host-3", "responsetime"} + compD := Component{"method4(param1, param2)", "host-4", "responsetime"} depA := DependencyInfo{compA, make([]Dependency, 2, 2)} depA.Component = compA diff --git a/cfp/arima-r_test.go b/cfp/arima-r_test.go index 5bad99d..7ffe2d9 100644 --- a/cfp/arima-r_test.go +++ b/cfp/arima-r_test.go @@ -13,7 +13,7 @@ import ( var testdat = []float64{60, 43, 67, 50, 56, 42, 50, 65, 68, 43, 65, 34, 47, 34, 49, 41, 13, 35, 53, 56} func TestInsert(t *testing.T) { - c := adm.Component{"A", "host1"} + c := adm.Component{"A", "host1", "responsetime"} a, err := NewArimaR(c, time.Minute, 5*time.Minute, 20*time.Minute, 70) if err != nil { t.Error("Error getting new ArimaR", err) @@ -34,7 +34,7 @@ func TestInsert(t *testing.T) { } func TestPredict(t *testing.T) { - c := adm.Component{"A", "host1"} + c := adm.Component{"A", "host1", "responsetime"} a, err := NewArimaR(c, time.Minute, 5*time.Minute, 20*time.Minute, 70) if err != nil { t.Error("Error getting new ArimaR", err) diff --git a/cfp/cfp.go b/cfp/cfp.go index 0c06632..ce9d7f4 100644 --- a/cfp/cfp.go +++ b/cfp/cfp.go @@ -74,13 +74,32 @@ func (c *CfpController) start() { // TODO: choose predictor based on component type interval := viper.GetDuration("prediction.interval") leadtime := viper.GetDuration("prediction.leadtime") - history := viper.GetDuration("cfp.responsetime.history") - threshold := float64(viper.GetDuration("cfp.responsetime.threshold") / viper.GetDuration("cfp.responsetime.unit")) - cfp, err = NewArimaR(comp, interval, leadtime, history, threshold) - if err != nil { - log.Print(err) + switch comp.Type { + case "responsetime": + history := viper.GetDuration("cfp.responsetime.history") + threshold := float64(viper.GetDuration("cfp.responsetime.threshold") / viper.GetDuration("cfp.responsetime.unit")) + cfp, err = NewArimaR(comp, interval, leadtime, history, threshold) + if err != nil { + log.Print(err) + } + c.cfps[comp.UniqName()] = cfp + case "cpu": + history := viper.GetDuration("cfp.cpu.history") + threshold := viper.GetFloat64("cfp.cpu.threshold") + cfp, err = NewArimaR(comp, interval, leadtime, history, threshold) + if err != nil { + log.Print(err) + } + c.cfps[comp.UniqName()] = cfp + case "memory": + history := viper.GetDuration("cfp.memory.history") + threshold := viper.GetFloat64("cfp.memory.threshold") + cfp, err = NewArimaR(comp, interval, leadtime, history, threshold) + if err != nil { + log.Print(err) + } + c.cfps[comp.UniqName()] = cfp } - c.cfps[comp.UniqName()] = cfp } cfp.Insert(tsPoint) res, err := cfp.Predict() diff --git a/config.toml b/config.toml index 292a452..fe4c563 100644 --- a/config.toml +++ b/config.toml @@ -15,7 +15,7 @@ title = "Hora configuration" [cfp] [cfp.responsetime] unit = "1ns" - threshold = "500ms" + threshold = "100ms" predictor = "arima" history = "20m" aggregation = "percentile" @@ -23,8 +23,16 @@ title = "Hora configuration" [cfp.errorrate] [cfp.cpu] threshold = 1000 + predictor = "arima" + history = "20m" + aggregation = "percentile" + aggregationvalue = 95 [cfp.memory] - threshold = 100 + threshold = 2000000000 + predictor = "arima" + history = "20m" + aggregation = "percentile" + aggregationvalue = 95 [fpm] updateinterval = "10ms" [rserve] diff --git a/fpm/bayesnet-r.go b/fpm/bayesnet-r.go index d63ce4f..1d58a13 100644 --- a/fpm/bayesnet-r.go +++ b/fpm/bayesnet-r.go @@ -51,6 +51,10 @@ func NewBayesNetR(m adm.ADM) (BayesNetR, <-chan Result, error) { } func (f *BayesNetR) createBayesNet() error { + // See documentation of bnlearn package in R for more details + // Example: https://rstudio-pubs-static.s3.amazonaws.com/124744_09170b0a7e414cb8bf492daa6773f2fe.html + // and http://sujitpal.blogspot.de/2013/07/bayesian-network-inference-with-r-and.html + // Create structure cmd := "net <- model2network(\"" for _, v := range f.admodel { @@ -105,6 +109,9 @@ func (f *BayesNetR) createBayesNet() error { for i, mask := 0, 1; i < nDeps; i, mask = i+1, mask<<1 { if pState&mask > 0 { failProb += v.Dependencies[nDeps-i-1].Weight + if failProb > 1.0 { + failProb = 1.0 + } } } cmd += ", " + strconv.FormatFloat(1-failProb, 'f', 6, 64) diff --git a/fpm/bayesnet-r_test.go b/fpm/bayesnet-r_test.go index d16ce63..055a854 100644 --- a/fpm/bayesnet-r_test.go +++ b/fpm/bayesnet-r_test.go @@ -12,10 +12,10 @@ import ( func TestCreate(t *testing.T) { m := make(adm.ADM) - compA := adm.Component{"A", "host1"} - compB := adm.Component{"B", "host2"} - compC := adm.Component{"C", "host3"} - compD := adm.Component{"D", "host4"} + compA := adm.Component{"A", "host1", "responsetime"} + compB := adm.Component{"B", "host2", "responsetime"} + compC := adm.Component{"C", "host3", "responsetime"} + compD := adm.Component{"D", "host4", "responsetime"} depA := adm.DependencyInfo{compA, make([]adm.Dependency, 2, 2)} depA.Component = compA @@ -46,104 +46,104 @@ func TestCreate(t *testing.T) { t.Error("Error creating BayesNetR", err) } - cfpResult := cfp.Result{adm.Component{"D", "host4"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} + cfpResult := cfp.Result{adm.Component{"D", "host4", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} f.UpdateCfpResult(cfpResult) fpmResult := <-fpmResultCh if err != nil { t.Error("Error making prediction", err) } // TODO: more precision checks - fprobA := fpmResult.FailProbs[adm.Component{"A", "host1"}] + fprobA := fpmResult.FailProbs[adm.Component{"A", "host1", "responsetime"}] if fprobA != 0 { t.Error("Expected: 0 but got", fprobA) } - fprobB := fpmResult.FailProbs[adm.Component{"B", "host2"}] + fprobB := fpmResult.FailProbs[adm.Component{"B", "host2", "responsetime"}] if fprobB != 0 { t.Error("Expected: 0 but got", fprobB) } - fprobC := fpmResult.FailProbs[adm.Component{"C", "host3"}] + fprobC := fpmResult.FailProbs[adm.Component{"C", "host3", "responsetime"}] if fprobC != 0 { t.Error("Expected: 0 but got", fprobC) } - fprobD := fpmResult.FailProbs[adm.Component{"D", "host4"}] + fprobD := fpmResult.FailProbs[adm.Component{"D", "host4", "responsetime"}] if fprobD != 0 { t.Error("Expected: 0 but got", fprobD) } - cfpResult = cfp.Result{adm.Component{"D", "host4"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} + cfpResult = cfp.Result{adm.Component{"D", "host4", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} f.UpdateCfpResult(cfpResult) fpmResult = <-fpmResultCh if err != nil { t.Error("Error making prediction", err) } - fprobA = fpmResult.FailProbs[adm.Component{"A", "host1"}] + fprobA = fpmResult.FailProbs[adm.Component{"A", "host1", "responsetime"}] if fprobA > 0.12 { t.Error("Expected: 0 but got", fprobA) } - fprobB = fpmResult.FailProbs[adm.Component{"B", "host2"}] + fprobB = fpmResult.FailProbs[adm.Component{"B", "host2", "responsetime"}] if fprobB > 0.12 { t.Error("Expected: 0 but got", fprobB) } - fprobC = fpmResult.FailProbs[adm.Component{"C", "host3"}] + fprobC = fpmResult.FailProbs[adm.Component{"C", "host3", "responsetime"}] if fprobC > 0.12 { t.Error("Expected: 0 but got", fprobC) } - fprobD = fpmResult.FailProbs[adm.Component{"D", "host4"}] + fprobD = fpmResult.FailProbs[adm.Component{"D", "host4", "responsetime"}] if fprobD > 0.12 { t.Error("Expected: 0 but got", fprobD) } - cfpResult = cfp.Result{adm.Component{"D", "host4"}, time.Unix(0, 0), time.Unix(0, 300), 0.9} + cfpResult = cfp.Result{adm.Component{"D", "host4", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.9} f.UpdateCfpResult(cfpResult) fpmResult = <-fpmResultCh if err != nil { t.Error("Error making prediction", err) } - fprobA = fpmResult.FailProbs[adm.Component{"A", "host1"}] + fprobA = fpmResult.FailProbs[adm.Component{"A", "host1", "responsetime"}] if fprobA < 0.89 { t.Error("Expected: 0 but got", fprobA) } - fprobB = fpmResult.FailProbs[adm.Component{"B", "host2"}] + fprobB = fpmResult.FailProbs[adm.Component{"B", "host2", "responsetime"}] if fprobB < 0.89 { t.Error("Expected: 0 but got", fprobB) } - fprobC = fpmResult.FailProbs[adm.Component{"C", "host3"}] + fprobC = fpmResult.FailProbs[adm.Component{"C", "host3", "responsetime"}] if fprobC < 0.89 { t.Error("Expected: 0 but got", fprobC) } - fprobD = fpmResult.FailProbs[adm.Component{"D", "host4"}] + fprobD = fpmResult.FailProbs[adm.Component{"D", "host4", "responsetime"}] if fprobD < 0.89 { t.Error("Expected: 0 but got", fprobD) } - cfpResultD := cfp.Result{adm.Component{"D", "host4"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} + cfpResultD := cfp.Result{adm.Component{"D", "host4", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} f.UpdateCfpResult(cfpResultD) fpmResult = <-fpmResultCh - cfpResultB := cfp.Result{adm.Component{"B", "host2"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} + cfpResultB := cfp.Result{adm.Component{"B", "host2", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} f.UpdateCfpResult(cfpResultB) fpmResult = <-fpmResultCh if err != nil { t.Error("Error making prediction", err) } - fprobA = fpmResult.FailProbs[adm.Component{"A", "host1"}] + fprobA = fpmResult.FailProbs[adm.Component{"A", "host1", "responsetime"}] if fprobA > 0.12 { t.Error("Expected: 0 but got", fprobA) } - fprobB = fpmResult.FailProbs[adm.Component{"B", "host2"}] + fprobB = fpmResult.FailProbs[adm.Component{"B", "host2", "responsetime"}] if fprobB > 0.12 { t.Error("Expected: 0 but got", fprobB) } - fprobC = fpmResult.FailProbs[adm.Component{"C", "host3"}] + fprobC = fpmResult.FailProbs[adm.Component{"C", "host3", "responsetime"}] if fprobC != 0 { t.Error("Expected: 0 but got", fprobC) } - fprobD = fpmResult.FailProbs[adm.Component{"D", "host4"}] + fprobD = fpmResult.FailProbs[adm.Component{"D", "host4", "responsetime"}] if fprobD != 0 { t.Error("Expected: 0 but got", fprobD) } - cfpResultB = cfp.Result{adm.Component{"B", "host2"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} - cfpResultA := cfp.Result{adm.Component{"A", "host1"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} + cfpResultB = cfp.Result{adm.Component{"B", "host2", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.0} + cfpResultA := cfp.Result{adm.Component{"A", "host1", "responsetime"}, time.Unix(0, 0), time.Unix(0, 300), 0.1} f.UpdateCfpResult(cfpResultB) fpmResult = <-fpmResultCh f.UpdateCfpResult(cfpResultA) @@ -151,19 +151,19 @@ func TestCreate(t *testing.T) { if err != nil { t.Error("Error making prediction", err) } - fprobA = fpmResult.FailProbs[adm.Component{"A", "host1"}] + fprobA = fpmResult.FailProbs[adm.Component{"A", "host1", "responsetime"}] if fprobA > 0.12 { t.Error("Expected: 0 but got", fprobA) } - fprobB = fpmResult.FailProbs[adm.Component{"B", "host2"}] + fprobB = fpmResult.FailProbs[adm.Component{"B", "host2", "responsetime"}] if fprobB > 0.1 { t.Error("Expected: 0 but got", fprobB) } - fprobC = fpmResult.FailProbs[adm.Component{"C", "host3"}] + fprobC = fpmResult.FailProbs[adm.Component{"C", "host3", "responsetime"}] if fprobC != 0 { t.Error("Expected: 0 but got", fprobC) } - fprobD = fpmResult.FailProbs[adm.Component{"D", "host4"}] + fprobD = fpmResult.FailProbs[adm.Component{"D", "host4", "responsetime"}] if fprobD != 0 { t.Error("Expected: 0 but got", fprobD) } diff --git a/main.go b/main.go index 3fd03e6..3d117b6 100644 --- a/main.go +++ b/main.go @@ -83,7 +83,8 @@ func main() { Addr: viper.GetString("influxdb.addr"), Username: viper.GetString("influxdb.username"), Password: viper.GetString("influxdb.password"), - Db: viper.GetString("influxdb.db.kieker"), + KiekerDb: viper.GetString("influxdb.db.kieker"), + K8sDb: viper.GetString("influxdb.db.k8s"), Batch: viper.GetBool("influxdb.batch"), Interval: viper.GetDuration("prediction.interval"), } diff --git a/mondat/influx-kieker-reader.go b/mondat/influx-kieker-reader.go index 3a09f6c..3a5fdca 100644 --- a/mondat/influx-kieker-reader.go +++ b/mondat/influx-kieker-reader.go @@ -18,7 +18,8 @@ type InfluxKiekerReader struct { Addr string Username string Password string - Db string + KiekerDb string + K8sDb string Batch bool Starttime time.Time Endtime time.Time @@ -79,7 +80,7 @@ func (r *InfluxKiekerReader) readBatch(clnt client.Client, ch chan TSPoint) { cmd := "select " + aggregation + "(\"responseTime\"," + aggregationvalue + ") from OperationExecution where \"hostname\" = '" + d.Component.Hostname + "' and \"operationSignature\" = '" + d.Component.Name + "' and time > " + strconv.FormatInt(curtimestamp.UnixNano(), 10) + " and time <= " + strconv.FormatInt(lasttimestamp.UnixNano(), 10) + " group by time(1m)" q := client.Query{ Command: cmd, - Database: r.Db, + Database: r.KiekerDb, } response, err := clnt.Query(q) if err != nil { @@ -144,15 +145,36 @@ func (r *InfluxKiekerReader) readRealtime(clnt client.Client, ch chan TSPoint) { for { log.Print("Reading monitoring data at ", curtime) for _, d := range r.Archdepmod { - // TODO: query for different types of components - // TODO: change group by time according to r.Interval - aggregation := viper.GetString("cfp.responsetime.aggregation") - aggregationvalue := viper.GetString("cfp.responsetime.aggregationvalue") - //cmd := "select " + aggregation + "(\"responseTime\"," + aggregationvalue + ") from operationExecution where \"hostname\" = '" + d.Component.Hostname + "' and \"operationSignature\" = '" + d.Component.Name + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(" + r.Interval.String() + ")" - cmd := "select " + aggregation + "(\"responseTime\"," + aggregationvalue + ") from OperationExecution where \"hostname\" = '" + d.Component.Hostname + "' and \"operationSignature\" = '" + d.Component.Name + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(1m)" - q := client.Query{ - Command: cmd, - Database: r.Db, + var q client.Query + var cmd string + // Query for different types of components + switch d.Component.Type { + case "responsetime": + aggregation := viper.GetString("cfp.responsetime.aggregation") + aggregationvalue := viper.GetString("cfp.responsetime.aggregationvalue") + // TODO: change group by time according to r.Interval + //cmd := "select " + aggregation + "(\"responseTime\"," + aggregationvalue + ") from operationExecution where \"hostname\" = '" + d.Component.Hostname + "' and \"operationSignature\" = '" + d.Component.Name + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(" + r.Interval.String() + ")" + cmd := "select " + aggregation + "(\"responseTime\"," + aggregationvalue + ") from OperationExecution where \"hostname\" = '" + d.Component.Hostname + "' and \"operationSignature\" = '" + d.Component.Name + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(1m)" + q = client.Query{ + Command: cmd, + Database: r.KiekerDb, + } + case "cpu": + aggregation := viper.GetString("cfp.cpu.aggregation") + aggregationvalue := viper.GetString("cfp.cpu.aggregationvalue") + cmd := "select " + aggregation + "(\"value\"," + aggregationvalue + ") from \"cpu/usage_rate\" where \"pod_name\" = '" + d.Component.Hostname + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(1m)" + q = client.Query{ + Command: cmd, + Database: r.K8sDb, + } + case "memory": + aggregation := viper.GetString("cfp.memory.aggregation") + aggregationvalue := viper.GetString("cfp.memory.aggregationvalue") + cmd := "select " + aggregation + "(\"value\"," + aggregationvalue + ") from \"cpu/usage_rate\" where \"pod_name\" = '" + d.Component.Hostname + "' and time >= " + strconv.FormatInt(curtime.Add(-1*r.Interval).UnixNano(), 10) + " and time < " + strconv.FormatInt(curtime.UnixNano(), 10) + " group by time(1m)" + q = client.Query{ + Command: cmd, + Database: r.K8sDb, + } } response, err := clnt.Query(q) if err != nil { @@ -192,7 +214,7 @@ func (r *InfluxKiekerReader) getFirstAndLastTimestamp(clnt client.Client, c adm. cmd := "select first(responseTime) from OperationExecution where \"hostname\" = '" + c.Hostname + "' and \"operationSignature\" = '" + c.Name + "'" q := client.Query{ Command: cmd, - Database: r.Db, + Database: r.KiekerDb, } response, err := clnt.Query(q) if err != nil { @@ -214,7 +236,7 @@ func (r *InfluxKiekerReader) getFirstAndLastTimestamp(clnt client.Client, c adm. cmd = "select last(responseTime) from OperationExecution where \"hostname\" = '" + c.Hostname + "' and \"operationSignature\" = '" + c.Name + "'" q = client.Query{ Command: cmd, - Database: r.Db, + Database: r.KiekerDb, } response, err = clnt.Query(q) if err != nil { diff --git a/mondat/influx-kieker-reader_test.go b/mondat/influx-kieker-reader_test.go index eff629a..316486a 100644 --- a/mondat/influx-kieker-reader_test.go +++ b/mondat/influx-kieker-reader_test.go @@ -29,6 +29,7 @@ func TestReadBatch(t *testing.T) { compFetch := adm.Component{ Name: "public javax.ws.rs.core.Response com.netflix.recipes.rss.jersey.resources.MiddleTierResource.fetchSubscriptions(java.lang.String)", Hostname: "middletier-rlz2x", + Type: "responsetime", } var compFetchDepInfo adm.DependencyInfo compFetchDepInfo.Component = compFetch @@ -37,6 +38,7 @@ func TestReadBatch(t *testing.T) { compGet := adm.Component{ Name: "protected java.lang.String com.netflix.recipes.rss.hystrix.GetRSSCommand.run()", Hostname: "edge-xprx0", + Type: "responsetime", } var compGetDepInfo adm.DependencyInfo compGetDepInfo.Component = compGet diff --git a/resultio/influxresultwriter.go b/resultio/influxresultwriter.go index 3796d18..ec61627 100644 --- a/resultio/influxresultwriter.go +++ b/resultio/influxresultwriter.go @@ -69,6 +69,7 @@ func (w *InfluxResultWriter) WriteCfpResult(result cfp.Result) error { tags := map[string]string{ "name": result.Component.Name, "hostname": result.Component.Hostname, + "type": result.Component.Type, } fields := map[string]interface{}{ "failureProbability": result.FailProb, @@ -105,6 +106,7 @@ func (w *InfluxResultWriter) WriteFpmResult(result fpm.Result) error { tags := map[string]string{ "name": k.Name, "hostname": k.Hostname, + "type": k.Type, } fields := map[string]interface{}{ "failureProbability": v, diff --git a/resultio/influxresultwriter_test.go b/resultio/influxresultwriter_test.go index d207c0e..a417243 100644 --- a/resultio/influxresultwriter_test.go +++ b/resultio/influxresultwriter_test.go @@ -33,8 +33,8 @@ func TestWriteCfpResult(t *testing.T) { if err != nil { t.Error(err) } - a := adm.Component{"A", "host1"} - b := adm.Component{"B", "host2"} + a := adm.Component{"A", "host1", "responsetime"} + b := adm.Component{"B", "host2", "responsetime"} resulta := cfp.Result{ a, time.Now(), @@ -73,8 +73,8 @@ func TestWriteFpmResult(t *testing.T) { t.Error(err) } - a := adm.Component{"A", "host1"} - b := adm.Component{"B", "host2"} + a := adm.Component{"A", "host1", "responsetime"} + b := adm.Component{"B", "host2", "responsetime"} failProbs := make(map[adm.Component]float64) failProbs[a] = 0.2 failProbs[b] = 0.3