Skip to content

Commit

Permalink
Fix bugs in joins.
Browse files Browse the repository at this point in the history
  • Loading branch information
gamolina committed May 25, 2023
1 parent 9671022 commit 8691480
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 28 deletions.
26 changes: 12 additions & 14 deletions core/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ func NewProjection(s *Session, foundSets map[string]*roaring64.Bitmap, joinNames
return nil, err
}
p.fkBSI = bsir[p.driverTable]
for k, v := range p.fkBSI {
u.Debugf("RESULTS FK BSI %v = %d ", k, v.GetCardinality())
}
}
p.projFieldMap = projFieldMap

Expand All @@ -131,6 +128,12 @@ func NewProjection(s *Session, foundSets map[string]*roaring64.Bitmap, joinNames
u.Debugf("INNER JOIN = %v", innerJoin)

driverSet := p.foundSets[p.driverTable]
// For inner joins filter out any rows in the driver table not in fkBSI link
if innerJoin && !negate {
for _, v := range p.fkBSI {
driverSet.And(v.GetExistenceBitmap())
}
}
// filter out entries from driver found set not contained within FKBSIs
for k, v := range p.foundSets {
if !innerJoin {
Expand All @@ -152,14 +155,14 @@ func NewProjection(s *Session, foundSets map[string]*roaring64.Bitmap, joinNames

u.Debugf("FKBSI %v = %d", k, fkBsi.GetCardinality())
newSet := fkBsi.Transpose()
driverSet = v

filterSet := v.Clone()
// Anti-join
if negate {
driverSet.AndNot(newSet)
filterSet.AndNot(newSet)
} else {
driverSet.And(newSet)
filterSet.And(newSet)
}
p.foundSets[k] = filterSet
}

p.resultIterator = driverSet.ManyIterator()
Expand Down Expand Up @@ -219,9 +222,6 @@ func (p *Projector) retrieveBitmapResults(foundSets map[string]*roaring64.Bitmap
if err != nil {
return nil, nil, err
}
for k, v := range bsir {
u.Debugf("BSIR %v = %d", k, v.GetCardinality())
}
for field, r := range bitr {
if _, ok := bitmapResults[k]; !ok {
bitmapResults[k] = make(map[string]*BitmapFieldResults)
Expand Down Expand Up @@ -260,10 +260,7 @@ func (p *Projector) nextSets(columnIDs []uint64) (map[string]map[string]*roaring
rs := make(map[string]*roaring64.Bitmap)
driverSet := roaring64.BitmapOf(columnIDs...)
rs[p.driverTable] = driverSet
allAttr := make([]*Attribute, 0)
allAttr = append(allAttr, p.projAttributes...)
allAttr = append(allAttr, p.joinAttributes...)
bsir, bitr, err := p.retrieveBitmapResults(rs, allAttr, false)
bsir, bitr, err := p.retrieveBitmapResults(rs, p.projAttributes, false)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -444,6 +441,7 @@ func (p *Projector) transposeFKColumnIDs(fkBSI *roaring64.BSI, columnIDs []uint6
return
}


func (p *Projector) getRow(colID uint64, strMap map[string]map[interface{}]interface{},
bsiResults map[string]map[string]*roaring64.BSI,
bitmapResults map[string]map[string]*BitmapFieldResults) (row []driver.Value, err error) {
Expand Down
22 changes: 12 additions & 10 deletions core/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func (s *Session) recursivePutRow(name string, row interface{}, pqTablePath stri
if !s.Nested {
// Directly provided parent columnID
if v.Type == "Integer" && (!relBuf.HasPrimaryKey() || fkFieldSpec == "@rownum") {
//vals, _, err := s.readColumn(row, pqTablePath, &v, false, ignoreSourcePath, useNerdCapitalization)
vals, _, err := s.readColumn(row, pqTablePath, &v, false, true, false)
vals, _, err := s.readColumn(row, pqTablePath, &v, false, ignoreSourcePath, useNerdCapitalization)
//vals, _, err := s.readColumn(row, pqTablePath, &v, false, true, false)
if err != nil {
return err
}
Expand All @@ -332,7 +332,8 @@ func (s *Session) recursivePutRow(name string, row interface{}, pqTablePath stri
// return fmt.Errorf("Not a nested import, source must be specified for %s", v.FieldName)
//}

lookupKey, err := s.resolveFKLookupKey(&v, tbuf, row, ignoreSourcePath, useNerdCapitalization)
//lookupKey, err := s.resolveFKLookupKey(&v, tbuf, row, ignoreSourcePath, useNerdCapitalization)
lookupKey, err := s.resolveFKLookupKey(&v, tbuf, row, true, false)
if err != nil {
return fmt.Errorf("resolveFKLookupKey %v", err)
}
Expand All @@ -358,8 +359,7 @@ func (s *Session) recursivePutRow(name string, row interface{}, pqTablePath stri
}
}
} else {
//vals, pqps, err := s.readColumn(row, pqTablePath, &v, isChild, ignoreSourcePath, useNerdCapitalization)
vals, pqps, err := s.readColumn(row, pqTablePath, &v, isChild, true, false)
vals, pqps, err := s.readColumn(row, pqTablePath, &v, isChild, ignoreSourcePath, useNerdCapitalization)
if err != nil {
return fmt.Errorf("Parquet reader error - %v", err)
}
Expand All @@ -381,13 +381,14 @@ func (s *Session) readColumn(row interface{}, pqTablePath string, v *Attribute,
isChild, ignoreSourcePath, useNerdCapitalization bool) ([]interface{}, []string, error) {

// If we are ignoring source path and it is not defined then this must be a defaulted value
if !ignoreSourcePath && v.SourceName == "" {
if v.DefaultValue != "" {
//if !ignoreSourcePath && v.SourceName == "" {
if v.DefaultValue != "" {
//if v.DefaultValue != "" {
retVals := make([]interface{}, 0)
retVals = append(retVals, s.getDefaultValueForColumn(v, row, ignoreSourcePath, useNerdCapitalization))
pqColPaths := []string{""}
return retVals, pqColPaths, nil
}
//}
//return nil, nil, fmt.Errorf("readColumn: attribute sourceName is empty for %s", v.FieldName)
return nil, []string{""}, nil
}
Expand Down Expand Up @@ -623,7 +624,7 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP
if isChild { // Nothing to do here, no child value
return false, nil
}
return false, fmt.Errorf("empty or nil value for PK field %s, len %d", pqColPaths[i],
return false, fmt.Errorf("empty or nil value for PK field %s - %s, len %d", pk.FieldName, pqColPaths[i],
len(vals))
}
if len(vals) > 1 {
Expand Down Expand Up @@ -860,10 +861,11 @@ func (s *Session) resolveFKLookupKey(v *Attribute, tbuf *TableBuffer, row interf

var retVal strings.Builder
root := "/"
pqTablePath := fmt.Sprintf("%s%s", root, tbuf.Table.Name)
if r, ok := row.(*reader.ParquetReader); ok {
root = r.SchemaHandler.GetRootExName()
pqTablePath = fmt.Sprintf("%s.%s", root, tbuf.Table.Name)
}
pqTablePath := fmt.Sprintf("%s.%s", root, tbuf.Table.Name)
vals, _, err := s.readColumn(row, pqTablePath, v, false, ignoreSourcePath, useNerdCapitalization)
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/Jeffail/tunny v0.0.0-20190930221602-f13eb662a36a
github.com/RoaringBitmap/roaring v0.9.4
github.com/RoaringBitmap/roaring v1.3.0
github.com/akrylysov/pogreb v0.9.1
github.com/alecthomas/kong v0.2.17
github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/Jeffail/tunny v0.0.0-20190930221602-f13eb662a36a/go.mod h1:BX3q3G70XX
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/RoaringBitmap/roaring v1.3.0 h1:aQmu9zQxDU0uhwR8SXOH/OrqEf+X8A0LQmwW3JX8Lcg=
github.com/RoaringBitmap/roaring v1.3.0/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE=
github.com/akrylysov/pogreb v0.9.1 h1:kO4TD2qaiCD0TtIjEj3ta1rCTMtyp61RxuD4cet7S14=
github.com/akrylysov/pogreb v0.9.1/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI=
github.com/alecthomas/kong v0.2.17 h1:URDISCI96MIgcIlQyoCAlhOmrSw6pZScBNkctg8r0W0=
Expand Down
5 changes: 3 additions & 2 deletions server/bitmapquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ func (m *BitmapIndex) Projection(ctx context.Context, req *pb.ProjectionRequest)
}
if _, ok := m.bsiCache[req.Index][v]; ok {
var bsi *BSIBitmap
if bsi, err2 = m.timeRangeBSI(req.Index, v, fromTime, toTime, foundSet, req.Negate); err2 != nil {
//if bsi, err2 = m.timeRangeBSI(req.Index, v, fromTime, toTime, foundSet, req.Negate); err2 != nil {
if bsi, err2 = m.timeRangeBSI(req.Index, v, fromTime, toTime, foundSet, false); err2 != nil {
return nil, fmt.Errorf("Error ranging projection BSI for %s %s - %v", req.Index, v, err2)
}
if bsi.GetCardinality() == 0 {
Expand All @@ -538,7 +539,7 @@ func (m *BitmapIndex) Projection(ctx context.Context, req *pb.ProjectionRequest)
}
bsiResults = append(bsiResults, bsir)
} else {
if attr.IsBSI() {
if attr.IsBSI() || attr.MappingStrategy == "ParentRelation" {
bsir := &pb.BSIResult{Field: v}
bsir.Bitmaps, _ = m.newBSIBitmap(req.Index, v).MarshalBinary()
bsiResults = append(bsiResults, bsir)
Expand Down
5 changes: 4 additions & 1 deletion source/sql_to_quanta.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (m *SQLToQuanta) ResolveField(name string) (field *core.Attribute, isBSI bo
if err != nil {
return
}
if field.MappingStrategy == "ParentRelation" {
isBSI = true
return
}
if core.MapperTypeFromString(field.MappingStrategy).IsBSI() {
isBSI = true
}
Expand Down Expand Up @@ -707,7 +711,6 @@ func (m *SQLToQuanta) walkFilterBinary(node *expr.BinaryNode, q *shared.QueryFra
if m.endDate == "" {
end := ts.AddDate(0, 0, 1)
m.endDate = end.Format(shared.YMDHTimeFmt)
u.Warnf("SETTING DATE RANGE IN EQUALS? %v - %v", m.startDate, m.endDate)
}
}
}
Expand Down

0 comments on commit 8691480

Please sign in to comment.