Skip to content

Commit

Permalink
Merge pull request #376 from bruin-data/handle-cluster-change
Browse files Browse the repository at this point in the history
handle cluster and partioning changes in Bq
  • Loading branch information
Barbar1432 authored Jan 3, 2025
2 parents 09d1e0f + 5e202ab commit 24964de
Show file tree
Hide file tree
Showing 7 changed files with 624 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,5 @@ integration-tests/logs
integration-tests/bruin
!integration-tests/.bruin.yml

venv
venv
logs/runs
15 changes: 15 additions & 0 deletions pkg/bigquery/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"cloud.google.com/go/bigquery"
"github.com/bruin-data/bruin/pkg/ansisql"
"github.com/bruin-data/bruin/pkg/pipeline"
"github.com/bruin-data/bruin/pkg/query"
Expand Down Expand Up @@ -58,6 +59,20 @@ func (m *mockQuerierWithResult) SelectWithSchema(ctx context.Context, q *query.Q

return result, args.Error(1)
}
func (m *mockQuerierWithResult) DeleteTableIfPartitioningOrClusteringMismatch(ctx context.Context, tableName string, asset *pipeline.Asset) error {
args := m.Called(ctx, tableName, asset)
return args.Error(0)
}

func (m *mockQuerierWithResult) IsSamePartitioning(meta *bigquery.TableMetadata, asset *pipeline.Asset) bool {
args := m.Called(meta, asset)
return args.Bool(0)
}

func (m *mockQuerierWithResult) IsSameClustering(meta *bigquery.TableMetadata, asset *pipeline.Asset) bool {
args := m.Called(meta, asset)
return args.Bool(0)
}

