Skip to content

Commit

Permalink
Move usage to new table write block (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Aug 12, 2024
1 parent 140c30d commit 6f34c44
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()

Expand All @@ -318,23 +317,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
fingerprint := fingerprints[i]
for _, s := range ts.Samples {

// usage collection checks
tenant := "default"
collectUsage := true
for _, val := range timeSeries[fingerprint] {
if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
collectUsage = false
break
}
if val.Name == "tenant" {
tenant = val.Value
}
}

if collectUsage {
usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
}

err = statement.Append(
fingerprintToName[fingerprint][nameLabel],
fingerprint,
Expand All @@ -359,12 +341,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

for k, v := range metrics {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
}

// write to distributed_samples_v4 table
if ch.writeTSToV4 {
metrics := map[string]usage.Metric{}
err = func() error {
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_SAMPLES_TABLE_V4), driver.WithReleaseConnection())
if err != nil {
Expand All @@ -386,6 +365,23 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}

// usage collection checks
tenant := "default"
collectUsage := true
for _, val := range timeSeries[fingerprint] {
if val.Name == nameLabel && (strings.HasPrefix(val.Value, "signoz_") || strings.HasPrefix(val.Value, "chi_") || strings.HasPrefix(val.Value, "otelcol_")) {
collectUsage = false
break
}
if val.Name == "tenant" {
tenant = val.Value
}
}

if collectUsage {
usage.AddMetric(metrics, tenant, 1, int64(len(s.String())))
}
}
}

Expand All @@ -402,6 +398,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
for k, v := range metrics {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, ch.exporterID.String())}, ExporterSigNozSentMetricPoints.M(int64(v.Count)), ExporterSigNozSentMetricPointsBytes.M(int64(v.Size)))
}
}

// write to distributed_time_series_v4 table
Expand Down

0 comments on commit 6f34c44

Please sign in to comment.