Skip to content

Commit

Permalink
clean up 2
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Dec 9, 2024
1 parent 2d29f02 commit 499c9b0
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 30 deletions.
2 changes: 1 addition & 1 deletion warehouse/integrations/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (ch *Clickhouse) AddColumns(ctx context.Context, tableName string, columnsI
}

func (ch *Clickhouse) CreateSchema(ctx context.Context) error {
if !ch.Uploader.IsWarehouseSchemaEmpty() {
if !ch.Uploader.IsSchemaEmpty() {
return nil
}

Expand Down
11 changes: 4 additions & 7 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"

"go.uber.org/mock/gomock"

clickhousestd "github.com/ClickHouse/clickhouse-go"
"github.com/google/uuid"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/rudderlabs/compose-test/compose"
"github.com/rudderlabs/compose-test/testcompose"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -1222,7 +1219,7 @@ func newMockUploader(
u.EXPECT().GetTableSchemaInUpload(gomock.Any()).Return(tableSchema).AnyTimes()
u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(metadata, nil).AnyTimes()
u.EXPECT().UseRudderStorage().Return(false).AnyTimes()
u.EXPECT().IsWarehouseSchemaEmpty().Return(true).AnyTimes()
u.EXPECT().IsSchemaEmpty().Return(true).AnyTimes()

return u
}
12 changes: 6 additions & 6 deletions warehouse/internal/mocks/utils/mock_uploader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion warehouse/router/state_create_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func (job *UploadJob) createRemoteSchema(whManager manager.Manager) error {
if job.schemaHandle.IsWarehouseSchemaEmpty() {
if job.schemaHandle.IsSchemaEmpty() {
if err := whManager.CreateSchema(job.ctx); err != nil {
return fmt.Errorf("creating schema: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/router/state_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ func (job *UploadJob) columnCountStat(tableName string) {
tags := []whutils.Tag{
{Name: "tableName", Value: whutils.TableNameForStats(tableName)},
}
currentColumnsCount := job.schemaHandle.GetColumnsCountInWarehouseSchema(tableName)
currentColumnsCount := job.schemaHandle.GetColumnsCountInSchema(tableName)

job.gaugeStat(`warehouse_load_table_column_count`, tags...).Gauge(currentColumnsCount)
job.gaugeStat(`warehouse_load_table_column_limit`, tags...).Gauge(columnCountLimit)
Expand Down
4 changes: 2 additions & 2 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,8 @@ func (job *UploadJob) GetSampleLoadFileLocation(ctx context.Context, tableName s
return locations[0].Location, nil
}

func (job *UploadJob) IsWarehouseSchemaEmpty() bool {
return job.schemaHandle.IsWarehouseSchemaEmpty()
func (job *UploadJob) IsSchemaEmpty() bool {
return job.schemaHandle.IsSchemaEmpty()
}

func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema {
Expand Down
2 changes: 1 addition & 1 deletion warehouse/router/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestColumnCountStat(t *testing.T) {
m2 := statsStore.Get("warehouse_load_table_column_limit", tags)

if tc.statExpected {
require.EqualValues(t, m1.LastValue(), j.schemaHandle.GetColumnsCountInWarehouseSchema(tableName))
require.EqualValues(t, m1.LastValue(), j.schemaHandle.GetColumnsCountInSchema(tableName))
require.EqualValues(t, m2.LastValue(), tc.columnCountLimit)
} else {
require.Nil(t, m1)
Expand Down
4 changes: 2 additions & 2 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,13 @@ func (sh *Schema) UpdateWarehouseTableSchema(tableName string, tableSchema model
sh.localSchema[tableName] = tableSchema
}

func (sh *Schema) IsWarehouseSchemaEmpty() bool {
func (sh *Schema) IsSchemaEmpty() bool {
sh.localSchemaMu.RLock()
defer sh.localSchemaMu.RUnlock()
return len(sh.localSchema) == 0
}

func (sh *Schema) GetColumnsCountInWarehouseSchema(tableName string) int {
func (sh *Schema) GetColumnsCountInSchema(tableName string) int {
sh.localSchemaMu.RLock()
defer sh.localSchemaMu.RUnlock()
return len(sh.localSchema[tableName])
Expand Down
10 changes: 5 additions & 5 deletions warehouse/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {
ctx := context.Background()

require.Empty(t, s.GetTableSchemaInWarehouse(tableName))
require.True(t, s.IsWarehouseSchemaEmpty())
require.True(t, s.IsSchemaEmpty())

_, err := s.FetchSchemaFromWarehouse(ctx, &mockRepo)
if tc.wantError == nil {
Expand All @@ -210,15 +210,15 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {
require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError))
}
require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchemaInWarehouse(tableName))
require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInWarehouseSchema(tableName))
require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInSchema(tableName))
if len(tc.expectedSchema) > 0 {
require.False(t, s.IsWarehouseSchemaEmpty())
require.False(t, s.IsSchemaEmpty())
} else {
require.True(t, s.IsWarehouseSchemaEmpty())
require.True(t, s.IsSchemaEmpty())
}
s.UpdateWarehouseTableSchema(updatedTable, updatedSchema)
require.Equal(t, updatedSchema, s.GetTableSchemaInWarehouse(updatedTable))
require.False(t, s.IsWarehouseSchemaEmpty())
require.False(t, s.IsSchemaEmpty())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (m *Manager) markFailed(ctx context.Context, ids []int64, failError error)

type Uploader struct{}

func (*Uploader) IsWarehouseSchemaEmpty() bool { return true }
func (*Uploader) IsSchemaEmpty() bool { return true }
func (*Uploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil }
func (*Uploader) GetTableSchemaInUpload(string) model.TableSchema { return model.TableSchema{} }
func (*Uploader) ShouldOnDedupUseNewRecord() bool { return false }
Expand Down
4 changes: 2 additions & 2 deletions warehouse/utils/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

//go:generate mockgen -destination=../internal/mocks/utils/mock_uploader.go -package mock_uploader github.com/rudderlabs/rudder-server/warehouse/utils Uploader
type Uploader interface {
IsWarehouseSchemaEmpty() bool
IsSchemaEmpty() bool
GetLocalSchema(ctx context.Context) (model.Schema, error)
UpdateLocalSchema(ctx context.Context, schema model.Schema) error
GetTableSchemaInWarehouse(tableName string) model.TableSchema
Expand All @@ -28,7 +28,7 @@ func NewNoOpUploader() Uploader {
return &noopUploader{}
}

func (n *noopUploader) IsWarehouseSchemaEmpty() bool {
func (n *noopUploader) IsSchemaEmpty() bool {
return false
}
func (n *noopUploader) GetLocalSchema(ctx context.Context) (model.Schema, error) { return nil, nil } // nolint:nilnil
Expand Down
2 changes: 1 addition & 1 deletion warehouse/validations/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ type dummyUploader struct {
dest *backendconfig.DestinationT
}

func (*dummyUploader) IsWarehouseSchemaEmpty() bool { return true }
func (*dummyUploader) IsSchemaEmpty() bool { return true }
func (*dummyUploader) GetLocalSchema(context.Context) (model.Schema, error) {
return model.Schema{}, nil
}
Expand Down

0 comments on commit 499c9b0

Please sign in to comment.