diff --git a/quanta-admin/drop.go b/quanta-admin/drop.go index cf44ce8..fbd18b3 100644 --- a/quanta-admin/drop.go +++ b/quanta-admin/drop.go @@ -5,6 +5,7 @@ import ( "github.com/disney/quanta/shared" "github.com/hashicorp/consul/api" "log" + "time" ) // DropCmd - Drop command @@ -22,24 +23,43 @@ func (c *DropCmd) Run(ctx *Context) error { return fmt.Errorf("Error connecting to consul %v", err) } + if ctx.Debug { + fmt.Println("Checking for child dependencies.") + } if err = checkForChildDependencies(consulClient, c.Table, "drop"); err != nil { return err } + if ctx.Debug { + fmt.Println("Acquiring distributed lock.") + } lock, errx := shared.Lock(consulClient, "admin-tool", "admin-tool") if errx != nil { return errx } defer shared.Unlock(consulClient, lock) - err = nukeData(consulClient, ctx.Port, c.Table, "drop", false) - if err != nil { - return err + + if ctx.Debug { + fmt.Println("Calling DeleteTable to remove table from consul and notify upstream clients.") } + err = shared.DeleteTable(consulClient, c.Table) if err != nil { return fmt.Errorf("DeleteTable error %v", err) } + // Give consumers time to flush and restart. + time.Sleep(time.Second * 5) + + if ctx.Debug { + fmt.Println("Calling nukeData.") + } + + err = nukeData(consulClient, ctx.Port, c.Table, "drop", false) + if err != nil { + return err + } + fmt.Printf("Successfully dropped table %s\n", c.Table) return nil } diff --git a/quanta-admin/main.go b/quanta-admin/main.go index 1334025..c86ef89 100644 --- a/quanta-admin/main.go +++ b/quanta-admin/main.go @@ -19,6 +19,7 @@ var ( type Context struct { ConsulAddr string `help:"Consul agent address/port." default:"127.0.0.1:8500"` Port int `help:"Port number for Quanta service." default:"4000"` + Debug bool `help:"Print Debug messages."` } // VersionCmd - Version command @@ -28,6 +29,7 @@ type VersionCmd struct { var cli struct { ConsulAddr string `default:"127.0.0.1:8500"` Port int `default:"4000"` + Debug bool `default:"false"` Create CreateCmd `cmd help:"Create table."` Drop DropCmd `cmd help:"Drop table."` Truncate TruncateCmd `cmd help:"Truncate table."` @@ -45,7 +47,7 @@ var cli struct { func main() { ctx := kong.Parse(&cli) - err := ctx.Run(&Context{ConsulAddr: cli.ConsulAddr, Port: cli.Port}) + err := ctx.Run(&Context{ConsulAddr: cli.ConsulAddr, Port: cli.Port, Debug: cli.Debug}) ctx.FatalIfErrorf(err) } diff --git a/quanta-proxy/quanta-proxy.go b/quanta-proxy/quanta-proxy.go index d101068..916fbb4 100644 --- a/quanta-proxy/quanta-proxy.go +++ b/quanta-proxy/quanta-proxy.go @@ -234,25 +234,20 @@ func schemaChangeListener(e shared.SchemaChangeEvent) { core.ClearTableCache() src.GetSessionPool().Recover(nil) - src.GetSessionPool().Lock() - defer src.GetSessionPool().Unlock() switch e.Event { case shared.Drop: - schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable) log.Printf("Dropped table %s", e.Table) + src.GetSessionPool().Lock() + defer src.GetSessionPool().Unlock() + schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable) case shared.Modify: log.Printf("Truncated table %s", e.Table) + src.GetSessionPool().Lock() + defer src.GetSessionPool().Unlock() time.Sleep(time.Second * 5) schema.DefaultRegistry().SchemaRefresh("quanta") case shared.Create: - schema.DefaultRegistry().SchemaDrop("quanta", "quanta", lex.TokenSource) - var err error - src, err = source.NewQuantaSource("", consulAddr, *quantaPort, sessionPoolSize) - if err != nil { - u.Error(err) - } - schema.RegisterSourceAsSchema("quanta", src) - //schema.DefaultRegistry().SchemaRefresh("quanta") + schema.DefaultRegistry().SchemaRefresh("quanta") log.Printf("Created table %s", e.Table) } } diff --git a/server/bitmap.go b/server/bitmap.go index ae9f0e2..2cda84a 100644 --- a/server/bitmap.go +++ b/server/bitmap.go @@ -675,23 +675,22 @@ func (m *BitmapIndex) updateBSICache(f *BitmapFragment) { // Truncate - Truncate the in-memory data cache for a given index func (m *BitmapIndex) Truncate(index string) { - m.bitmapCacheLock.Lock() - m.bsiCacheLock.Lock() - defer m.bitmapCacheLock.Unlock() - defer m.bsiCacheLock.Unlock() - fm := m.bitmapCache[index] for _, rm := range fm { for _, tm := range rm { for ts := range tm { + m.bitmapCacheLock.Lock() delete(tm, ts) + m.bitmapCacheLock.Unlock() } } } bm := m.bsiCache[index] for _, tm := range bm { for ts := range tm { + m.bsiCacheLock.Lock() delete(tm, ts) + m.bsiCacheLock.Unlock() } } }