diff --git a/warehouse/integrations/clickhouse/clickhouse.go b/warehouse/integrations/clickhouse/clickhouse.go index d7d0aac3e3..3a6c787a60 100644 --- a/warehouse/integrations/clickhouse/clickhouse.go +++ b/warehouse/integrations/clickhouse/clickhouse.go @@ -895,7 +895,7 @@ func (ch *Clickhouse) AddColumns(ctx context.Context, tableName string, columnsI } func (ch *Clickhouse) CreateSchema(ctx context.Context) error { - if !ch.Uploader.IsWarehouseSchemaEmpty() { + if !ch.Uploader.IsSchemaEmpty() { return nil } diff --git a/warehouse/integrations/clickhouse/clickhouse_test.go b/warehouse/integrations/clickhouse/clickhouse_test.go index 39aa36635c..f7923b7514 100644 --- a/warehouse/integrations/clickhouse/clickhouse_test.go +++ b/warehouse/integrations/clickhouse/clickhouse_test.go @@ -12,21 +12,18 @@ import ( "testing" "time" - "github.com/samber/lo" - - "github.com/rudderlabs/rudder-go-kit/stats" - - "go.uber.org/mock/gomock" - clickhousestd "github.com/ClickHouse/clickhouse-go" "github.com/google/uuid" + "github.com/samber/lo" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "github.com/rudderlabs/compose-test/compose" "github.com/rudderlabs/compose-test/testcompose" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" @@ -1222,7 +1219,7 @@ func newMockUploader( u.EXPECT().GetTableSchemaInUpload(gomock.Any()).Return(tableSchema).AnyTimes() u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(metadata, nil).AnyTimes() u.EXPECT().UseRudderStorage().Return(false).AnyTimes() - u.EXPECT().IsWarehouseSchemaEmpty().Return(true).AnyTimes() + u.EXPECT().IsSchemaEmpty().Return(true).AnyTimes() return u } diff --git a/warehouse/internal/mocks/utils/mock_uploader.go b/warehouse/internal/mocks/utils/mock_uploader.go index f89d124cee..f4b16ecc7a 100644 --- a/warehouse/internal/mocks/utils/mock_uploader.go +++ b/warehouse/internal/mocks/utils/mock_uploader.go @@ -158,18 +158,18 @@ func (mr *MockUploaderMockRecorder) GetTableSchemaInWarehouse(tableName any) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableSchemaInWarehouse", reflect.TypeOf((*MockUploader)(nil).GetTableSchemaInWarehouse), tableName) } -// IsWarehouseSchemaEmpty mocks base method. -func (m *MockUploader) IsWarehouseSchemaEmpty() bool { +// IsSchemaEmpty mocks base method. +func (m *MockUploader) IsSchemaEmpty() bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsWarehouseSchemaEmpty") + ret := m.ctrl.Call(m, "IsSchemaEmpty") ret0, _ := ret[0].(bool) return ret0 } -// IsWarehouseSchemaEmpty indicates an expected call of IsWarehouseSchemaEmpty. -func (mr *MockUploaderMockRecorder) IsWarehouseSchemaEmpty() *gomock.Call { +// IsSchemaEmpty indicates an expected call of IsSchemaEmpty. +func (mr *MockUploaderMockRecorder) IsSchemaEmpty() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsWarehouseSchemaEmpty", reflect.TypeOf((*MockUploader)(nil).IsWarehouseSchemaEmpty)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSchemaEmpty", reflect.TypeOf((*MockUploader)(nil).IsSchemaEmpty)) } // ShouldOnDedupUseNewRecord mocks base method. diff --git a/warehouse/router/state_create_schema.go b/warehouse/router/state_create_schema.go index c3fdc3f3b3..f77fbb4ebf 100644 --- a/warehouse/router/state_create_schema.go +++ b/warehouse/router/state_create_schema.go @@ -7,7 +7,7 @@ import ( ) func (job *UploadJob) createRemoteSchema(whManager manager.Manager) error { - if job.schemaHandle.IsWarehouseSchemaEmpty() { + if job.schemaHandle.IsSchemaEmpty() { if err := whManager.CreateSchema(job.ctx); err != nil { return fmt.Errorf("creating schema: %w", err) } diff --git a/warehouse/router/state_export_data.go b/warehouse/router/state_export_data.go index fb9f967911..9d7e3be956 100644 --- a/warehouse/router/state_export_data.go +++ b/warehouse/router/state_export_data.go @@ -789,7 +789,7 @@ func (job *UploadJob) columnCountStat(tableName string) { tags := []whutils.Tag{ {Name: "tableName", Value: whutils.TableNameForStats(tableName)}, } - currentColumnsCount := job.schemaHandle.GetColumnsCountInWarehouseSchema(tableName) + currentColumnsCount := job.schemaHandle.GetColumnsCountInSchema(tableName) job.gaugeStat(`warehouse_load_table_column_count`, tags...).Gauge(currentColumnsCount) job.gaugeStat(`warehouse_load_table_column_limit`, tags...).Gauge(columnCountLimit) diff --git a/warehouse/router/upload.go b/warehouse/router/upload.go index 39695ba230..c30709f769 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -841,8 +841,8 @@ func (job *UploadJob) GetSampleLoadFileLocation(ctx context.Context, tableName s return locations[0].Location, nil } -func (job *UploadJob) IsWarehouseSchemaEmpty() bool { - return job.schemaHandle.IsWarehouseSchemaEmpty() +func (job *UploadJob) IsSchemaEmpty() bool { + return job.schemaHandle.IsSchemaEmpty() } func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema { diff --git a/warehouse/router/upload_test.go b/warehouse/router/upload_test.go index d336bf3b13..4d77f52edb 100644 --- a/warehouse/router/upload_test.go +++ b/warehouse/router/upload_test.go @@ -166,7 +166,7 @@ func TestColumnCountStat(t *testing.T) { m2 := statsStore.Get("warehouse_load_table_column_limit", tags) if tc.statExpected { - require.EqualValues(t, m1.LastValue(), j.schemaHandle.GetColumnsCountInWarehouseSchema(tableName)) + require.EqualValues(t, m1.LastValue(), j.schemaHandle.GetColumnsCountInSchema(tableName)) require.EqualValues(t, m2.LastValue(), tc.columnCountLimit) } else { require.Nil(t, m1) diff --git a/warehouse/schema/schema.go b/warehouse/schema/schema.go index 1c68202a72..a3e4437d0e 100644 --- a/warehouse/schema/schema.go +++ b/warehouse/schema/schema.go @@ -400,13 +400,13 @@ func (sh *Schema) UpdateWarehouseTableSchema(tableName string, tableSchema model sh.localSchema[tableName] = tableSchema } -func (sh *Schema) IsWarehouseSchemaEmpty() bool { +func (sh *Schema) IsSchemaEmpty() bool { sh.localSchemaMu.RLock() defer sh.localSchemaMu.RUnlock() return len(sh.localSchema) == 0 } -func (sh *Schema) GetColumnsCountInWarehouseSchema(tableName string) int { +func (sh *Schema) GetColumnsCountInSchema(tableName string) int { sh.localSchemaMu.RLock() defer sh.localSchemaMu.RUnlock() return len(sh.localSchema[tableName]) diff --git a/warehouse/schema/schema_test.go b/warehouse/schema/schema_test.go index 1c8de71e78..bbddb1e624 100644 --- a/warehouse/schema/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -201,7 +201,7 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { ctx := context.Background() require.Empty(t, s.GetTableSchemaInWarehouse(tableName)) - require.True(t, s.IsWarehouseSchemaEmpty()) + require.True(t, s.IsSchemaEmpty()) _, err := s.FetchSchemaFromWarehouse(ctx, &mockRepo) if tc.wantError == nil { @@ -210,15 +210,15 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError)) } require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchemaInWarehouse(tableName)) - require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInWarehouseSchema(tableName)) + require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInSchema(tableName)) if len(tc.expectedSchema) > 0 { - require.False(t, s.IsWarehouseSchemaEmpty()) + require.False(t, s.IsSchemaEmpty()) } else { - require.True(t, s.IsWarehouseSchemaEmpty()) + require.True(t, s.IsSchemaEmpty()) } s.UpdateWarehouseTableSchema(updatedTable, updatedSchema) require.Equal(t, updatedSchema, s.GetTableSchemaInWarehouse(updatedTable)) - require.False(t, s.IsWarehouseSchemaEmpty()) + require.False(t, s.IsSchemaEmpty()) }) } } diff --git a/warehouse/source/source.go b/warehouse/source/source.go index d2bfe4ee15..b481064299 100644 --- a/warehouse/source/source.go +++ b/warehouse/source/source.go @@ -284,7 +284,7 @@ func (m *Manager) markFailed(ctx context.Context, ids []int64, failError error) type Uploader struct{} -func (*Uploader) IsWarehouseSchemaEmpty() bool { return true } +func (*Uploader) IsSchemaEmpty() bool { return true } func (*Uploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } func (*Uploader) GetTableSchemaInUpload(string) model.TableSchema { return model.TableSchema{} } func (*Uploader) ShouldOnDedupUseNewRecord() bool { return false } diff --git a/warehouse/utils/uploader.go b/warehouse/utils/uploader.go index 0c2592c445..0554986f0d 100644 --- a/warehouse/utils/uploader.go +++ b/warehouse/utils/uploader.go @@ -8,7 +8,7 @@ import ( //go:generate mockgen -destination=../internal/mocks/utils/mock_uploader.go -package mock_uploader github.com/rudderlabs/rudder-server/warehouse/utils Uploader type Uploader interface { - IsWarehouseSchemaEmpty() bool + IsSchemaEmpty() bool GetLocalSchema(ctx context.Context) (model.Schema, error) UpdateLocalSchema(ctx context.Context, schema model.Schema) error GetTableSchemaInWarehouse(tableName string) model.TableSchema @@ -28,7 +28,7 @@ func NewNoOpUploader() Uploader { return &noopUploader{} } -func (n *noopUploader) IsWarehouseSchemaEmpty() bool { +func (n *noopUploader) IsSchemaEmpty() bool { return false } func (n *noopUploader) GetLocalSchema(ctx context.Context) (model.Schema, error) { return nil, nil } // nolint:nilnil diff --git a/warehouse/validations/validate.go b/warehouse/validations/validate.go index 174aac92f8..f37a55d110 100644 --- a/warehouse/validations/validate.go +++ b/warehouse/validations/validate.go @@ -553,7 +553,7 @@ type dummyUploader struct { dest *backendconfig.DestinationT } -func (*dummyUploader) IsWarehouseSchemaEmpty() bool { return true } +func (*dummyUploader) IsSchemaEmpty() bool { return true } func (*dummyUploader) GetLocalSchema(context.Context) (model.Schema, error) { return model.Schema{}, nil }