Skip to content

Commit

Permalink
Merge pull request #145 from disney/tpch-support
Browse files Browse the repository at this point in the history
Tpch support to develop before query processing changes
  • Loading branch information
guymolinari authored Aug 4, 2024
2 parents 4d11b2d + 3c9b1c5 commit 48e8d73
Show file tree
Hide file tree
Showing 58 changed files with 2,898 additions and 1,159 deletions.
4 changes: 0 additions & 4 deletions Docker/kinesis_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,4 @@ if [ -n "$SCAN_INTERVAL" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --scan-interval="${SCAN_INTERVAL}""
fi
if [ -n "$COMMIT_INTERVAL" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --commit-interval="${COMMIT_INTERVAL}""
fi
exec /usr/bin/quanta-kinesis-consumer ${STREAM} ${SCHEMA} ${SHARD_KEY} ${REGION} ${BOOL_FLAGS}
119 changes: 92 additions & 27 deletions core/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewStringHashBSIMapper(conf map[string]string) (Mapper, error) {
}

// MapValue - Map a string value to an int64
func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Session) (result uint64, err error) {
func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Session,
isUpdate bool) (result uint64, err error) {

var strVal string
switch val.(type) {
Expand Down Expand Up @@ -58,6 +59,15 @@ func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Sessi
strVal = string(b)
val = strVal
result = Get64BitHash(strVal)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
if tbuf, ok := c.TableBuffers[attr.Parent.Name]; ok {
stringPath := indexPath(tbuf, attr.FieldName, "strings") // indexPath() is in core/session.go
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, strVal)
}
}
return
default:
err = fmt.Errorf("StringHashBSIMapper not expecting a '%T' for '%s'", val, attr.FieldName)
return
Expand All @@ -69,8 +79,8 @@ func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Sessi
err = c.StringIndex.Index(strVal)
}
stringPath := indexPath(tbuf, attr.FieldName, "strings") // indexPath() is in core/session.go
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, val)
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, strVal)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
} else {
err = fmt.Errorf("table %s not open for this connection", attr.Parent.Name)
}
Expand All @@ -90,7 +100,7 @@ func NewBoolDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map boolean values true/false to rowid = 0 false rowid = 1 true
func (m BoolDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

result = uint64(0)
switch val.(type) {
Expand Down Expand Up @@ -125,12 +135,17 @@ func (m BoolDirectMapper) MapValue(attr *Attribute, val interface{},
if val.(int64) == 1 {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
return
}
default:
err = fmt.Errorf("%v: No handling for type '%T'", val, val)
return
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand All @@ -156,7 +171,7 @@ func NewIntDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row ID.
func (m IntDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case uint64:
Expand Down Expand Up @@ -185,11 +200,17 @@ func (m IntDirectMapper) MapValue(attr *Attribute, val interface{},
return
}
result = uint64(v)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
return
}
default:
err = fmt.Errorf("%v: No handling for type '%T'", val, val)
return
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand All @@ -212,18 +233,22 @@ func NewStringToIntDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row ID.
func (m StringToIntDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

if val == nil && c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}

var v int64
v, err = strconv.ParseInt(strings.TrimSpace(val.(string)), 10, 64)
if err == nil && c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
}
if v <= 0 {
err = fmt.Errorf("cannot map %d as a positive non-zero value", v)
return
}
result = uint64(v)
if err == nil && c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}

Expand All @@ -239,7 +264,7 @@ func NewFloatScaleBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to an int64.
func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

var floatVal float64
switch val.(type) {
Expand All @@ -260,6 +285,11 @@ func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
if err != nil {
return
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("type passed for '%s' is of type '%T' which in unsupported", attr.FieldName, val)
return
Expand All @@ -286,7 +316,7 @@ func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(0)
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -303,7 +333,7 @@ func NewIntBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to an int64.
func (m IntBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case int64:
Expand All @@ -330,11 +360,16 @@ func (m IntBSIMapper) MapValue(attr *Attribute, val interface{},
return
}
result = uint64(v)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -358,7 +393,7 @@ func NewStringEnumMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

var multi []string
switch val.(type) {
Expand All @@ -380,6 +415,11 @@ func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
case int64:
strVal := fmt.Sprintf("%d", val.(int64))
multi = []string{strVal}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a string", attr.FieldName, val)
}
Expand All @@ -393,7 +433,7 @@ func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
if result, err = attr.GetValue(val); err != nil {
return
}
if err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries); err != nil {
if err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate); err != nil {
return
}
}
Expand Down Expand Up @@ -437,20 +477,25 @@ func NewBoolRegexMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m BoolRegexMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case bool:
result = uint64(0)
if val.(bool) {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a string", attr.FieldName, val)
}

if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand Down Expand Up @@ -482,7 +527,7 @@ func NewSysMillisBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64
func (m SysMillisBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -509,11 +554,16 @@ func (m SysMillisBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case float64:
result = uint64(val.(float64))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -530,7 +580,7 @@ func NewSysMicroBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64.
func (m SysMicroBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -557,11 +607,16 @@ func (m SysMicroBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case float64:
result = uint64(val.(float64))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -578,7 +633,7 @@ func NewSysSecBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64.
func (m SysSecBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -601,11 +656,16 @@ func (m SysSecBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case int32:
result = uint64(val.(int32))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -622,7 +682,7 @@ func NewIntToBoolDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m IntToBoolDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case int:
Expand All @@ -640,12 +700,17 @@ func (m IntToBoolDirectMapper) MapValue(attr *Attribute, val interface{},
if val.(string) == "true" {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a boolean", attr.FieldName, val)
}

if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand Down
Loading

0 comments on commit 48e8d73

Please sign in to comment.