Add shard count watcher (#142)
srikanthccv authored Jul 5, 2023
1 parent bef27fa commit 7969aef
Showing 5 changed files with 38 additions and 32 deletions.
64 changes: 32 additions & 32 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -60,7 +60,9 @@ type clickHouse struct {
 	// Maintains the lookup map for fingerprints that are
 	// written to time series table. This map is used to eliminate the
 	// unnecessary writes to table for the records that already exist.
-	timeSeries map[uint64]struct{}
+	timeSeries      map[uint64]struct{}
+	prevShardCount  uint64
+	watcherInterval time.Duration
 
 	mWrittenTimeSeries prometheus.Counter
 }
@@ -70,6 +72,7 @@ type ClickHouseParams struct {
 	DropDatabase         bool
 	MaxOpenConns         int
 	MaxTimeSeriesInQuery int
+	WatcherInterval      time.Duration
 }
 
 func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
@@ -161,6 +164,15 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 		}
 	}
 
+	// TODO(srikanthccv): Remove this once we have a better way to handle data and last write
+	removeTTL := fmt.Sprintf(`
+		ALTER TABLE %s.%s ON CLUSTER %s REMOVE TTL;`, database, TIME_SERIES_TABLE, CLUSTER)
+	if err = conn.Exec(context.Background(), removeTTL); err != nil {
+		if !strings.Contains(err.Error(), "Table doesn't have any table TTL expression, cannot remove.") {
+			return nil, err
+		}
+	}
+
 	ch := &clickHouse{
 		conn: conn,
 		l:    l,
@@ -175,60 +187,48 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
 			Name: "written_time_series",
 			Help: "Number of written time series.",
 		}),
+		watcherInterval: params.WatcherInterval,
 	}
 
 	go func() {
 		ctx := pprof.WithLabels(context.TODO(), pprof.Labels("component", "clickhouse_reloader"))
 		pprof.SetGoroutineLabels(ctx)
-		ch.runTimeSeriesReloader(ctx)
+		ch.shardCountWatcher(ctx)
 	}()
 
 	return ch, nil
 }
 
-// runTimeSeriesReloader periodically queries the time series table
-// and updates the timeSeries lookup map with new fingerprints.
-// One might wonder why is there a need to reload the data from clickhouse
-// when it just suffices to keep track of the fingerprint for the incoming
-// write requests. This is because there could be multiple instance of
-// metric exporters and they would only contain partial info with latter
-// approach.
-func (ch *clickHouse) runTimeSeriesReloader(ctx context.Context) {
-	ticker := time.NewTicker(5 * time.Second)
+func (ch *clickHouse) shardCountWatcher(ctx context.Context) {
+	ticker := time.NewTicker(ch.watcherInterval)
 	defer ticker.Stop()
 
-	q := fmt.Sprintf(`SELECT DISTINCT fingerprint FROM %s.%s`, ch.database, DISTRIBUTED_TIME_SERIES_TABLE)
+	q := `SELECT count() FROM system.clusters WHERE cluster='cluster'`
 	for {
-		ch.timeSeriesRW.RLock()
-		timeSeries := make(map[uint64]struct{}, len(ch.timeSeries))
-		ch.timeSeriesRW.RUnlock()
 
 		err := func() error {
			ch.l.Debug(q)
-			rows, err := ch.conn.Query(ctx, q)
+			row := ch.conn.QueryRow(ctx, q)
+			if row.Err() != nil {
+				return row.Err()
+			}
+
+			var shardCount uint64
+			err := row.Scan(&shardCount)
 			if err != nil {
 				return err
 			}
-			defer rows.Close()
-
-			var f uint64
-			for rows.Next() {
-				if err = rows.Scan(&f); err != nil {
-					return err
-				}
-				timeSeries[f] = struct{}{}
-			}
-			return rows.Err()
-		}()
-		if err == nil {
 			ch.timeSeriesRW.Lock()
-			n := len(timeSeries) - len(ch.timeSeries)
-			for f, m := range timeSeries {
-				ch.timeSeries[f] = m
+			if ch.prevShardCount != shardCount {
+				ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
+				ch.timeSeries = make(map[uint64]struct{})
 			}
+			ch.prevShardCount = shardCount
 			ch.timeSeriesRW.Unlock()
-			ch.l.Debugf("Loaded %d existing time series, %d were unknown to this instance.", len(timeSeries), n)
-		} else {
+			return nil
+		}()
		if err != nil {
 			ch.l.Error(err)
 		}
 
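For readers skimming the diff, the new goroutine boils down to a poll-compare-reset loop: query the cluster's shard count on a ticker and drop the cached fingerprints whenever the count changes, so time series get written again to the resharded table. The standalone Go sketch below illustrates only that pattern; the shardCountWatcher type and the queryShardCount callback are hypothetical stand-ins for the exporter's clickHouse struct and its ClickHouse connection, and the real code guards the map with an RWMutex shared with the write path.

package main

import (
	"context"
	"log"
	"time"
)

// shardCountWatcher is a hypothetical, stripped-down stand-in for the
// exporter's clickHouse struct: it keeps the previous shard count and the
// fingerprint cache, and polls for changes on a fixed interval.
// (Locking is omitted here because only one goroutine touches the map.)
type shardCountWatcher struct {
	interval        time.Duration
	prevShardCount  uint64
	timeSeries      map[uint64]struct{}
	queryShardCount func(ctx context.Context) (uint64, error) // stand-in for the ClickHouse query
}

func (w *shardCountWatcher) run(ctx context.Context) {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	for {
		count, err := w.queryShardCount(ctx)
		if err != nil {
			log.Printf("shard count query failed: %v", err)
		} else {
			if w.prevShardCount != count {
				// A resharded cluster invalidates the "already written" cache,
				// so forget the fingerprints and let them be written again.
				log.Printf("shard count changed from %d to %d, resetting time series map", w.prevShardCount, count)
				w.timeSeries = make(map[uint64]struct{})
			}
			w.prevShardCount = count
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func main() {
	w := &shardCountWatcher{
		interval:   30 * time.Second,
		timeSeries: map[uint64]struct{}{},
		queryShardCount: func(ctx context.Context) (uint64, error) {
			// The exporter issues: SELECT count() FROM system.clusters WHERE cluster='cluster'
			return 2, nil
		},
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	w.run(ctx) // runs one iteration, then returns when the context expires
}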
3 changes: 3 additions & 0 deletions exporter/clickhousemetricsexporter/config.go
@@ -16,6 +16,7 @@ package clickhousemetricsexporter
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
 	"go.opentelemetry.io/collector/component"
@@ -43,6 +44,8 @@ type Config struct {
 	// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
 	// If enabled, all the resource attributes will be converted to metric labels by default.
 	ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`
+
+	WatcherInterval time.Duration `mapstructure:"watcher_interval"`
 }
 
 // RemoteWriteQueue allows to configure the remote write queue.
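For context, the new watcher_interval option is decoded from the collector configuration through the mapstructure tag above, with a 30s factory default added further down. A hedged sketch of how it could look in a collector config file; the clickhousemetricswrite exporter key and the endpoint are assumptions for illustration, not taken from this commit:

exporters:
  clickhousemetricswrite:            # assumed exporter name; use whatever this factory registers
    endpoint: tcp://localhost:9000   # illustrative endpoint
    watcher_interval: 30s            # decoded into Config.WatcherInterval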
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/config_test.go
@@ -82,6 +82,7 @@ func Test_loadConfig(t *testing.T) {
 			},
 		},
 		ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
+		WatcherInterval: 30 * time.Second,
 	})
 }

1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/exporter.go
@@ -82,6 +82,7 @@ func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, err
 		DropDatabase:         false,
 		MaxOpenConns:         75,
 		MaxTimeSeriesInQuery: 50,
+		WatcherInterval:      cfg.WatcherInterval,
 	}
 	ch, err := NewClickHouse(params)
 	if err != nil {
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/factory.go
@@ -128,5 +128,6 @@ func createDefaultConfig() component.Config {
 			QueueSize:    10000,
 			NumConsumers: 5,
 		},
+		WatcherInterval: 30 * time.Second,
 	}
 }
