Skip to content

Commit

Permalink
Refactoring into modules and multiline command visibility in prompt (#8)
Browse files Browse the repository at this point in the history
* refactored into small packages
* support for multiline visible in prompt
  • Loading branch information
satyakommula96 authored Oct 15, 2023
1 parent 71d9d7e commit 1f3f2cf
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 169 deletions.
18 changes: 18 additions & 0 deletions calcitesql/calcitesql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package calcitesql
95 changes: 95 additions & 0 deletions calcitesql/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package calcitesql

import (
"database/sql"
"fmt"
"log"
"os"
"strings"
"time"

_ "github.com/apache/calcite-avatica-go/v5"
"github.com/olekukonko/tablewriter"
)

func ExecuteQuery(db *sql.DB, query string) {
cmd := strings.TrimRight(query, ";")
start := time.Now()
// Execute the query
rows, err := db.Query(cmd)
duration := time.Since(start)
if err != nil {
log.Println("Error executing query:", err)
return
}
defer rows.Close()

// Get column names
columns, err := rows.Columns()
if err != nil {
log.Println("Error retrieving column names:", err)
return
}

// Create a new table writer for each query
table := tablewriter.NewWriter(os.Stdout)
table.SetAutoFormatHeaders(true)
table.SetAutoWrapText(false)
table.SetReflowDuringAutoWrap(true)

// Create a slice to store the query results
values := make([]interface{}, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}

// Fetch and print rows
count := 0
for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
log.Println("Error retrieving row data:", err)
continue
}

// Prepare row data
rowData := make([]string, len(columns))
for i, v := range values {
if v != nil {
rowData[i] = fmt.Sprintf("%v", v)
} else {
rowData[i] = "NULL"
}
}

// Add row to the table
table.Append(rowData)
count++
}

// Set the table headers
table.SetHeader(columns)

// Render the table
table.Render()

fmt.Printf("Rows: %d\nExecution Time: %s\n\n", count, duration)
}
205 changes: 52 additions & 153 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@ import (
"database/sql"
"fmt"
"log"
"os"
"strings"
"time"

_ "github.com/apache/calcite-avatica-go/v5"
"github.com/c-bata/go-prompt"
"github.com/olekukonko/tablewriter"
keywords "github.com/satyakommula96/calcite-cli/keywords"
prompt "github.com/satyakommula96/calcite-cli/prompt"
"github.com/spf13/cobra"
)

var (
connectionURL = "http://localhost:8080"
serialization = "protobuf"
enablePartitionPruning = true
distributedExecution = false
connectionURL = "http://localhost:8080"
serialization = "protobuf"
schema string
connectionParams string
user string
passwd string
maxRowsTotal string
)

