Skip to content

Commit

Permalink
Resolve server side deadlock during table operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
gamolina committed Apr 1, 2023
1 parent 397b2f7 commit b9c03de
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
26 changes: 23 additions & 3 deletions quanta-admin/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/disney/quanta/shared"
"github.com/hashicorp/consul/api"
"log"
"time"
)

// DropCmd - Drop command
Expand All @@ -22,24 +23,43 @@ func (c *DropCmd) Run(ctx *Context) error {
return fmt.Errorf("Error connecting to consul %v", err)
}

if ctx.Debug {
fmt.Println("Checking for child dependencies.")
}
if err = checkForChildDependencies(consulClient, c.Table, "drop"); err != nil {
return err
}

if ctx.Debug {
fmt.Println("Acquiring distributed lock.")
}
lock, errx := shared.Lock(consulClient, "admin-tool", "admin-tool")
if errx != nil {
return errx
}
defer shared.Unlock(consulClient, lock)
err = nukeData(consulClient, ctx.Port, c.Table, "drop", false)
if err != nil {
return err

if ctx.Debug {
fmt.Println("Calling DeleteTable to remove table from consul and notify upstream clients.")
}

err = shared.DeleteTable(consulClient, c.Table)
if err != nil {
return fmt.Errorf("DeleteTable error %v", err)
}

// Give consumers time to flush and restart.
time.Sleep(time.Second * 5)

if ctx.Debug {
fmt.Println("Calling nukeData.")
}

err = nukeData(consulClient, ctx.Port, c.Table, "drop", false)
if err != nil {
return err
}

fmt.Printf("Successfully dropped table %s\n", c.Table)
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion quanta-admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
type Context struct {
ConsulAddr string `help:"Consul agent address/port." default:"127.0.0.1:8500"`
Port int `help:"Port number for Quanta service." default:"4000"`
Debug bool `help:"Print Debug messages."`
}

// VersionCmd - Version command
Expand All @@ -28,6 +29,7 @@ type VersionCmd struct {
var cli struct {
ConsulAddr string `default:"127.0.0.1:8500"`
Port int `default:"4000"`
Debug bool `default:"false"`
Create CreateCmd `cmd help:"Create table."`
Drop DropCmd `cmd help:"Drop table."`
Truncate TruncateCmd `cmd help:"Truncate table."`
Expand All @@ -45,7 +47,7 @@ var cli struct {
func main() {

ctx := kong.Parse(&cli)
err := ctx.Run(&Context{ConsulAddr: cli.ConsulAddr, Port: cli.Port})
err := ctx.Run(&Context{ConsulAddr: cli.ConsulAddr, Port: cli.Port, Debug: cli.Debug})
ctx.FatalIfErrorf(err)
}

Expand Down
17 changes: 6 additions & 11 deletions quanta-proxy/quanta-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,25 +234,20 @@ func schemaChangeListener(e shared.SchemaChangeEvent) {

core.ClearTableCache()
src.GetSessionPool().Recover(nil)
src.GetSessionPool().Lock()
defer src.GetSessionPool().Unlock()
switch e.Event {
case shared.Drop:
schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable)
log.Printf("Dropped table %s", e.Table)
src.GetSessionPool().Lock()
defer src.GetSessionPool().Unlock()
schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable)
case shared.Modify:
log.Printf("Truncated table %s", e.Table)
src.GetSessionPool().Lock()
defer src.GetSessionPool().Unlock()
time.Sleep(time.Second * 5)
schema.DefaultRegistry().SchemaRefresh("quanta")
case shared.Create:
schema.DefaultRegistry().SchemaDrop("quanta", "quanta", lex.TokenSource)
var err error
src, err = source.NewQuantaSource("", consulAddr, *quantaPort, sessionPoolSize)
if err != nil {
u.Error(err)
}
schema.RegisterSourceAsSchema("quanta", src)
//schema.DefaultRegistry().SchemaRefresh("quanta")
schema.DefaultRegistry().SchemaRefresh("quanta")
log.Printf("Created table %s", e.Table)
}
}
Expand Down
9 changes: 4 additions & 5 deletions server/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,23 +675,22 @@ func (m *BitmapIndex) updateBSICache(f *BitmapFragment) {
// Truncate - Truncate the in-memory data cache for a given index
func (m *BitmapIndex) Truncate(index string) {

m.bitmapCacheLock.Lock()
m.bsiCacheLock.Lock()
defer m.bitmapCacheLock.Unlock()
defer m.bsiCacheLock.Unlock()

fm := m.bitmapCache[index]
for _, rm := range fm {
for _, tm := range rm {
for ts := range tm {
m.bitmapCacheLock.Lock()
delete(tm, ts)
m.bitmapCacheLock.Unlock()
}
}
}
bm := m.bsiCache[index]
for _, tm := range bm {
for ts := range tm {
m.bsiCacheLock.Lock()
delete(tm, ts)
m.bsiCacheLock.Unlock()
}
}
}
Expand Down

0 comments on commit b9c03de

Please sign in to comment.