Skip to content

Commit

Permalink
Merge pull request #49 from disney/develop
Browse files Browse the repository at this point in the history
0.9.9 release
  • Loading branch information
guymolinari authored Dec 3, 2022
2 parents bb4159b + fc7b10b commit a36a64f
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 32 deletions.
2 changes: 2 additions & 0 deletions core/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,11 @@ func (p *Projector) Next(count int) (columnIDs []uint64, rows [][]driver.Value,
allAttr = append(allAttr, p.joinAttributes...)
bsiResults, bitmapResults, e := p.retrieveBitmapResults(p.foundSets, allAttr, false)
if e != nil {
p.stateGuard.Unlock()
err = e
return
}

p.bsiResults = bsiResults
p.bitmapResults = bitmapResults
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/araddon/qlbridge => github.com/guymolinari/qlbridge v0.0.0-20220908123708-8e2258173b7d
replace github.com/araddon/qlbridge => github.com/guymolinari/qlbridge v0.0.0-20221128230730-c39f38b91831
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ github.com/guymolinari/qlbridge v0.0.0-20220907215446-39888b918b57 h1:2ahN3SKUg6
github.com/guymolinari/qlbridge v0.0.0-20220907215446-39888b918b57/go.mod h1:4SL2pGOyW3H4a/mckpnbrp38ixt2Xw8yCnbzsrI3WM4=
github.com/guymolinari/qlbridge v0.0.0-20220908123708-8e2258173b7d h1:vFolPrAYG57gDBFFJsZn3faXBWqhGrrKlgB0Vh+gmdo=
github.com/guymolinari/qlbridge v0.0.0-20220908123708-8e2258173b7d/go.mod h1:4SL2pGOyW3H4a/mckpnbrp38ixt2Xw8yCnbzsrI3WM4=
github.com/guymolinari/qlbridge v0.0.0-20221120162606-7d69902fce3c h1:V+FEQrJimp10vX0bcYbLhPSPgW+K1YTmkRiZNwF3NCE=
github.com/guymolinari/qlbridge v0.0.0-20221120162606-7d69902fce3c/go.mod h1:4SL2pGOyW3H4a/mckpnbrp38ixt2Xw8yCnbzsrI3WM4=
github.com/guymolinari/qlbridge v0.0.0-20221128230730-c39f38b91831 h1:LwsFarJkNQzz6w9jD6HyTfVPqy8lVQZaSZpg4gRMv3I=
github.com/guymolinari/qlbridge v0.0.0-20221128230730-c39f38b91831/go.mod h1:4SL2pGOyW3H4a/mckpnbrp38ixt2Xw8yCnbzsrI3WM4=
github.com/hamba/avro v1.6.0 h1:a9tNvjpZVfDQQJWSM2g8hUc7gYacKZkHF3OK0w49UXY=
github.com/hamba/avro v1.6.0/go.mod h1:iKbXifVeT1gOHU+Eqe8wWziE745Z+Aa/6sbJnWeSW5A=
github.com/harlow/kinesis-consumer v0.3.5 h1:xeiDp2frP8DdKDeOzVuS+vaBX03JjifQO/Apzu4IOMA=
Expand Down
77 changes: 73 additions & 4 deletions shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"encoding/binary"
"fmt"
"github.com/araddon/dateparse"
"github.com/araddon/qlbridge/exec"
u "github.com/araddon/gou"
"github.com/hashicorp/consul/api"
"golang.org/x/sync/errgroup"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -384,13 +386,13 @@ func Lock(consul *api.Client, lockName, processName string) (*api.Lock, error) {

// create lock key
opts := &api.LockOptions{
Key: lockName + "/1",
Value: []byte("lock set by " + processName),
Key: lockName + "/1",
Value: []byte("lock set by " + processName),
SessionTTL: "10s",
/*
SessionOpts: &api.SessionEntry{
Checks: []string{"check1", "check2"},
Behavior: "release",
Checks: []string{"check1", "check2"},
Behavior: "release",
},
*/
}
Expand Down Expand Up @@ -542,3 +544,70 @@ func SetClusterSizeTarget(consul *api.Client, size int) error {
}
return nil
}

// GetIntParam retrieves an int value from a parameter map.
//
// A nil map, a missing key, or a nil value yields (0, nil). Values of type
// int and int64 are converted directly. String values are parsed with
// strconv.ParseInt using base 0 (so "0x10" and "010" prefixes are honored)
// and a 32-bit range. Any other value type results in an error that names
// the offending key.
func GetIntParam(params map[string]interface{}, key string) (val int, err error) {

	// Indexing a nil map is safe and yields a nil interface, so no separate
	// nil check on params is required.
	switch v := params[key].(type) {
	case nil:
		// Key absent or explicitly nil: keep the zero value.
	case int:
		val = v
	case int64:
		val = int(v)
	case string:
		var parsed int64
		parsed, err = strconv.ParseInt(v, 0, 32)
		if err != nil {
			err = fmt.Errorf("error parsing %s - %w", key, err)
			return
		}
		val = int(parsed)
	default:
		// Report the key actually requested rather than a fixed name.
		err = fmt.Errorf("unknown type %T for %s", v, key)
	}
	return
}

// GetBoolParam retrieves a boolean value from a parameter map.
//
// A nil map, a missing key, or a nil value yields (false, nil). Values of
// type bool are returned as is. String values are parsed with
// strconv.ParseBool. Any other value type results in an error that names
// the offending key.
func GetBoolParam(params map[string]interface{}, key string) (val bool, err error) {

	// Indexing a nil map is safe and yields a nil interface, so no separate
	// nil check on params is required.
	switch v := params[key].(type) {
	case nil:
		// Key absent or explicitly nil: keep the zero value.
	case bool:
		val = v
	case string:
		val, err = strconv.ParseBool(v)
		if err != nil {
			err = fmt.Errorf("error parsing %s - %w", key, err)
			return
		}
	default:
		// Report the key actually requested rather than a fixed name.
		err = fmt.Errorf("unknown type %T for %s", v, key)
	}
	return
}

// WaitTimeout waits for the errgroup to finish, for at most the specified
// timeout.
//
// If the group finishes first, the result of wg.Wait() is returned together
// with false. If the timeout elapses first, a value is sent on sigChan, the
// channel is closed, and (nil, true) is returned.
//
// NOTE(review): the send on sigChan happens before the close; if sigChan is
// unbuffered and nothing is receiving, that send will block - confirm against
// the channel's construction and its receivers.
func WaitTimeout(wg *errgroup.Group, timeout time.Duration, sigChan exec.SigChan) (error, bool) {

	// Buffered so the waiter goroutine can deliver its result and exit even
	// when nobody is left to receive it after a timeout.
	done := make(chan error, 1)
	go func() {
		defer close(done)
		done <- wg.Wait()
	}()

	// An explicit timer (instead of time.After) lets the timer be released
	// as soon as the group finishes rather than lingering for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case err := <-done:
		return err, false // completed normally
	case <-timer.C:
		sigChan <- true
		close(sigChan)
		return nil, true // timed out
	}
}
45 changes: 40 additions & 5 deletions sink/s3sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type (
acl string
sseKmsKeyID string
config *aws.Config
path string
}
)

