From 076a33a64cd832740dc1b0d930b1468b757d9145 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 19 Apr 2024 20:43:02 +0530 Subject: [PATCH] Use clickhouse.ParseDSN for clickhouse URL parsing (#294) --- exporter/clickhouselogsexporter/exporter.go | 31 ++++------------- .../clickhousemetricsexporter/clickhouse.go | 30 ++++++----------- exporter/clickhousetracesexporter/options.go | 27 +++++++-------- .../migrators/basemigrator/migrator.go | 33 ++++--------------- 4 files changed, 34 insertions(+), 87 deletions(-) diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 46caace5..f0926354 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -19,7 +19,6 @@ import ( "encoding/json" "errors" "fmt" - "net/url" "regexp" "strconv" "strings" @@ -540,34 +539,16 @@ const ( func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, error) { // use empty database to create database ctx := context.Background() - dsnURL, err := url.Parse(cfg.DSN) + options, err := clickhouse.ParseDSN(cfg.DSN) if err != nil { return nil, err } - // setting maxOpenIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` error - maxOpenIdleConnections := cfg.QueueSettings.NumConsumers + 1 - - options := &clickhouse.Options{ - Addr: []string{dsnURL.Host}, - MaxOpenConns: maxOpenIdleConnections + 5, - MaxIdleConns: maxOpenIdleConnections, - } - - if dsnURL.Query().Get("username") != "" { - auth := clickhouse.Auth{ - Username: dsnURL.Query().Get("username"), - Password: dsnURL.Query().Get("password"), - } - options.Auth = auth - } - - if dsnURL.Query().Get("dial_timeout") != "" { - dialTimeout, err := time.ParseDuration(dsnURL.Query().Get("dial_timeout")) - if err != nil { - return nil, fmt.Errorf("failed to parse dial_timeout from dsn: %w", err) - } - options.DialTimeout = dialTimeout + // setting maxIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` error + maxIdleConnections := cfg.QueueSettings.NumConsumers + 1 + if options.MaxIdleConns < maxIdleConnections { + options.MaxIdleConns = maxIdleConnections + options.MaxOpenConns = maxIdleConnections + 5 } db, err := clickhouse.Open(options) diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 80313979..2dbb2948 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "math" - "net/url" "runtime/pprof" "strings" "sync" @@ -93,31 +92,22 @@ type ClickHouseParams struct { func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { l := logrus.WithField("component", "clickhouse") - dsnURL, err := url.Parse(params.DSN) + options, err := clickhouse.ParseDSN(params.DSN) if err != nil { return nil, err } - database := dsnURL.Query().Get("database") - if database == "" { - return nil, fmt.Errorf("database should be set in ClickHouse DSN") - } - options := &clickhouse.Options{ - Addr: []string{dsnURL.Host}, - MaxIdleConns: params.MaxIdleConns, - MaxOpenConns: params.MaxOpenConns, - DialTimeout: 1 * time.Minute, + if options.MaxIdleConns < params.MaxIdleConns { + options.MaxIdleConns = params.MaxIdleConns } - if dsnURL.Query().Get("username") != "" { - auth := clickhouse.Auth{ - // Database: "", - Username: dsnURL.Query().Get("username"), - Password: dsnURL.Query().Get("password"), - } - - options.Auth = auth + if options.MaxOpenConns < params.MaxOpenConns { + options.MaxOpenConns = params.MaxOpenConns } + if options.DialTimeout < 1*time.Minute { + options.DialTimeout = 1 * time.Minute + } + conn, err := clickhouse.Open(options) if err != nil { return nil, fmt.Errorf("could not connect to clickhouse: %s", err) @@ -126,7 +116,7 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { ch := &clickHouse{ conn: conn, l: l, - database: database, + database: options.Auth.Database, maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery, timeSeries: make(map[uint64]struct{}, 8192), diff --git a/exporter/clickhousetracesexporter/options.go b/exporter/clickhousetracesexporter/options.go index 89053c22..268ca5d9 100644 --- a/exporter/clickhousetracesexporter/options.go +++ b/exporter/clickhousetracesexporter/options.go @@ -18,7 +18,6 @@ import ( "context" "flag" "fmt" - "net/url" "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" @@ -91,21 +90,19 @@ type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error) func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) { ctx := context.Background() + options, err := clickhouse.ParseDSN(cfg.Datasource) + + if err != nil { + return nil, err + } + // setting maxOpenIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` // error when using multiple consumers along with usage exporter - maxOpenIdleConnections := cfg.NumConsumers + 1 - dsnURL, err := url.Parse(cfg.Datasource) - options := &clickhouse.Options{ - Addr: []string{dsnURL.Host}, - MaxOpenConns: maxOpenIdleConnections + 5, - MaxIdleConns: maxOpenIdleConnections, - } - if dsnURL.Query().Get("username") != "" { - auth := clickhouse.Auth{ - Username: dsnURL.Query().Get("username"), - Password: dsnURL.Query().Get("password"), - } - options.Auth = auth + maxIdleConnections := cfg.NumConsumers + 1 + + if options.MaxIdleConns < maxIdleConnections { + options.MaxIdleConns = maxIdleConnections + options.MaxOpenConns = maxIdleConnections + 5 } db, err := clickhouse.Open(options) if err != nil { @@ -116,7 +113,7 @@ func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) { return nil, err } - query := fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s`, dsnURL.Query().Get("database"), cfg.Cluster) + query := fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s ON CLUSTER %s`, options.Auth.Database, cfg.Cluster) if err := db.Exec(ctx, query); err != nil { return nil, err } diff --git a/migrationmanager/migrators/basemigrator/migrator.go b/migrationmanager/migrators/basemigrator/migrator.go index d6faa124..d2fd4906 100644 --- a/migrationmanager/migrators/basemigrator/migrator.go +++ b/migrationmanager/migrators/basemigrator/migrator.go @@ -3,7 +3,6 @@ package basemigrator import ( "context" "fmt" - "net/url" "strings" "github.com/ClickHouse/clickhouse-go/v2" @@ -86,21 +85,10 @@ func (m *BaseMigrator) runSqlMigrations(ctx context.Context, migrationFolder, da func (m *BaseMigrator) buildClickhouseMigrateURL(database string) (string, error) { var clickhouseUrl, migrationsTableEngine string - parsedURL, err := url.Parse(m.Cfg.DSN) + options, err := clickhouse.ParseDSN(m.Cfg.DSN) if err != nil { return "", err } - host := parsedURL.Host - if host == "" { - return "", fmt.Errorf("unable to parse host") - - } - paramMap, err := url.ParseQuery(parsedURL.RawQuery) - if err != nil { - return "", err - } - username := paramMap["username"] - password := paramMap["password"] if m.Cfg.ReplicationEnabled { migrationsTableEngine = "ReplicatedMergeTree" @@ -108,29 +96,20 @@ func (m *BaseMigrator) buildClickhouseMigrateURL(database string) (string, error migrationsTableEngine = "MergeTree" } - if len(username) > 0 && len(password) > 0 { - clickhouseUrl = fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", username[0], password[0], host, database, m.Cfg.ClusterName, migrationsTableEngine) + if len(options.Auth.Username) > 0 && len(options.Auth.Password) > 0 { + clickhouseUrl = fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", options.Auth.Username, options.Auth.Password, options.Addr[0], database, m.Cfg.ClusterName, migrationsTableEngine) } else { - clickhouseUrl = fmt.Sprintf("clickhouse://%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", host, database, m.Cfg.ClusterName, migrationsTableEngine) + clickhouseUrl = fmt.Sprintf("clickhouse://%s/%s?x-multi-statement=true&x-cluster-name=%s&x-migrations-table=schema_migrations&x-migrations-table-engine=%s", options.Addr[0], database, m.Cfg.ClusterName, migrationsTableEngine) } return clickhouseUrl, nil } func createClickhouseConnection(dsn string) (driver.Conn, error) { - dsnURL, err := url.Parse(dsn) + options, err := clickhouse.ParseDSN(dsn) if err != nil { return nil, fmt.Errorf("failed to parse dsn: %w", err) } - options := &clickhouse.Options{ - Addr: []string{dsnURL.Host}, - } - if dsnURL.Query().Get("username") != "" { - auth := clickhouse.Auth{ - Username: dsnURL.Query().Get("username"), - Password: dsnURL.Query().Get("password"), - } - options.Auth = auth - } + db, err := clickhouse.Open(options) if err != nil { return nil, fmt.Errorf("failed to open clickhouse connection: %w", err)