Skip to content

Commit

Permalink
Refactor to support backfilling.
Browse files Browse the repository at this point in the history
  • Loading branch information
wi1dcard committed Jun 17, 2021
1 parent f6d9d10 commit 17cad28
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 60 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
prometheus-statuspage-pusher
*.swp
vendor/
/prometheus-statuspage-pusher
/config.yaml
/.vscode
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# prometheus-statuspage-pusher

TBD
A fork of [Whyeasy/prometheus-statuspage-pusher](https://github.com/Whyeasy/prometheus-statuspage-pusher) with backfilling support and minor enhancements.
101 changes: 45 additions & 56 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,131 +1,120 @@
package main

import (
"context"
"bytes"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"

"github.com/prometheus/client_golang/api"
prometheus "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

type queryConfig map[string]string

var (
prometheusURL = flag.String("prom", "http://localhost:9090", "URL of Prometheus server")
statusPageAPIKey = flag.String("apikey", "", "Statuspage API key")
statusPageID = flag.String("pageid", "", "Statuspage page ID")
queryConfigFile = flag.String("config", "queries.yaml", "Query config file")
metricInterval = flag.Duration("interval", 30*time.Second, "Metric push interval")
backfillDuration = flag.String("backfill", "", "Backfill the data points in, for example, 5d")

httpClient = &http.Client{
Timeout: 5 * time.Second,
Timeout: 30 * time.Second,
}

queryConfig map[string]string
)

func main() {
flag.Parse()
qConfig := queryConfig{}

qcd, err := ioutil.ReadFile(*queryConfigFile)
if err != nil {
log.Fatalf("Couldn't read config file: %s", err)
}
if err := yaml.Unmarshal(qcd, &qConfig); err != nil {
if err := yaml.Unmarshal(qcd, &queryConfig); err != nil {
log.Fatalf("Couldn't parse config file: %s", err)
}

queryPrometheus(qConfig)
ticker := time.NewTicker(*metricInterval)
if *backfillDuration != "" {
md, err := model.ParseDuration(*backfillDuration)
if err != nil {
log.Fatalf("Incorrect duration format: %s", *backfillDuration)
}
d := time.Duration(md)
queryAndPush(&d)
} else {
queryAndPush(nil)
}

ticker := time.NewTicker(*metricInterval)
for {
select {
case <-ticker.C:
go queryPrometheus(qConfig)
go queryAndPush(nil)
}
}
}

func queryPrometheus(qConfig queryConfig) {
client, err := api.NewClient(api.Config{Address: *prometheusURL})
if err != nil {
log.Fatalf("Couldn't create Prometheus client: %s", err)
}
api := prometheus.NewAPI(client)
func queryAndPush(backfill *time.Duration) {
log.Infof("Started to query and pushing metrics")

for metricID, query := range qConfig {
ctxlog := log.WithField("metric_id", metricID)
metrics := queryPrometheus(backfill)
chunkedMetrics := chunkMetrics(metrics)

ts := time.Now()
resp, warnings, err := api.Query(context.Background(), query, ts)
for _, m := range chunkedMetrics {
err := pushStatuspage(m)
if err != nil {
ctxlog.Errorf("Couldn't query Prometheus: %s", err)
continue
}

if len(warnings) > 0 {
for _, warning := range warnings {
ctxlog.Warnf("Prometheus query warning: %s", warning)
}
log.Error(err)
}
}

vec := resp.(model.Vector)
if l := vec.Len(); l != 1 {
ctxlog.Errorf("Expected query to return single value, actual %d samples", l)
continue
}
log.Infof("Pushed metrics to Statuspage")
}

value := vec[0].Value
if "NaN" == value.String() {
ctxlog.Error("Query returns NaN")
continue
}
func pushStatuspage(metrics statuspageMetrics) error {
jsonContents, err := json.Marshal(statuspageMetricsPayload{Data: metrics})
if err != nil {
return err
}

ctxlog.Infof("Query result: %v", value)
log.Debugf("Metrics payload pushing to Statuspage: %s", jsonContents)

if err := sendStatusPage(ts, metricID, float64(value)); err != nil {
ctxlog.Error("Couldn't send metric to Statuspage: %s", err)
continue
}
metricIDs := make([]string, 0, len(metrics))
for id := range metrics {
metricIDs = append(metricIDs, id)
}

log.Infof("Pushed metrics to statuspage.io")
}
log.Infof("Pushing metrics: %s", strings.Join(metricIDs, ", "))

func sendStatusPage(ts time.Time, metricID string, value float64) error {
values := url.Values{
"data[timestamp]": []string{strconv.FormatInt(ts.Unix(), 10)},
"data[value]": []string{strconv.FormatFloat(value, 'f', -1, 64)},
}
url := "https://api.statuspage.io" + path.Join("/v1", "pages", *statusPageID, "metrics", metricID, "data.json")
req, err := http.NewRequest("POST", url, strings.NewReader(values.Encode()))
url := fmt.Sprintf("https://api.statuspage.io/v1/pages/%s/metrics/data", url.PathEscape(*statusPageID))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonContents))
if err != nil {
return err
}

req.Header.Set("Authorization", "OAuth "+*statusPageAPIKey)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
respStr, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("HTTP status %d, Empty API response", resp.StatusCode)
}
return fmt.Errorf("HTTP status %d, API error: %s", resp.StatusCode, string(respStr))
}

return nil
}
47 changes: 47 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

// statuspageMetricPoint is a single data point as serialized for Statuspage.
type statuspageMetricPoint struct {
	// Timestamp is filled by the query code with the Prometheus sample
	// timestamp divided by 1000 (i.e. seconds rather than milliseconds).
	Timestamp int64 `json:"timestamp"`
	// Value is the sample value at Timestamp.
	Value float64 `json:"value"`
}

// statuspageMetrics maps a metric ID to the data points collected for it.
type statuspageMetrics map[string][]statuspageMetricPoint

// statuspageMetricsPayload is the JSON envelope for a push: the metrics are
// nested under a top-level "data" key.
type statuspageMetricsPayload struct {
	Data statuspageMetrics `json:"data"`
}

// MAX_NUMBER_OF_METRIC_POINTS_PER_REQUEST caps the total number of points
// (summed over all metrics) that chunkMetrics packs into one chunk.
// NOTE(review): presumably mirrors a Statuspage per-request limit — confirm
// against the Statuspage API documentation.
const MAX_NUMBER_OF_METRIC_POINTS_PER_REQUEST = 3000

// chunkMetrics splits metrics into chunks that each hold at most
// MAX_NUMBER_OF_METRIC_POINTS_PER_REQUEST points in total. A metric whose
// points do not fit into the remaining capacity of the current chunk is
// continued in the next one; the order of points within a metric is kept.
//
// Metrics without any points are omitted, and no empty chunk is ever
// returned: for an input without points the result has length zero. Map
// iteration order is unspecified, so which metrics end up sharing a chunk may
// differ between calls.
func chunkMetrics(metrics statuspageMetrics) []statuspageMetrics {
	var chunks []statuspageMetrics

	chunk := statuspageMetrics{}
	capacity := MAX_NUMBER_OF_METRIC_POINTS_PER_REQUEST
	for metricID, points := range metrics {
		for start := 0; start < len(points); {
			end := start + capacity
			if end > len(points) {
				end = len(points)
			}

			// A metric ID is added to a given chunk at most once: either this
			// slice exhausts the chunk (and a fresh one is started) or it
			// exhausts the metric. The capacity-limited slice expression keeps
			// a later append on the chunk from overwriting the caller's data.
			chunk[metricID] = points[start:end:end]

			capacity -= end - start
			if capacity == 0 {
				chunks = append(chunks, chunk)
				chunk = statuspageMetrics{}
				capacity = MAX_NUMBER_OF_METRIC_POINTS_PER_REQUEST
			}
			start = end
		}
	}

	// Only flush the trailing chunk when it actually holds data; otherwise the
	// caller would push an empty payload.
	if len(chunk) > 0 {
		chunks = append(chunks, chunk)
	}

	return chunks
}
143 changes: 143 additions & 0 deletions prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

import (
"context"
"fmt"
"math"
"time"

log "github.com/sirupsen/logrus"

"github.com/prometheus/client_golang/api"
prometheus "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

// queryPrometheus runs every query from queryConfig against the Prometheus
// server at *prometheusURL and returns the resulting points keyed by metric ID.
//
// With a nil backfill each query is evaluated once via queryInstant; otherwise
// queryRange is used with the given duration. Query warnings are logged.
// A query that fails is logged and left out of the result; the remaining
// queries still run. Failing to construct the client terminates the process.
func queryPrometheus(backfill *time.Duration) statuspageMetrics {
	client, err := api.NewClient(api.Config{Address: *prometheusURL})
	if err != nil {
		log.Fatalf("Couldn't create Prometheus client: %s", err)
	}
	// Named promAPI so the imported "api" package is not shadowed.
	promAPI := prometheus.NewAPI(client)

	collected := statuspageMetrics{}
	for id, expr := range queryConfig {
		entry := log.WithFields(log.Fields{
			"metric_id": id,
			"backfill":  backfill,
		})

		var (
			points   []statuspageMetricPoint
			warnings prometheus.Warnings
			queryErr error
		)
		switch {
		case backfill == nil:
			points, warnings, queryErr = queryInstant(promAPI, expr, entry)
		default:
			points, warnings, queryErr = queryRange(promAPI, expr, backfill, entry)
		}

		// Warnings are reported regardless of whether the query succeeded.
		for i := range warnings {
			entry.Warnf("Prometheus query warning: %s", warnings[i])
		}

		if queryErr == nil {
			collected[id] = points
			continue
		}
		entry.Error(queryErr)
	}

	return collected
}

// queryInstant evaluates query at the current time and converts the result
// into a single Statuspage data point.
//
// The query must produce a vector with exactly one sample. An error is
// returned when the query fails, the result has another type or size, or the
// sample value is NaN or ±Inf (encoding/json cannot encode those values, so
// letting one through would make the whole push fail later). Warnings from
// Prometheus are returned in every case so the caller can log them.
func queryInstant(api prometheus.API, query string, logger *log.Entry) ([]statuspageMetricPoint, prometheus.Warnings, error) {
	response, warnings, err := api.Query(context.Background(), query, time.Now())
	if err != nil {
		return nil, warnings, fmt.Errorf("Couldn't query Prometheus: %w", err)
	}

	if response.Type() != model.ValVector {
		return nil, warnings, fmt.Errorf("Expected result type %s, got %s", model.ValVector, response.Type())
	}

	vec := response.(model.Vector)
	if l := vec.Len(); l != 1 {
		return nil, warnings, fmt.Errorf("Expected single time serial, got %d", l)
	}

	sample := vec[0]
	logger.Infof("Query result: %s", sample.Value)

	value := float64(sample.Value)
	if math.IsNaN(value) {
		return nil, warnings, fmt.Errorf("Invalid metric value NaN")
	}
	if math.IsInf(value, 0) {
		// e.g. the result of a division by zero in the query expression.
		return nil, warnings, fmt.Errorf("Invalid metric value %v", value)
	}

	point := statuspageMetricPoint{
		// Prometheus timestamps are in milliseconds; convert to seconds.
		Timestamp: int64(sample.Timestamp / 1000),
		Value:     value,
	}
	return []statuspageMetricPoint{point}, warnings, nil
}

// queryRange collects historical data points for query, covering the period
// from now-*backfill up to now at a resolution of *metricInterval.
//
// The period is fetched in windows of at most 24 hours. Every window must
// return a matrix with exactly one series; otherwise, or when a request
// fails, an error is returned together with the warnings gathered so far.
// Samples whose value is NaN or ±Inf are skipped with a log warning, because
// encoding/json cannot encode them and a single such value would make the
// whole push fail.
func queryRange(api prometheus.API, query string, backfill *time.Duration, logger *log.Entry) ([]statuspageMetricPoint, prometheus.Warnings, error) {
	if backfill == nil {
		return nil, nil, fmt.Errorf("Backfill duration is required for a range query")
	}
	step := *metricInterval
	if step <= 0 {
		return nil, nil, fmt.Errorf("Invalid query step %s", step)
	}

	const window = 24 * time.Hour

	now := time.Now()
	start := now.Add(-*backfill)
	var (
		promWarnings prometheus.Warnings
		metricPoints []statuspageMetricPoint
	)

	for start.Before(now) {
		end := start.Add(window)
		if end.After(now) {
			end = now
		}

		logger.Infof("Querying metrics from %s to %s with step %s", start.Format(time.RFC3339), end.Format(time.RFC3339), step)
		response, warnings, err := api.QueryRange(context.Background(), query, prometheus.Range{
			Start: start,
			End:   end,
			Step:  step,
		})
		promWarnings = append(promWarnings, warnings...)

		if err != nil {
			return nil, promWarnings, fmt.Errorf("Couldn't query Prometheus: %w", err)
		}

		if response.Type() != model.ValMatrix {
			return nil, promWarnings, fmt.Errorf("Expected result type %s, got %s", model.ValMatrix, response.Type())
		}

		mtx := response.(model.Matrix)
		if l := mtx.Len(); l != 1 {
			return nil, promWarnings, fmt.Errorf("Expected single time serial, got %d", l)
		}

		samples := mtx[0].Values
		logger.Infof("Got %d samples", len(samples))
		logger.Debugf("Query result: %v", samples)

		for _, s := range samples {
			value := float64(s.Value)
			if math.IsNaN(value) {
				logger.Warn("Invalid metric value NaN")
				continue
			}
			if math.IsInf(value, 0) {
				logger.Warnf("Invalid metric value %v", value)
				continue
			}
			metricPoints = append(metricPoints, statuspageMetricPoint{
				// Prometheus timestamps are in milliseconds; convert to seconds.
				Timestamp: int64(s.Timestamp / 1000),
				Value:     value,
			})
		}

		// A range query is evaluated at start, start+step, ... up to end.
		// Continue on that same grid, one step past the last evaluated
		// instant. Restarting just after end would produce a second sample
		// within the same second whenever the window is a multiple of step.
		start = start.Add((end.Sub(start)/step + 1) * step)
	}

	return metricPoints, promWarnings, nil
}

0 comments on commit 17cad28

Please sign in to comment.