Expand All @@ -54,6 +55,8 @@ type (
acl string
sseKmsKeyID string
config *aws.Config
path string
s3svc *s3.Client
}
)

Expand Down Expand Up @@ -83,6 +86,7 @@ func NewS3Sink(ctx *plan.Context, path string, params map[string]interface{}) (e
// Open CSV session to S3
func (s *S3CSVSink) Open(ctx *plan.Context, bucketpath string, params map[string]interface{}) error {

s.path = bucketpath
if delimiter, ok := params["delimiter"]; !ok {
s.delimiter = '\t'
} else {
Expand Down Expand Up @@ -178,9 +182,17 @@ func (s *S3CSVSink) Close() error {
return nil
}


// Cleanup S3 CSV session.
// It asks the output bucket to delete s.path (the bucket path recorded by
// Open) and returns whatever error that call reports.
// NOTE(review): s.path is passed to Delete exactly as it was given to Open;
// confirm the bucket handle expects that form of key.
func (s *S3CSVSink) Cleanup() error {
return s.outBucket.Delete(s.path)
}


// Open Parquet session to S3
func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[string]interface{}) error {

s.path = bucketpath
bucket, file, err := parseBucketName(bucketpath)
if err != nil {
return err
Expand Down Expand Up @@ -215,7 +227,7 @@ func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[st
u.Errorf("Parquet Sink: Could not load the default config: %v",err)
}

var s3svc *s3.Client
//var s3svc *s3.Client

if s.assumeRoleArn != "" {
client := sts.NewFromConfig(cfg)
Expand All @@ -237,25 +249,25 @@ func (s *S3ParquetSink) Open(ctx *plan.Context, bucketpath string, params map[st
return fmt.Errorf("Failed to retrieve credentials from cache: %v", err)
}

s3svc = s3.NewFromConfig(cfg, func(o *s3.Options) {
s.s3svc = s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = region
o.Credentials = provider
o.RetryMaxAttempts = 10
})
} else {
s3svc = s3.NewFromConfig(cfg, func(o *s3.Options) {
s.s3svc = s3.NewFromConfig(cfg, func(o *s3.Options) {
o.Region = region
o.RetryMaxAttempts = 10
})
}

if s3svc == nil {
if s.s3svc == nil {
return fmt.Errorf("failed creating S3 session")
}

// Create S3 service client
u.Infof("Parquet Sink: Opening Output S3 path s3://%s/%s", bucket, file)
s.outFile, err = pgs3.NewS3FileWriterWithClient(context.Background(), s3svc, bucket, file, nil, func(p *s3.PutObjectInput){
s.outFile, err = pgs3.NewS3FileWriterWithClient(context.Background(), s.s3svc, bucket, file, nil, func(p *s3.PutObjectInput){
p.SSEKMSKeyId = aws.String(s.sseKmsKeyID)
p.ServerSideEncryption = "aws:kms"
p.ACL = types.ObjectCannedACL(s.acl)
Expand Down Expand Up @@ -332,6 +344,29 @@ func (s *S3ParquetSink) Close() error {
return nil
}

// Cleanup S3 Parquet session.
// The sink is closed first; if that fails the error is returned and no
// delete is attempted. Otherwise the path recorded by Open is split into
// bucket and object key and the object is deleted through the sink's S3
// client. A warning is logged both when the delete fails and when it succeeds.
func (s *S3ParquetSink) Cleanup() error {

	if closeErr := s.Close(); closeErr != nil {
		return closeErr
	}

	bucket, key, parseErr := parseBucketName(s.path)
	if parseErr != nil {
		return parseErr
	}

	request := &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	if _, delErr := s.s3svc.DeleteObject(context.TODO(), request); delErr != nil {
		u.Warnf("Unable to delete object %q from bucket %q, %v", key, bucket, delErr)
		return delErr
	}

	u.Warnf("removed partially written file %q from bucket %q", key, bucket)
	return nil
}

func parseBucketName(bucketPath string) (bucket string, file string, err error) {

noScheme := strings.Replace(strings.ToLower(bucketPath), "s3://", "", 1)
Expand Down
5 changes: 5 additions & 0 deletions sink/tablesink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ func (s *TableSink) Next(dest []driver.Value, colIndex map[string]int) error {
func (s *TableSink) Close() error {
// Nothing is released here; the method always reports success.
return nil
}

// Cleanup output session.
// This implementation performs no work and always returns nil.
func (s *TableSink) Cleanup() error {
return nil
}
57 changes: 48 additions & 9 deletions source/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime"
"strings"
"sync"
"time"
//u "github.com/araddon/gou"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/araddon/qlbridge/datasource"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/araddon/qlbridge/value"
"github.com/araddon/qlbridge/vm"
"github.com/disney/quanta/core"
"github.com/disney/quanta/shared"
)

func outputRownumMessages(outCh exec.MessageChan, rs *roaring64.Bitmap, limit, offset int) {
Expand Down Expand Up @@ -77,30 +79,57 @@ func decorateRow(row []driver.Value, proj *rel.Projection, rowCols map[string]in
if v.Col.Expr.NodeType() != "Func" {
continue
}
nodeVal, ok := vm.Eval(ctx, v.Col.Expr)
if !ok {
nodeVal, ok := vm.Eval(ctx, v.Col.Expr)
if !ok {
newRow[i] = "NULL"
continue
}
}
newRow[i] = nodeVal.ToString()
}
return newRow
}

func outputProjection(outCh exec.MessageChan, sigChan exec.SigChan, proj *core.Projector,
colNames, rowCols map[string]int, limit, offset int, isExport, isDistinct bool, pro *rel.Projection) error {
colNames, rowCols map[string]int, limit, offset int, isExport, isDistinct bool, pro *rel.Projection,
params map[string]interface{}) error {

batchSize := limit
nThreads := 1
limitIsBatch := true
timeout := 60
var dupMap sync.Map
var err error
if params != nil {
if params["timeout"] != nil {
if timeout, err = shared.GetIntParam(params, "timeout"); err != nil {
return err
}
}
if params["threads"] != nil {
if nThreads, err = shared.GetIntParam(params, "threads"); err != nil {
return err
}
}
}
limitIsBatch := true

// Parallelize projection for SELECT ... INTO
if isExport {
batchSize = 1000
nThreads = runtime.NumCPU() / 2
nThreads = runtime.NumCPU()
limitIsBatch = false
proj.Prefetch = true
if params != nil {
var err error
if params["batchSize"] != nil {
if batchSize, err = shared.GetIntParam(params, "batchSize"); err != nil {
return err
}
}
if params["prefetch"] != nil {
if proj.Prefetch, err = shared.GetBoolParam(params, "prefetch"); err != nil {
return err
}
}
}
}

var eg errgroup.Group
Expand Down Expand Up @@ -129,8 +158,14 @@ func outputProjection(outCh exec.MessageChan, sigChan exec.SigChan, proj *core.P
}
msg := datasource.NewSqlDriverMessageMap(columnID, rows[i], colNames)
select {
case <-sigChan:
case _, closed := <-sigChan:
if closed {
return fmt.Errorf("timed out.")
}
return nil
default:
}
select {
case outCh <- msg:
// continue
}
Expand All @@ -141,9 +176,13 @@ func outputProjection(outCh exec.MessageChan, sigChan exec.SigChan, proj *core.P
}
})
}
if err := eg.Wait(); err != nil {
err, timedOut := shared.WaitTimeout(&eg, time.Duration(timeout) * time.Second, sigChan)
if err != nil {
return err
}
if timedOut {
return fmt.Errorf("timed out after %d seconds", timeout)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion source/quanta_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (m *JoinMerge) Run() error {
}

if err = outputProjection(outCh, m.SigChan(), proj, cn, rn, limit, offset, isExport,
orig.Distinct, fp); err != nil {
orig.Distinct, fp, orig.With); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/resultreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (m *ResultReader) Run() error {
}

if err = outputProjection(outCh, sigChan, proj, colNames, rowCols, m.limit, m.offset, isExport,
orig.Distinct, m.sql.p.Proj); err != nil {
orig.Distinct, m.sql.p.Proj, orig.With); err != nil {
return err
}

Expand Down
Loading

0 comments on commit a36a64f

Please sign in to comment.