Skip to content

Commit

Permalink
fix queries
Browse files Browse the repository at this point in the history
  • Loading branch information
marianososto committed Mar 8, 2024
1 parent 10778c5 commit 44b5c00
Showing 1 changed file with 71 additions and 45 deletions.
116 changes: 71 additions & 45 deletions api/handlers/protocols/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ from(bucket: "%s")

const QueryTemplateActivityLatestPoint = `
from(bucket: "%s")
|> range(start: -3d)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s")
|> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"])
|> last()
Expand All @@ -42,20 +42,27 @@ const QueryIntProtocolsTotalStartOfDay = `
startOfCurrentDay = date.truncate(t: now(), unit: 1d)
from(bucket: "%s")
data = from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfCurrentDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else int(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else int(v:0))
//total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
//total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day
Expand All @@ -65,41 +72,60 @@ const QueryIntProtocolsDeltaSinceStartOfDay = `
ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.truncate(t: now(), unit: 1d)
from(bucket: "%s")
|> range(start: startOfDay, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
data = from(bucket: "%s")
|> range(start: startOfDay,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaLastDay calculate last day delta
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
import "date"
import "types"
from(bucket: "%s")
|> range(start: yesterday, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
data = from(bucket: "%s")
|> range(start: yesterday,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:yesterday))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

type Repository struct {
Expand Down Expand Up @@ -216,14 +242,14 @@ func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {

// calculate total values till the start of current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol)
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
return intStats{}, err
}

// calculate delta since the beginning of current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol)
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
Expand All @@ -240,7 +266,7 @@ func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol stri
}

// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol)
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
Expand Down

0 comments on commit 44b5c00

Please sign in to comment.