From 8ee2c1d25a8f035b5c3eb22b6cb1f821e94404a8 Mon Sep 17 00:00:00 2001 From: "GUY.MOLINARI" Date: Tue, 28 Mar 2023 18:17:30 +0000 Subject: [PATCH] One step closer to implement in-place trucate so that component restarts are unnecessary. --- core/session_pool.go | 10 ++++++++++ core/table.go | 2 ++ quanta-admin/truncate.go | 4 ++++ quanta-kinesis-consumer/quanta-kinesis-consumer.go | 3 +-- quanta-proxy/quanta-proxy.go | 11 ++++------- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/core/session_pool.go b/core/session_pool.go index 732e593..cbfeb8b 100644 --- a/core/session_pool.go +++ b/core/session_pool.go @@ -169,6 +169,16 @@ func (m *SessionPool) Recover(unflushedCh chan *shared.BatchBuffer) { } } +// Lock - Prevent operations while a table maintenance event is in progress +func (m *SessionPool) Lock() { + m.sessPoolLock.Lock() +} + +// Unlock - Allow operations after a table maintenance event is complete +func (m *SessionPool) Unlock() { + m.sessPoolLock.Unlock() +} + // Metrics - Return pool size and usage. func (m *SessionPool) Metrics() (poolSize, inUse, pooled, maxUsed int) { diff --git a/core/table.go b/core/table.go index 849f1cd..2544dfd 100644 --- a/core/table.go +++ b/core/table.go @@ -52,9 +52,11 @@ func LoadTable(path string, kvStore *shared.KVStore, name string, consulClient * defer tableCacheLock.Unlock() if t, ok := tableCache[name]; ok { t.kvStore = kvStore + u.Debugf("Found table %s in cache.", name) return t, nil } + u.Debugf("Loading table %s.", name) sch, err := shared.LoadSchema(path, name, consulClient) if err != nil { return nil, err diff --git a/quanta-admin/truncate.go b/quanta-admin/truncate.go index a8392a7..9b73533 100644 --- a/quanta-admin/truncate.go +++ b/quanta-admin/truncate.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/disney/quanta/shared" "github.com/hashicorp/consul/api" + "time" ) // TruncateCmd - Truncate command @@ -38,6 +39,9 @@ func (c *TruncateCmd) Run(ctx *Context) error { return fmt.Errorf("updateModTimeForTable error %v", err) } + // Give consumers time to flush and restart. + time.Sleep(time.Second * 5) + err = nukeData(consulClient, ctx.Port, c.Table, "truncate", c.DropEnums) if err != nil { return err diff --git a/quanta-kinesis-consumer/quanta-kinesis-consumer.go b/quanta-kinesis-consumer/quanta-kinesis-consumer.go index 96f2ed3..64d3e62 100644 --- a/quanta-kinesis-consumer/quanta-kinesis-consumer.go +++ b/quanta-kinesis-consumer/quanta-kinesis-consumer.go @@ -271,7 +271,6 @@ func main() { for { ctx, main.cancelFunc = context.WithCancel(context.Background()) scanErr := main.consumer.Scan(ctx, main.scanAndProcess) - time.Sleep(time.Second * 5) main.destroy() if err := main.eg.Wait(); err != nil { u.Errorf("session error: %v", err) @@ -301,6 +300,7 @@ func (m *Main) destroy() { for _, v := range m.shardChannels { close(v) } + time.Sleep(time.Second * 5) // Allow time for completion } func (m *Main) scanAndProcess(v *consumer.Record) error { @@ -422,7 +422,6 @@ func (m *Main) recoverInflight(recoverFunc func(unflushedCh chan *shared.BatchBu func (m *Main) schemaChangeListener(e shared.SchemaChangeEvent) { - time.Sleep(time.Second * 2) if m.cancelFunc != nil { m.cancelFunc() } diff --git a/quanta-proxy/quanta-proxy.go b/quanta-proxy/quanta-proxy.go index 857b2ce..d101068 100644 --- a/quanta-proxy/quanta-proxy.go +++ b/quanta-proxy/quanta-proxy.go @@ -234,19 +234,16 @@ func schemaChangeListener(e shared.SchemaChangeEvent) { core.ClearTableCache() src.GetSessionPool().Recover(nil) + src.GetSessionPool().Lock() + defer src.GetSessionPool().Unlock() switch e.Event { case shared.Drop: schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable) log.Printf("Dropped table %s", e.Table) case shared.Modify: log.Printf("Truncated table %s", e.Table) - schema.DefaultRegistry().SchemaDrop("quanta", e.Table, lex.TokenTable) - var err error - src, err = source.NewQuantaSource("", consulAddr, *quantaPort, sessionPoolSize) - if err != nil { - u.Error(err) - } - schema.RegisterSourceAsSchema("quanta", src) + time.Sleep(time.Second * 5) + schema.DefaultRegistry().SchemaRefresh("quanta") case shared.Create: schema.DefaultRegistry().SchemaDrop("quanta", "quanta", lex.TokenSource) var err error