type mockConnectionFetcher struct {
mock.Mock
Expand Down
142 changes: 135 additions & 7 deletions pkg/bigquery/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ type MetadataUpdater interface {
UpdateTableMetadataIfNotExist(ctx context.Context, asset *pipeline.Asset) error
}

type TableManager interface {
DeleteTableIfPartitioningOrClusteringMismatch(ctx context.Context, tableName string, asset *pipeline.Asset) error
}

type DB interface {
Querier
Selector
MetadataUpdater
TableManager
}

type Client struct {
Expand Down Expand Up @@ -185,6 +190,24 @@ func (m NoMetadataUpdatedError) Error() string {
return "no metadata found for the given asset to be pushed to BigQuery"
}

func (d *Client) getTableRef(tableName string) (*bigquery.Table, error) {
tableComponents := strings.Split(tableName, ".")

// Check for empty components
for _, component := range tableComponents {
if component == "" {
return nil, fmt.Errorf("table name must be in dataset.table or project.dataset.table format, '%s' given", tableName)
}
}

if len(tableComponents) == 3 {
return d.client.DatasetInProject(tableComponents[0], tableComponents[1]).Table(tableComponents[2]), nil
} else if len(tableComponents) == 2 {
return d.client.DatasetInProject(d.config.ProjectID, tableComponents[0]).Table(tableComponents[1]), nil
}
return nil, fmt.Errorf("table name must be in dataset.table or project.dataset.table format, '%s' given", tableName)
}

func (d *Client) UpdateTableMetadataIfNotExist(ctx context.Context, asset *pipeline.Asset) error {
anyColumnHasDescription := false
colsByName := make(map[string]*pipeline.Column, len(asset.Columns))
Expand All @@ -198,13 +221,10 @@ func (d *Client) UpdateTableMetadataIfNotExist(ctx context.Context, asset *pipel
if asset.Description == "" && (len(asset.Columns) == 0 || !anyColumnHasDescription) {
return NoMetadataUpdatedError{}
}

tableComponents := strings.Split(asset.Name, ".")
if len(tableComponents) != 2 {
return fmt.Errorf("asset name must be in schema.table format to update the metadata, '%s' given", asset.Name)
tableRef, err := d.getTableRef(asset.Name)
if err != nil {
return err
}

tableRef := d.client.Dataset(tableComponents[0]).Table(tableComponents[1])
meta, err := tableRef.Metadata(ctx)
if err != nil {
return err
Expand All @@ -228,7 +248,6 @@ func (d *Client) UpdateTableMetadataIfNotExist(ctx context.Context, asset *pipel
if asset.Description != "" {
update.Description = asset.Description
}

primaryKeys := asset.ColumnNamesWithPrimaryKey()
if len(primaryKeys) > 0 {
update.TableConstraints = &bigquery.TableConstraints{
Expand Down Expand Up @@ -271,3 +290,112 @@ func (d *Client) Ping(ctx context.Context) error {

return nil // Return nil if the query runs successfully
}

func (d *Client) DeleteTableIfPartitioningOrClusteringMismatch(ctx context.Context, tableName string, asset *pipeline.Asset) error {
tableRef, err := d.getTableRef(tableName)
if err != nil {
return err
}

// Fetch table metadata
meta, err := tableRef.Metadata(ctx)
if err != nil {
return fmt.Errorf("failed to fetch metadata for table '%s': %w", tableName, err)
}

// Check if partitioning or clustering exists in metadata or is wanted by asset
if meta.TimePartitioning != nil || meta.RangePartitioning != nil || asset.Materialization.PartitionBy != "" || len(asset.Materialization.ClusterBy) > 0 {
if !IsSamePartitioning(meta, asset) || !IsSameClustering(meta, asset) {
if err := tableRef.Delete(ctx); err != nil {
return fmt.Errorf("failed to delete table '%s': %w", tableName, err)
}
fmt.Printf("Your table will be dropped and recreated:\n")
fmt.Printf("Table '%s' dropped successfully.\n", tableName)
fmt.Printf("Recreating the table with the new clustering and partitioning strategies...\n")
}
}

return nil
}

func IsSamePartitioning(meta *bigquery.TableMetadata, asset *pipeline.Asset) bool {
// If asset wants partitioning but table has none
if asset.Materialization.PartitionBy != "" &&
meta.TimePartitioning == nil &&
meta.RangePartitioning == nil {
fmt.Printf(
"Mismatch detected: Your table has no partitioning, but you are attempting to partition by '%s'.\n",
asset.Materialization.PartitionBy,
)
return false
}

// Safe to proceed only if table has any partitioning
if meta.TimePartitioning == nil && meta.RangePartitioning == nil {
return true
}

if meta.TimePartitioning != nil {
if meta.TimePartitioning.Field != asset.Materialization.PartitionBy {
fmt.Printf(
"Mismatch detected: Your table has a time partitioning strategy with the field '%s', "+
"but you are attempting to use the field '%s'\n",
meta.TimePartitioning.Field,
asset.Materialization.PartitionBy,
)
return false
}
}
if meta.RangePartitioning != nil {
if meta.RangePartitioning.Field != asset.Materialization.PartitionBy {
fmt.Printf(
"Mismatch detected: Your table has a range partitioning strategy with the field '%s',"+
"but you are attempting to use the field '%s'.\n", meta.RangePartitioning.Field,
asset.Materialization.PartitionBy,
)
return false
}
}
return true
}

func IsSameClustering(meta *bigquery.TableMetadata, asset *pipeline.Asset) bool {
// If asset wants clustering but table has none
if len(asset.Materialization.ClusterBy) > 0 &&
(meta.Clustering == nil || len(meta.Clustering.Fields) == 0) {
fmt.Printf(
"Mismatch detected: Your table has no clustering, but you are attempting to cluster by %v.\n",
asset.Materialization.ClusterBy,
)
return false
}

// Safe to proceed only if table has clustering
if meta.Clustering == nil {
return true
}

bigQueryFields := meta.Clustering.Fields
userFields := asset.Materialization.ClusterBy

if len(bigQueryFields) != len(userFields) {
fmt.Printf(
"Mismatch detected: Your table has the clustering fields (%v), but you are trying to use the fields (%v).\n",
bigQueryFields, userFields,
)
return false
}

for i := range bigQueryFields {
if bigQueryFields[i] != userFields[i] {
fmt.Printf(
"Mismatch detected: Your table is clustered by '%s' at position %d, "+
"but you are trying to cluster by '%s'.\n",
bigQueryFields[i], i+1, userFields[i],
)
return false
}
}

return true
}
Loading

0 comments on commit 24964de

Please sign in to comment.