func main() {
Expand All @@ -48,9 +47,13 @@ func main() {

// Define flags for connection URL and additional parameters
rootCmd.Flags().StringVar(&connectionURL, "url", connectionURL, "Connection URL")
rootCmd.Flags().StringVar(&serialization, "serialization", serialization, "Serialization parameter")
rootCmd.Flags().BoolVar(&enablePartitionPruning, "enablePartitionPruning", enablePartitionPruning, "Enable Partition Pruning")
rootCmd.Flags().BoolVar(&distributedExecution, "distributedExecution", distributedExecution, "Distributed Execution")
rootCmd.Flags().StringVar(&serialization, "serialization", "", "Serialization parameter")
rootCmd.Flags().StringVar(&connectionParams, "params", "", "Extra parameters for avatica connection (ex: property=value&property=value)")
rootCmd.Flags().StringVarP(&schema, "schema", "s", "", "The schema path sets the default schema to use for this connection.")
rootCmd.Flags().StringVarP(&user, "username", "u", "", "Username (required if password is set)")
rootCmd.Flags().StringVarP(&passwd, "password", "p", "", "Password (required if username is set)")
rootCmd.MarkFlagsRequiredTogether("username", "password")
rootCmd.Flags().StringVarP(&maxRowsTotal, "maxRowsTotal", "m", "", "Serialization parameter")

err := rootCmd.Execute()
if err != nil {
Expand All @@ -60,169 +63,65 @@ func main() {

func runSQLPrompt(cmd *cobra.Command, args []string) {
// Establish a connection to the calcite server
db, err := sql.Open("avatica", buildConnectionURL())
if err != nil {
log.Fatal(err)
}
db := establishConnection()
defer db.Close()

fmt.Println("Welcome! Use SQL to query Apache Calcite.\nUse Ctrl+D, type \"exit\" or \"quit\" to exit.")
fmt.Println()

p := prompt.New(
executeQueryWrapper(db),
keywords.CustomCompleter,
prompt.OptionLivePrefix(LivePrefix),
prompt.OptionPrefixTextColor(prompt.Yellow),
prompt.OptionPreviewSuggestionTextColor(prompt.Blue),
prompt.OptionSuggestionBGColor(prompt.White),
prompt.OptionSuggestionTextColor(prompt.Black),
prompt.OptionSelectedSuggestionBGColor(prompt.DarkGray),
prompt.OptionSelectedSuggestionTextColor(prompt.White),
prompt.OptionCompletionOnDown(),
prompt.OptionTitle("Calcite CLI Prompt"), // Set a title for the prompt
prompt.OptionInputTextColor(prompt.Fuchsia), // Customize input text color
prompt.OptionDescriptionTextColor(prompt.Black), // Customize description text color
prompt.OptionSelectedSuggestionTextColor(prompt.White), // Customize selected suggestion text color
prompt.OptionSelectedSuggestionBGColor(prompt.LightGray), // Customize selected suggestion background color
prompt.OptionPrefix("calcite \U0001F48E:sql> "), // Set a custom prefix for the prompt
)

p.Run()

}

var isMultiline bool

func LivePrefix() (prefix string, useLivePrefix bool) {
if isMultiline {
prefix = "... "
useLivePrefix = true
} else {
prefix = "calcite \U0001F48E:sql> "
useLivePrefix = !isMultiline
}
return prefix, useLivePrefix
// Create and run the SQL prompt
prompt.CreateAndRunPrompt(db)
}

func executeQueryWrapper(db *sql.DB) func(string) {
var multiLineQuery strings.Builder

return func(query string) {
// Check for exit command
if strings.ToLower(query) == "exit" || strings.ToLower(query) == "quit" {
fmt.Println("Exiting calcite CLI Prompt...")
os.Exit(0)
}

trimmedQuery := strings.TrimSpace(query)

// Check if it is a multiline query
if strings.HasSuffix(trimmedQuery, ";") {
multiLineQuery.WriteString(trimmedQuery)
executeQuery(db, multiLineQuery.String())
multiLineQuery.Reset()
isMultiline = false
} else {
if !isMultiline {
multiLineQuery.Reset()
isMultiline = true
}
multiLineQuery.WriteString(trimmedQuery)
multiLineQuery.WriteString(" ")
}
}
}

func executeQuery(db *sql.DB, query string) {
// Execute the query
start := time.Now()
cmd := strings.TrimRight(query, ";")
rows, err := db.Query(cmd)
func establishConnection() *sql.DB {
parameters := buildConnectionURL()
fmt.Println("Connecting to ", parameters)
db, err := sql.Open("avatica", parameters)
if err != nil {
log.Println("Error executing query:", err)
return
}
defer rows.Close()

// Get column names
columns, err := rows.Columns()
if err != nil {
log.Println("Error retrieving column names:", err)
return
}

// Create a new table writer for each query
table := tablewriter.NewWriter(os.Stdout)
table.SetAutoFormatHeaders(true)
table.SetAutoWrapText(false)
table.SetReflowDuringAutoWrap(true)

// Create a slice to store the query results
values := make([]interface{}, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range values {
scanArgs[i] = &values[i]
log.Fatal(err)
}

// Fetch and print rows
count := 0
for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
log.Println("Error retrieving row data:", err)
continue
}

// Prepare row data
rowData := make([]string, len(columns))
for i, v := range values {
if v != nil {
rowData[i] = fmt.Sprintf("%v", v)
} else {
rowData[i] = "NULL"
}
}

// Add row to the table
table.Append(rowData)
count++
if err = db.Ping(); err != nil {
log.Fatal(err)
}

duration := time.Since(start)

// Set the table headers
table.SetHeader(columns)

// Render the table
table.Render()

fmt.Printf("Rows: %d\nExecution Time: %s\n\n", count, duration)
fmt.Println("Connected")
return db
}

func buildConnectionURL() string {
var url strings.Builder

// Append the connection URL
url.WriteString(connectionURL)

var params []string

// Add serialization parameter
// Add serialization parameter by default protobuf
if serialization != "" {
params = append(params, "serialization="+serialization)
}

// Add enablePartitionPruning parameter
if enablePartitionPruning {
params = append(params, "enablePartitionPruning=true")
// Add username and password as parameter
if user != "" {
params = append(params, "avaticaUser="+user)
params = append(params, "avaticaPassword="+passwd)
}

// Add distributedExecution parameter
if !distributedExecution {
params = append(params, "distributedExecution=false")
// Add connection parameters
if connectionParams != "" {
params = append(params, connectionParams)
}

if maxRowsTotal != "" {
params = append(params, "maxRowsTotal="+maxRowsTotal)
}

// Combine the connection URL and parameters
url := connectionURL
if schema != "" {
url.WriteString("/")
url.WriteString(schema)
}

if len(params) > 0 {
url += "?" + strings.Join(params, "&")
url.WriteString("?")
url.WriteString(strings.Join(params, "&"))
}

return url
return url.String()
}
Loading

0 comments on commit 1f3f2cf

Please sign in to comment.