Skip to content

Commit

Permalink
Consumers should exit if their table is dropped. They shouldn't start…
Browse files Browse the repository at this point in the history
… if the table doesn't exist.
  • Loading branch information
gamolina committed Mar 31, 2023
1 parent 8ee2c1d commit 397b2f7
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
4 changes: 4 additions & 0 deletions quanta-kinesis-consumer/quanta-kinesis-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ func (m *Main) schemaChangeListener(e shared.SchemaChangeEvent) {
case shared.Drop:
//m.sessionPool.Recover(nil)
u.Warnf("Dropped table %s", e.Table)
if e.Table == m.Index {
u.Errorf("The table [%s], for this consumer was dropped, exiting", e.Table)
os.Exit(1)
}
case shared.Modify:
u.Warnf("Truncated table %s", e.Table)
case shared.Create:
Expand Down
4 changes: 4 additions & 0 deletions shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func putRecursive(typ reflect.Type, value reflect.Value, consul *api.Client, roo
func UnmarshalConsul(consul *api.Client, name string) (BasicTable, error) {

table := BasicTable{Name: name}
keys, _, _ := consul.KV().Keys("schema/" + name, "", nil)
if len(keys) == 0 {
return table, fmt.Errorf("Table %s not found.", name)
}
ps := reflect.ValueOf(&table)
err := getRecursive(reflect.TypeOf(table), ps.Elem(), consul, "schema/"+name)
return table, err
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
quanta 0.9.10-rc-14
quanta 0.9.10-rc-15

0 comments on commit 397b2f7

Please sign in to comment.