Skip to content

Commit

Permalink
Fix migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Aug 21, 2024
1 parent 5f9ef0f commit 9e95531
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 89 deletions.
77 changes: 73 additions & 4 deletions cmd/signozschemamigrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -56,12 +57,10 @@ func main() {
var dsn string
var replicationEnabled bool
var clusterName string
var downMigrations []string

cmd.PersistentFlags().StringVar(&dsn, "dsn", "", "Clickhouse DSN")
cmd.PersistentFlags().BoolVar(&replicationEnabled, "replication", false, "Enable replication")
cmd.PersistentFlags().StringVar(&clusterName, "cluster-name", "cluster", "Cluster name to use while running migrations")
cmd.PersistentFlags().StringArrayVar(&downMigrations, "down", []string{}, "Down migrations to run")

registerSyncMigrate(cmd)
registerAsyncMigrate(cmd)
Expand All @@ -73,6 +72,9 @@ func main() {

func registerSyncMigrate(cmd *cobra.Command) {

var upVersions string
var downVersions string

syncCmd := &cobra.Command{
Use: "sync",
Short: "Run migrations in sync mode",
Expand All @@ -81,6 +83,32 @@ func registerSyncMigrate(cmd *cobra.Command) {
replicationEnabled := strings.ToLower(cmd.Flags().Lookup("replication").Value.String()) == "true"
clusterName := cmd.Flags().Lookup("cluster-name").Value.String()

upVersions := []uint64{}
for _, version := range strings.Split(cmd.Flags().Lookup("up").Value.String(), ",") {
if version == "" {
continue
}
v, err := strconv.ParseUint(version, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
upVersions = append(upVersions, v)
}
downVersions := []uint64{}
for _, version := range strings.Split(cmd.Flags().Lookup("down").Value.String(), ",") {
if version == "" {
continue
}
v, err := strconv.ParseUint(version, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
downVersions = append(downVersions, v)
}
if len(upVersions) != 0 && len(downVersions) != 0 {
return fmt.Errorf("cannot provide both up and down migrations")
}

opts, err := clickhouse.ParseDSN(dsn)
if err != nil {
return fmt.Errorf("failed to parse dsn: %w", err)
Expand Down Expand Up @@ -108,15 +136,24 @@ func registerSyncMigrate(cmd *cobra.Command) {
if err != nil {
return fmt.Errorf("failed to run migrations: %w", err)
}
return manager.MigrateUpSync(context.Background())
if len(downVersions) != 0 {
return manager.MigrateDownSync(context.Background(), downVersions)
}
return manager.MigrateUpSync(context.Background(), upVersions)
},
}

syncCmd.Flags().StringVar(&upVersions, "up", "", "Up migrations to run, comma separated. Leave empty to run all up migrations")
syncCmd.Flags().StringVar(&downVersions, "down", "", "Down migrations to run, comma separated. Must provide all down migrations to run")

cmd.AddCommand(syncCmd)
}

func registerAsyncMigrate(cmd *cobra.Command) {

var upVersions string
var downVersions string

asyncCmd := &cobra.Command{
Use: "async",
Short: "Run migrations in async mode",
Expand All @@ -125,6 +162,32 @@ func registerAsyncMigrate(cmd *cobra.Command) {
replicationEnabled := strings.ToLower(cmd.Flags().Lookup("replication").Value.String()) == "true"
clusterName := cmd.Flags().Lookup("cluster-name").Value.String()

upVersions := []uint64{}
for _, version := range strings.Split(cmd.Flags().Lookup("up").Value.String(), ",") {
if version == "" {
continue
}
v, err := strconv.ParseUint(version, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
upVersions = append(upVersions, v)
}
downVersions := []uint64{}
for _, version := range strings.Split(cmd.Flags().Lookup("down").Value.String(), ",") {
if version == "" {
continue
}
v, err := strconv.ParseUint(version, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
downVersions = append(downVersions, v)
}
if len(upVersions) != 0 && len(downVersions) != 0 {
return fmt.Errorf("cannot provide both up and down migrations")
}

opts, err := clickhouse.ParseDSN(dsn)
if err != nil {
return fmt.Errorf("failed to parse dsn: %w", err)
Expand All @@ -143,9 +206,15 @@ func registerAsyncMigrate(cmd *cobra.Command) {
return fmt.Errorf("failed to create migration manager: %w", err)
}

return manager.MigrateUpAsync(context.Background())
if len(downVersions) != 0 {
return manager.MigrateDownAsync(context.Background(), downVersions)
}
return manager.MigrateUpAsync(context.Background(), upVersions)
},
}

asyncCmd.Flags().StringVar(&upVersions, "up", "", "Up migrations to run, comma separated. Leave empty to run all up migrations")
asyncCmd.Flags().StringVar(&downVersions, "down", "", "Down migrations to run, comma separated. Must provide all down migrations to run")

cmd.AddCommand(asyncCmd)
}
Loading

0 comments on commit 9e95531

Please sign in to comment.