feat: separate migrations from codebase + templatize migrations
dhawal1248 committed Oct 10, 2023
1 parent ad8afed commit e6bd49b
Showing 99 changed files with 649 additions and 520 deletions.
74 changes: 74 additions & 0 deletions cmd/signozcollectormigrator/migrate.go
@@ -0,0 +1,74 @@
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/SigNoz/signoz-otel-collector/migrator"
	"github.com/spf13/pflag"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func init() {
	// init zap logger
	config := zap.NewProductionConfig()
	config.EncoderConfig.EncodeLevel = zapcore.LowercaseLevelEncoder
	config.EncoderConfig.TimeKey = "timestamp"
	config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, err := config.Build()
	if err != nil {
		log.Fatalf("Failed to initialize zap logger %v", err)
	}
	// replace global logger
	zap.ReplaceGlobals(logger)
}

func main() {
	logger := zap.L().With(zap.String("component", "migrate cli"))
	f := pflag.NewFlagSet("Collector Migrator CLI Options", pflag.ExitOnError)

	f.Usage = func() {
		fmt.Println(f.FlagUsages())
		os.Exit(0)
	}

	f.String("dsn", "", "Clickhouse DSN")
	f.String("cluster-name", "", "Cluster name to use while running migrations")
	f.Bool("multi-node-cluster", false, "True if the dsn points to a multi node clickhouse cluster, false otherwise. Defaults to false.")

	err := f.Parse(os.Args[1:])
	if err != nil {
		logger.Fatal("Failed to parse args", zap.Error(err))
	}

	dsn, err := f.GetString("dsn")
	if err != nil {
		logger.Fatal("Failed to get dsn from args", zap.Error(err))
	}

	clusterName, err := f.GetString("cluster-name")
	if err != nil {
		logger.Fatal("Failed to get cluster name from args", zap.Error(err))
	}

	multiNodeCluster, err := f.GetBool("multi-node-cluster")
	if err != nil {
		logger.Fatal("Failed to get multi node cluster flag from args", zap.Error(err))
	}

	if dsn == "" || clusterName == "" {
		logger.Fatal("dsn and clusterName are required fields")
	}

	migrationManager, err := migrator.NewMigrationManager(dsn, clusterName, multiNodeCluster)
	if err != nil {
		logger.Fatal("Failed to create migration manager", zap.Error(err))
	}
	err = migrationManager.Migrate(context.Background())
	if err != nil {
		logger.Fatal("Failed to run migrations", zap.Error(err))
	}
}
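For reference, the flags above imply an invocation like the following. The binary name signozcollectormigrator is an assumption based on the cmd directory name; only the flags --dsn, --cluster-name, and --multi-node-cluster appear in this diff:

	signozcollectormigrator --dsn "tcp://clickhouse:9000?username=default&password=secret" --cluster-name cluster --multi-node-cluster

Both --dsn and --cluster-name are required; the program logs a fatal error and exits if either is empty.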
83 changes: 0 additions & 83 deletions exporter/clickhouselogsexporter/exporter.go
@@ -21,7 +21,6 @@ import (
"fmt"
"log"
"net/url"
"os"
"strings"
"sync"
"time"
@@ -30,9 +29,6 @@ import (
driver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/clickhouse"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/segmentio/ksuid"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
@@ -428,88 +424,9 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) {
	if err := db.Ping(ctx); err != nil {
		return nil, err
	}

	q := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s;", databaseName, CLUSTER)
	err = db.Exec(ctx, q)
	if err != nil {
		return nil, fmt.Errorf("failed to create database, err: %s", err)
	}

	// drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes
	if cfg.DockerMultiNodeCluster {
		err = dropSchemaMigrationsTable(db)
		if err != nil {
			logger.Error("Error dropping schema_migrations table", zap.Error(err))
			return nil, err
		}
	}

	// do the migration here

	// get the migrations folder
	mgsFolder := os.Getenv("LOG_MIGRATIONS_FOLDER")
	if mgsFolder == "" {
		mgsFolder = migrationsFolder
	}

	logger.Info("Running migrations from path: ", zap.Any("test", mgsFolder))
	clickhouseUrl, err := buildClickhouseMigrateURL(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to build Clickhouse migrate URL, error: %s", err)
	}
	m, err := migrate.New("file://"+mgsFolder, clickhouseUrl)
	if err != nil {
		return nil, fmt.Errorf("clickhouse Migrate failed to run, error: %s", err)
	}
	// enable migration file templating
	m.EnableTemplating = true

	// run migrations
	err = m.Up()
	if err != nil && !strings.HasSuffix(err.Error(), "no change") {
		return nil, fmt.Errorf("clickhouse Migrate failed to run, error: %s", err)
	}

	logger.Info("Clickhouse Migrate finished")
	return db, nil
}
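The block deleted above (database creation, the schema_migrations reset for docker multi-node clusters, and the golang-migrate run with templating) is the work the new standalone migrator CLI takes over: the exporter still connects and pings ClickHouse, but no longer runs migrations at startup.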

func dropSchemaMigrationsTable(db clickhouse.Conn) error {
	err := db.Exec(context.Background(), fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s ON CLUSTER %s;`,
		databaseName, "schema_migrations", CLUSTER))
	if err != nil {
		return fmt.Errorf("error dropping schema_migrations table: %v", err)
	}
	return nil
}

func buildClickhouseMigrateURL(cfg *Config) (string, error) {
	// return fmt.Sprintf("clickhouse://localhost:9000?database=default&x-multi-statement=true"), nil
	var clickhouseUrl string
	parsedURL, err := url.Parse(cfg.DSN)
	if err != nil {
		return "", err
	}
	host := parsedURL.Host
	if host == "" {
		return "", fmt.Errorf("unable to parse host")
	}
	paramMap, err := url.ParseQuery(parsedURL.RawQuery)
	if err != nil {
		return "", err
	}
	username := paramMap["username"]
	password := paramMap["password"]

	if len(username) > 0 && len(password) > 0 {
		clickhouseUrl = fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=MergeTree", username[0], password[0], host, databaseName, CLUSTER)
	} else {
		clickhouseUrl = fmt.Sprintf("clickhouse://%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=MergeTree", host, databaseName, CLUSTER)
	}
	return clickhouseUrl, nil
}
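For concreteness, here is a minimal, runnable sketch of the URL rewriting that the deleted buildClickhouseMigrateURL performed. The values substituted for the package constants databaseName ("signoz_logs") and CLUSTER ("cluster") are assumptions, since those constants are defined elsewhere and do not appear in this diff.

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// A DSN of the shape the exporter config carried, with credentials as query params.
	dsn := "tcp://clickhouse:9000?username=default&password=secret"
	parsedURL, err := url.Parse(dsn)
	if err != nil {
		panic(err)
	}
	params, err := url.ParseQuery(parsedURL.RawQuery)
	if err != nil {
		panic(err)
	}
	// Rebuild the golang-migrate ClickHouse URL the same way the deleted helper did.
	migrateURL := fmt.Sprintf(
		"clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=MergeTree",
		params.Get("username"), params.Get("password"), parsedURL.Host, "signoz_logs", "cluster")
	fmt.Println(migrateURL)
	// Prints:
	// clickhouse://default:secret@clickhouse:9000/signoz_logs?x-multi-statement=true&x-cluster-name=cluster&x-migrations-table=schema_migrations&x-migrations-table-engine=MergeTree
}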

func renderInsertLogsSQL(cfg *Config) string {
	return fmt.Sprintf(insertLogsSQLTemplate, databaseName, DISTRIBUTED_LOGS_TABLE)
}

[7 files deleted; their names and contents are not shown in this view.]

97 changes: 0 additions & 97 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -94,85 +94,6 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
return nil, fmt.Errorf("database should be set in ClickHouse DSN")
}

var queries []string
if params.DropDatabase {
queries = append(queries, fmt.Sprintf(`DROP DATABASE IF EXISTS %s ON CLUSTER %s;`, database, CLUSTER))
}
queries = append(queries, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s`, database, CLUSTER))

queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
metric_name LowCardinality(String),
fingerprint UInt64 Codec(DoubleDelta, LZ4),
timestamp_ms Int64 Codec(DoubleDelta, LZ4),
value Float64 Codec(Gorilla, LZ4)
)
ENGINE = MergeTree
PARTITION BY toDate(timestamp_ms / 1000)
ORDER BY (metric_name, fingerprint, timestamp_ms)
TTL toDateTime(timestamp_ms/1000) + INTERVAL 2592000 SECOND DELETE;`, database, SAMPLES_TABLE, CLUSTER))
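	// Note: the 2592000-second TTL above is 30 days.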

	queries = append(queries, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", "%s", %s, cityHash64(metric_name, fingerprint));`, database, DISTRIBUTED_SAMPLES_TABLE, CLUSTER, database, SAMPLES_TABLE, CLUSTER, database, SAMPLES_TABLE))

	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, SAMPLES_TABLE, CLUSTER))

	queries = append(queries, `SET allow_experimental_object_type = 1`)
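	// allow_experimental_object_type enables ClickHouse's experimental Object('json') type,
	// presumably needed for the labels_object columns that are dropped again below.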

	queries = append(queries, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s(
			metric_name LowCardinality(String),
			fingerprint UInt64 Codec(DoubleDelta, LZ4),
			timestamp_ms Int64 Codec(DoubleDelta, LZ4),
			labels String Codec(ZSTD(5))
		)
		ENGINE = ReplacingMergeTree
		PARTITION BY toDate(timestamp_ms / 1000)
		ORDER BY (metric_name, fingerprint)
		TTL toDateTime(timestamp_ms/1000) + INTERVAL 2592000 SECOND DELETE;`, database, TIME_SERIES_TABLE, CLUSTER))

	queries = append(queries, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER, database, TIME_SERIES_TABLE, CLUSTER, database, TIME_SERIES_TABLE))

	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS labels_object`, database, TIME_SERIES_TABLE, CLUSTER))

	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS labels_object`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))

	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s MODIFY SETTING ttl_only_drop_parts = 1;`, database, TIME_SERIES_TABLE, CLUSTER))

	// Add temporality column to time_series table
	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, TIME_SERIES_TABLE, CLUSTER))

	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS temporality LowCardinality(String) DEFAULT 'Unspecified' CODEC(ZSTD(5))`, database, DISTRIBUTED_TIME_SERIES_TABLE, CLUSTER))

	// Add set index on temporality column
	queries = append(queries, fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS temporality_index temporality TYPE SET(3) GRANULARITY 1`, database, TIME_SERIES_TABLE, CLUSTER))
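	// temporality takes at most three values (Unspecified, Cumulative, Delta),
	// so a SET(3) index can hold the full value set per granule.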

	// Create a new table
	queries = append(queries, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
			env LowCardinality(String) DEFAULT 'default',
			temporality LowCardinality(String) DEFAULT 'Unspecified',
			metric_name LowCardinality(String),
			fingerprint UInt64 CODEC(Delta, ZSTD),
			timestamp_ms Int64 CODEC(Delta, ZSTD),
			labels String CODEC(ZSTD(5))
		)
		ENGINE = ReplacingMergeTree
		PARTITION BY toDate(timestamp_ms / 1000)
		ORDER BY (env, temporality, metric_name, fingerprint);`, database, TIME_SERIES_TABLE_V3, CLUSTER))

	// Create a new distributed table
	queries = append(queries, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s ENGINE = Distributed("%s", %s, %s, cityHash64(env, temporality, metric_name, fingerprint));`, database, DISTRIBUTED_TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3, CLUSTER, database, TIME_SERIES_TABLE_V3))

	options := &clickhouse.Options{
		Addr: []string{dsnURL.Host},
	}
@@ -186,28 +107,10 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
		options.Auth = auth
	}
	conn, err := clickhouse.Open(options)

	if err != nil {
		return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
	}

	for _, q := range queries {
		q = strings.TrimSpace(q)
		l.Infof("Executing:\n%s\n", q)
		if err = conn.Exec(context.Background(), q); err != nil {
			return nil, err
		}
	}

	// TODO(srikanthccv): Remove this once we have a better way to handle data and last write
	removeTTL := fmt.Sprintf(`
		ALTER TABLE %s.%s ON CLUSTER %s REMOVE TTL;`, database, TIME_SERIES_TABLE, CLUSTER)
	if err = conn.Exec(context.Background(), removeTTL); err != nil {
		if !strings.Contains(err.Error(), "Table doesn't have any table TTL expression, cannot remove.") {
			return nil, err
		}
	}

	ch := &clickHouse{
		conn: conn,
		l:    l,
[Diff truncated; the remaining changed files are not shown.]
