Skip to content

Commit

Permalink
Fix migration to also migrate to local databases.
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinJoiner committed Apr 24, 2024
1 parent d1a818f commit 8963438
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 9 deletions.
14 changes: 11 additions & 3 deletions pkg/clickhouseinfra/clickhouseinfra.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"os"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -91,10 +92,17 @@ func GetClickhouseAsDB(ctx context.Context, container *chmodule.ClickHouseContai
Database: container.DbName,
},
})
if err := dbConn.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping clickhouse: %w", err)
const retries = 3
for i := 0; i < retries; i++ {
err = dbConn.Ping()
if err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
return dbConn, nil
}
return dbConn, nil

return nil, fmt.Errorf("failed to ping clickhouse after %d retries: %w", retries, err)
}

// Terminate function terminates the clickhouse and zookeeper containers.
Expand Down
2 changes: 0 additions & 2 deletions pkg/migrations/20240411200712_clustered_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,3 @@ func downCommand20240411200712(ctx context.Context, tx *sql.Tx) error {
var clusterSignalCreateStmt = `CREATE table signal_shard on CLUSTER '{cluster}' as local_signal
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (TokenID, Timestamp, Name)`

var distributedSignalCreateStmt = `CREATE TABLE signal ON CLUSTER '{cluster}' AS signal_shard ENGINE = Distributed('{cluster}', default, signal_shard, TokenID)`
70 changes: 70 additions & 0 deletions pkg/migrations/20240423004039_local_updates_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package migrations

import (
"context"
"database/sql"
"fmt"
"runtime"

"github.com/pressly/goose/v3"
)

func init() {
_, filename, _, _ := runtime.Caller(0)
registerFunc := func() { goose.AddNamedMigrationContext(filename, upCommand20240423004039, downCommand20240423004039) }
registerFuncs = append(registerFuncs, registerFunc)
registerFunc()
}

func upCommand20240423004039(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is applied.
upStatements := []string{
"DROP TABLE default.signal ON CLUSTER '{cluster}'",
"RENAME TABLE default.signal_shard TO default.signal_shard_old ON CLUSTER '{cluster}'",
createSignalShardStmt,
"INSERT INTO default.signal_shard SELECT TokenID, Timestamp, Name, ValueNumber, ValueString FROM default.signal_shard_old",
distributedSignalCreateStmt,
"DROP TABLE default.signal_shard_old ON CLUSTER '{cluster}'",
}
for _, upStatement := range upStatements {
_, err := tx.ExecContext(ctx, upStatement)
if err != nil {
return fmt.Errorf("failed to execute statment %s: %w", upStatement, err)
}
}
return nil
}

func downCommand20240423004039(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
downStatements := []string{
"ALTER TABLE signal MODIFY COLUMN Timestamp Datetime('UTC')",
}
for _, downStatement := range downStatements {
_, err := tx.ExecContext(ctx, downStatement)
if err != nil {
return err
}
}
return nil
}

var (
createSignalShardStmt = `
CREATE TABLE default.signal_shard ON CLUSTER '{cluster}'
(
TokenID UInt32 COMMENT 'tokenID of this device data.',
Timestamp DateTime64(6, 'UTC') COMMENT 'timestamp of when this data was colllected.',
Name LowCardinality(String) COMMENT 'name of the signal collected.',
ValueNumber Float64 COMMENT 'float64 value of the signal collected.',
ValueString String COMMENT 'string value of the signal collected.'
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{database}/{table}/{uuid}', '{replica}')
ORDER BY (TokenID, Timestamp, Name)
`

distributedSignalCreateStmt = `
CREATE TABLE signal ON CLUSTER '{cluster}' AS signal_shard
ENGINE = Distributed('{cluster}', default, signal_shard, TokenID)
`
)
6 changes: 5 additions & 1 deletion pkg/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ func TestMigration(t *testing.T) {
columns, err := clickhouseinfra.GetCurrentCols(ctx, conn, "signal")
require.NoError(t, err, "Failed to get current columns")

columnsShard, err := clickhouseinfra.GetCurrentCols(ctx, conn, "signal_shard")
require.NoError(t, err, "Failed to get current columns")

expectedColumns := []clickhouseinfra.ColInfo{
{Name: "TokenID", Type: "UInt32", Comment: "tokenID of this device data."},
{Name: "Timestamp", Type: "DateTime('UTC')", Comment: "timestamp of when this data was colllected."},
{Name: "Timestamp", Type: "DateTime64(6, 'UTC')", Comment: "timestamp of when this data was colllected."},
{Name: "Name", Type: "LowCardinality(String)", Comment: "name of the signal collected."},
{Name: "ValueNumber", Type: "Float64", Comment: "float64 value of the signal collected."},
{Name: "ValueString", Type: "String", Comment: "string value of the signal collected."},
}

// Check if the actual columns match the expected columns
require.Equal(t, expectedColumns, columns, "Unexpected table columns")
require.Equal(t, expectedColumns, columnsShard, "Unexpected shard table columns")

// Close the DB connection
err = db.Close()
Expand Down
12 changes: 9 additions & 3 deletions pkg/vss/convert/payloadv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ func timestampFromV1Data(jsonData []byte) (time.Time, error) {
if !result.Exists() {
return time.Time{}, errors.New("time field not found")
}
t, ok := result.Value().(string)

timeStr, ok := result.Value().(string)
if !ok {
return time.Time{}, errors.New("time field is not a string")
ms, ok := result.Value().(float64)
if ok {
return time.UnixMilli(int64(ms)), nil
}
return time.Time{}, errors.New("time field is not a string or float64")
}
ts, err := time.Parse(time.RFC3339, t)
ts, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
return time.Time{}, fmt.Errorf("error parsing time: %w", err)
}
return ts, nil

}

0 comments on commit 8963438

Please sign in to comment.