From 1c7ebd9f4c2df2fd769eac4dfa07b778e100c68b Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 11 Dec 2024 14:31:03 +0530 Subject: [PATCH] chore: refactor 3 --- warehouse/integrations/azure-synapse/azure-synapse.go | 4 ++-- warehouse/integrations/bigquery/bigquery.go | 4 ++-- warehouse/integrations/deltalake/deltalake.go | 6 +++--- warehouse/integrations/mssql/mssql.go | 2 +- warehouse/integrations/postgres/load.go | 2 +- warehouse/integrations/redshift/redshift.go | 8 ++++---- warehouse/integrations/snowflake/snowflake.go | 2 +- warehouse/internal/mocks/utils/mock_uploader.go | 4 ++-- warehouse/router/state_export_data.go | 4 ++-- warehouse/router/upload.go | 4 ++-- warehouse/router/upload_test.go | 2 +- warehouse/schema/schema.go | 4 ++-- warehouse/schema/schema_test.go | 8 ++++---- warehouse/utils/uploader.go | 2 +- 14 files changed, 28 insertions(+), 28 deletions(-) diff --git a/warehouse/integrations/azure-synapse/azure-synapse.go b/warehouse/integrations/azure-synapse/azure-synapse.go index eb8d44f181..b618550f7e 100644 --- a/warehouse/integrations/azure-synapse/azure-synapse.go +++ b/warehouse/integrations/azure-synapse/azure-synapse.go @@ -278,7 +278,7 @@ func (as *AzureSynapse) loadTable( tableSchemaInUpload, ) previousColumnKeys := warehouseutils.SortColumnKeysFromColumnMap( - as.Uploader.GetTableSchemaInWarehouse( + as.Uploader.GetTableSchema( tableName, ), ) @@ -609,7 +609,7 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string defer as.dropStagingTable(ctx, unionStagingTableName) defer as.dropStagingTable(ctx, identifyStagingTable) - userColMap := as.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + userColMap := as.Uploader.GetTableSchema(warehouseutils.UsersTable) var userColNames, firstValProps []string for colName := range userColMap { if colName == "id" { diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go index b75c566610..0d4110deaf 100644 --- a/warehouse/integrations/bigquery/bigquery.go +++ b/warehouse/integrations/bigquery/bigquery.go @@ -615,7 +615,7 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err return fmt.Sprintf("FIRST_VALUE(`%[1]s` IGNORE NULLS) OVER (PARTITION BY id ORDER BY received_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `%[1]s`", column) } - userColMap := bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + userColMap := bq.uploader.GetTableSchema(warehouseutils.UsersTable) var userColNames, firstValProps []string for colName := range userColMap { if colName == "id" { @@ -705,7 +705,7 @@ func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingT gcsRef.MaxBadRecords = 0 gcsRef.IgnoreUnknownValues = false - usersSchema := getTableSchema(bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)) + usersSchema := getTableSchema(bq.uploader.GetTableSchema(warehouseutils.UsersTable)) metaData := &bigquery.TableMetadata{ Schema: usersSchema, } diff --git a/warehouse/integrations/deltalake/deltalake.go b/warehouse/integrations/deltalake/deltalake.go index dad7631c57..40e9c0686e 100644 --- a/warehouse/integrations/deltalake/deltalake.go +++ b/warehouse/integrations/deltalake/deltalake.go @@ -581,7 +581,7 @@ func (d *Deltalake) LoadTable( tableName string, ) (*types.LoadTableStats, error) { uploadTableSchema := d.Uploader.GetTableSchemaInUpload(tableName) - warehouseTableSchema := d.Uploader.GetTableSchemaInWarehouse(tableName) + warehouseTableSchema := d.Uploader.GetTableSchema(tableName) loadTableStat, _, err := d.loadTable( ctx, @@ -980,9 +980,9 @@ func (d *Deltalake) hasAWSCredentials() bool { func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error { var ( identifiesSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable) - identifiesSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable) + identifiesSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.IdentifiesTable) usersSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable) - usersSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + usersSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.UsersTable) ) d.logger.Infow("started loading for identifies and users tables", diff --git a/warehouse/integrations/mssql/mssql.go b/warehouse/integrations/mssql/mssql.go index b1ecb4d20a..48967468dd 100644 --- a/warehouse/integrations/mssql/mssql.go +++ b/warehouse/integrations/mssql/mssql.go @@ -629,7 +629,7 @@ func (ms *MSSQL) loadUserTables(ctx context.Context) (errorMap map[string]error) defer ms.dropStagingTable(ctx, unionStagingTableName) defer ms.dropStagingTable(ctx, identifyStagingTable) - userColMap := ms.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + userColMap := ms.Uploader.GetTableSchema(warehouseutils.UsersTable) var userColNames, firstValProps []string for colName := range userColMap { if colName == "id" { diff --git a/warehouse/integrations/postgres/load.go b/warehouse/integrations/postgres/load.go index 1db142507e..53a35b1f8b 100644 --- a/warehouse/integrations/postgres/load.go +++ b/warehouse/integrations/postgres/load.go @@ -318,7 +318,7 @@ func (pg *Postgres) LoadUserTables(ctx context.Context) map[string]error { identifiesSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable) usersSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable) - usersSchemaInWarehouse := pg.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + usersSchemaInWarehouse := pg.Uploader.GetTableSchema(warehouseutils.UsersTable) var loadingError loadUsersTableResponse _ = pg.DB.WithTx(ctx, func(tx *sqlmiddleware.Tx) error { diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index 1ea054046d..8f2b32a5a0 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -739,7 +739,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error { _, identifyStagingTable, err = rs.loadTable(ctx, warehouseutils.IdentifiesTable, rs.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable), - rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable), + rs.Uploader.GetTableSchema(warehouseutils.IdentifiesTable), true, ) if err != nil { @@ -760,7 +760,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error { _, _, err := rs.loadTable(ctx, warehouseutils.UsersTable, rs.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable), - rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable), + rs.Uploader.GetTableSchema(warehouseutils.UsersTable), false, ) if err != nil { @@ -775,7 +775,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error { } } - userColMap := rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable) + userColMap := rs.Uploader.GetTableSchema(warehouseutils.UsersTable) for colName := range userColMap { // do not reference uuid in queries as it can be an autoincrement field set by segment compatible tables if colName == "id" || colName == "user_id" || colName == "uuid" { @@ -1311,7 +1311,7 @@ func (rs *Redshift) LoadTable(ctx context.Context, tableName string) (*types.Loa ctx, tableName, rs.Uploader.GetTableSchemaInUpload(tableName), - rs.Uploader.GetTableSchemaInWarehouse(tableName), + rs.Uploader.GetTableSchema(tableName), false, ) return loadTableStat, err diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go index af715c2879..3fd6e11649 100644 --- a/warehouse/integrations/snowflake/snowflake.go +++ b/warehouse/integrations/snowflake/snowflake.go @@ -905,7 +905,7 @@ func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error { log.Infow("identifies temp table loaded") } - userColMap := sf.Uploader.GetTableSchemaInWarehouse(usersTable) + userColMap := sf.Uploader.GetTableSchema(usersTable) for colName := range userColMap { if colName == "ID" { continue diff --git a/warehouse/internal/mocks/utils/mock_uploader.go b/warehouse/internal/mocks/utils/mock_uploader.go index f4b16ecc7a..326a427019 100644 --- a/warehouse/internal/mocks/utils/mock_uploader.go +++ b/warehouse/internal/mocks/utils/mock_uploader.go @@ -147,7 +147,7 @@ func (mr *MockUploaderMockRecorder) GetTableSchemaInUpload(tableName any) *gomoc // GetTableSchemaInWarehouse mocks base method. func (m *MockUploader) GetTableSchemaInWarehouse(tableName string) model.TableSchema { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetTableSchemaInWarehouse", tableName) + ret := m.ctrl.Call(m, "GetTableSchema", tableName) ret0, _ := ret[0].(model.TableSchema) return ret0 } @@ -155,7 +155,7 @@ func (m *MockUploader) GetTableSchemaInWarehouse(tableName string) model.TableSc // GetTableSchemaInWarehouse indicates an expected call of GetTableSchemaInWarehouse. func (mr *MockUploaderMockRecorder) GetTableSchemaInWarehouse(tableName any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableSchemaInWarehouse", reflect.TypeOf((*MockUploader)(nil).GetTableSchemaInWarehouse), tableName) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableSchema", reflect.TypeOf((*MockUploader)(nil).GetTableSchemaInWarehouse), tableName) } // IsSchemaEmpty mocks base method. diff --git a/warehouse/router/state_export_data.go b/warehouse/router/state_export_data.go index 9d7e3be956..754a2f91b3 100644 --- a/warehouse/router/state_export_data.go +++ b/warehouse/router/state_export_data.go @@ -294,7 +294,7 @@ func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) return } - job.schemaHandle.UpdateWarehouseTableSchema(tName, tableSchemaDiff.UpdatedSchema) + job.schemaHandle.UpdateTableSchema(tName, tableSchemaDiff.UpdatedSchema) alteredSchema = true } return @@ -518,7 +518,7 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE errorMap := map[string]error{tableName: err} return job.processLoadTableResponse(errorMap) } - job.schemaHandle.UpdateWarehouseTableSchema(tableName, tableSchemaDiff.UpdatedSchema) + job.schemaHandle.UpdateTableSchema(tableName, tableSchemaDiff.UpdatedSchema) status := model.TableUploadUpdatedSchema _ = job.tableUploadsRepo.Set(job.ctx, job.upload.ID, tableName, repo.TableUploadSetOptions{ diff --git a/warehouse/router/upload.go b/warehouse/router/upload.go index c30709f769..3e1ef3fc45 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -845,8 +845,8 @@ func (job *UploadJob) IsSchemaEmpty() bool { return job.schemaHandle.IsSchemaEmpty() } -func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema { - return job.schemaHandle.GetTableSchemaInWarehouse(tableName) +func (job *UploadJob) GetTableSchema(tableName string) model.TableSchema { + return job.schemaHandle.GetTableSchema(tableName) } func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema { diff --git a/warehouse/router/upload_test.go b/warehouse/router/upload_test.go index 4d77f52edb..fe01853abe 100644 --- a/warehouse/router/upload_test.go +++ b/warehouse/router/upload_test.go @@ -151,7 +151,7 @@ func TestColumnCountStat(t *testing.T) { statsFactory: statsStore, schemaHandle: &schema.Schema{}, // TODO use constructor } - j.schemaHandle.UpdateWarehouseTableSchema(tableName, model.TableSchema{ + j.schemaHandle.UpdateTableSchema(tableName, model.TableSchema{ "test-column-1": "string", "test-column-2": "string", "test-column-3": "string", diff --git a/warehouse/schema/schema.go b/warehouse/schema/schema.go index a3e4437d0e..8cb06e1a0e 100644 --- a/warehouse/schema/schema.go +++ b/warehouse/schema/schema.go @@ -385,13 +385,13 @@ func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchem return diff } -func (sh *Schema) GetTableSchemaInWarehouse(tableName string) model.TableSchema { +func (sh *Schema) GetTableSchema(tableName string) model.TableSchema { sh.localSchemaMu.RLock() defer sh.localSchemaMu.RUnlock() return sh.localSchema[tableName] } -func (sh *Schema) UpdateWarehouseTableSchema(tableName string, tableSchema model.TableSchema) { +func (sh *Schema) UpdateTableSchema(tableName string, tableSchema model.TableSchema) { sh.localSchemaMu.RLock() defer sh.localSchemaMu.RUnlock() if sh.localSchema == nil { diff --git a/warehouse/schema/schema_test.go b/warehouse/schema/schema_test.go index bbddb1e624..c11feffabb 100644 --- a/warehouse/schema/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -200,7 +200,7 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { ctx := context.Background() - require.Empty(t, s.GetTableSchemaInWarehouse(tableName)) + require.Empty(t, s.GetTableSchema(tableName)) require.True(t, s.IsSchemaEmpty()) _, err := s.FetchSchemaFromWarehouse(ctx, &mockRepo) @@ -209,15 +209,15 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { } else { require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError)) } - require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchemaInWarehouse(tableName)) + require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchema(tableName)) require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInSchema(tableName)) if len(tc.expectedSchema) > 0 { require.False(t, s.IsSchemaEmpty()) } else { require.True(t, s.IsSchemaEmpty()) } - s.UpdateWarehouseTableSchema(updatedTable, updatedSchema) - require.Equal(t, updatedSchema, s.GetTableSchemaInWarehouse(updatedTable)) + s.UpdateTableSchema(updatedTable, updatedSchema) + require.Equal(t, updatedSchema, s.GetTableSchema(updatedTable)) require.False(t, s.IsSchemaEmpty()) }) } diff --git a/warehouse/utils/uploader.go b/warehouse/utils/uploader.go index 0554986f0d..379d4b268e 100644 --- a/warehouse/utils/uploader.go +++ b/warehouse/utils/uploader.go @@ -11,7 +11,7 @@ type Uploader interface { IsSchemaEmpty() bool GetLocalSchema(ctx context.Context) (model.Schema, error) UpdateLocalSchema(ctx context.Context, schema model.Schema) error - GetTableSchemaInWarehouse(tableName string) model.TableSchema + GetTableSchema(tableName string) model.TableSchema GetTableSchemaInUpload(tableName string) model.TableSchema GetLoadFilesMetadata(ctx context.Context, options GetLoadFilesOptions) ([]LoadFile, error) GetSampleLoadFileLocation(ctx context.Context, tableName string) (string, error)