Skip to content

Commit

Permalink
One step closer to implement in-place trucate so that component resta…
Browse files Browse the repository at this point in the history
…rts are unnecessary.
  • Loading branch information
gamolina committed Mar 28, 2023
1 parent 398830e commit 8ee2c1d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 9 deletions.
10 changes: 10 additions & 0 deletions core/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ func (m *SessionPool) Recover(unflushedCh chan *shared.BatchBuffer) {
}
}

// Lock - Prevent operations while a table maintenance event is in progress
func (m *SessionPool) Lock() {
m.sessPoolLock.Lock()
}

// Unlock - Allow operations after a table maintenance event is complete
func (m *SessionPool) Unlock() {
m.sessPoolLock.Unlock()
}

// Metrics - Return pool size and usage.
func (m *SessionPool) Metrics() (poolSize, inUse, pooled, maxUsed int) {

Expand Down
2 changes: 2 additions & 0 deletions core/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ func LoadTable(path string, kvStore *shared.KVStore, name string, consulClient *
defer tableCacheLock.Unlock()
if t, ok := tableCache[name]; ok {
t.kvStore = kvStore
u.Debugf("Found table %s in cache.", name)
return t, nil
}

u.Debugf("Loading table %s.", name)
sch, err := shared.LoadSchema(path, name, consulClient)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions quanta-admin/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/disney/quanta/shared"
"github.com/hashicorp/consul/api"
"time"
)

// TruncateCmd - Truncate command
Expand Down Expand Up @@ -38,6 +39,9 @@ func (c *TruncateCmd) Run(ctx *Context) error {
return fmt.Errorf("updateModTimeForTable error %v", err)
}

// Give consumers time to flush and restart.
time.Sleep(time.Second * 5)

err = nukeData(consulClient, ctx.Port, c.Table, "truncate", c.DropEnums)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions quanta-kinesis-consumer/quanta-kinesis-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func main() {
for {
ctx, main.cancelFunc = context.WithCancel(context.Background())
scanErr := main.consumer.Scan(ctx, main.scanAndProcess)
time.Sleep(time.Second * 5)
main.destroy()
if err := main.eg.Wait(); err != nil {
u.Errorf("session error: %v", err)
Expand Down Expand Up @@ -301,6 +300,7 @@ func (m *Main) destroy() {
for _, v := range m.shardChannels {
close(v)
}
time.Sleep(time.Second * 5) // Allow time for completion
}

func (m *Main) scanAndProcess(v *consumer.Record) error {
Expand Down Expand Up @@ -422,7 +422,6 @@ func (m *Main) recoverInflight(recoverFunc func(unflushedCh chan *shared.BatchBu

func (m *Main) schemaChangeListener(e shared.SchemaChangeEvent) {

time.Sleep(time.Second * 2)
if m.cancelFunc != nil {
m.cancelFunc()
}
Expand Down
11 changes: 4 additions & 7 deletions quanta-proxy/quanta-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,16 @@ func schemaChangeListener(e shared.SchemaChangeEvent) {

core.ClearTableCache()
src.GetSessionPool().Recover(nil)
src.GetSessionPool().Lock()
defer src.GetSessionPool().Unlock()
switch e.Event {
case shared.Drop:
schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable)
log.Printf("Dropped table %s", e.Table)
case shared.Modify:
log.Printf("Truncated table %s", e.Table)
schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable)
var err error
src, err = source.NewQuantaSource("", consulAddr, *quantaPort, sessionPoolSize)
if err != nil {
u.Error(err)
}
schema.RegisterSourceAsSchema("quanta", src)
time.Sleep(time.Second * 5)
schema.DefaultRegistry().SchemaRefresh("quanta")
case shared.Create:
schema.DefaultRegistry().SchemaDrop("quanta", "quanta", lex.TokenSource)
var err error
Expand Down

0 comments on commit 8ee2c1d

Please sign in to comment.