
Commit

chore: refactor 3
cisse21 committed Dec 11, 2024
1 parent 499c9b0 commit 1c7ebd9
Showing 14 changed files with 28 additions and 28 deletions.
4 changes: 2 additions & 2 deletions warehouse/integrations/azure-synapse/azure-synapse.go
@@ -278,7 +278,7 @@ func (as *AzureSynapse) loadTable(
tableSchemaInUpload,
)
previousColumnKeys := warehouseutils.SortColumnKeysFromColumnMap(
- as.Uploader.GetTableSchemaInWarehouse(
+ as.Uploader.GetTableSchema(
tableName,
),
)
@@ -609,7 +609,7 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string
defer as.dropStagingTable(ctx, unionStagingTableName)
defer as.dropStagingTable(ctx, identifyStagingTable)

- userColMap := as.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ userColMap := as.Uploader.GetTableSchema(warehouseutils.UsersTable)
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
4 changes: 2 additions & 2 deletions warehouse/integrations/bigquery/bigquery.go
@@ -615,7 +615,7 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
return fmt.Sprintf("FIRST_VALUE(`%[1]s` IGNORE NULLS) OVER (PARTITION BY id ORDER BY received_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `%[1]s`", column)
}

- userColMap := bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ userColMap := bq.uploader.GetTableSchema(warehouseutils.UsersTable)
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
@@ -705,7 +705,7 @@ func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingT
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

- usersSchema := getTableSchema(bq.uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable))
+ usersSchema := getTableSchema(bq.uploader.GetTableSchema(warehouseutils.UsersTable))
metaData := &bigquery.TableMetadata{
Schema: usersSchema,
}
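For context on the hunk above: the BigQuery loader (like the Azure Synapse and MSSQL ones) iterates over the map returned by the renamed GetTableSchema call and builds one FIRST_VALUE projection per users-table column. The sketch below is a minimal, self-contained reconstruction of that pattern; the plain map literal and the column names stand in for the real model.TableSchema value and are illustrative only.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// firstValProp mirrors the projection built per column in bigquery.go above:
// the most recent non-null value for each id wins.
func firstValProp(column string) string {
	return fmt.Sprintf(
		"FIRST_VALUE(`%[1]s` IGNORE NULLS) OVER (PARTITION BY id ORDER BY received_at DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS `%[1]s`",
		column,
	)
}

func main() {
	// Illustrative stand-in for uploader.GetTableSchema(warehouseutils.UsersTable).
	userColMap := map[string]string{
		"id":          "string",
		"email":       "string",
		"received_at": "datetime",
	}

	var userColNames, firstValProps []string
	for colName := range userColMap {
		if colName == "id" { // id is the partition key, not a projected column
			continue
		}
		userColNames = append(userColNames, colName)
		firstValProps = append(firstValProps, firstValProp(colName))
	}
	// Map iteration order is random; sort for stable output.
	sort.Strings(userColNames)
	sort.Strings(firstValProps)

	fmt.Println("columns:", strings.Join(userColNames, ", "))
	fmt.Println(strings.Join(firstValProps, ",\n"))
}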
6 changes: 3 additions & 3 deletions warehouse/integrations/deltalake/deltalake.go
@@ -581,7 +581,7 @@ func (d *Deltalake) LoadTable(
tableName string,
) (*types.LoadTableStats, error) {
uploadTableSchema := d.Uploader.GetTableSchemaInUpload(tableName)
- warehouseTableSchema := d.Uploader.GetTableSchemaInWarehouse(tableName)
+ warehouseTableSchema := d.Uploader.GetTableSchema(tableName)

loadTableStat, _, err := d.loadTable(
ctx,
@@ -980,9 +980,9 @@ func (d *Deltalake) hasAWSCredentials() bool {
func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error {
var (
identifiesSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable)
- identifiesSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable)
+ identifiesSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.IdentifiesTable)
usersSchemaInUpload = d.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable)
- usersSchemaInWarehouse = d.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ usersSchemaInWarehouse = d.Uploader.GetTableSchema(warehouseutils.UsersTable)
)

d.logger.Infow("started loading for identifies and users tables",
2 changes: 1 addition & 1 deletion warehouse/integrations/mssql/mssql.go
@@ -629,7 +629,7 @@ func (ms *MSSQL) loadUserTables(ctx context.Context) (errorMap map[string]error)
defer ms.dropStagingTable(ctx, unionStagingTableName)
defer ms.dropStagingTable(ctx, identifyStagingTable)

- userColMap := ms.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ userColMap := ms.Uploader.GetTableSchema(warehouseutils.UsersTable)
var userColNames, firstValProps []string
for colName := range userColMap {
if colName == "id" {
2 changes: 1 addition & 1 deletion warehouse/integrations/postgres/load.go
@@ -318,7 +318,7 @@ func (pg *Postgres) LoadUserTables(ctx context.Context) map[string]error {

identifiesSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable)
usersSchemaInUpload := pg.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable)
- usersSchemaInWarehouse := pg.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ usersSchemaInWarehouse := pg.Uploader.GetTableSchema(warehouseutils.UsersTable)

var loadingError loadUsersTableResponse
_ = pg.DB.WithTx(ctx, func(tx *sqlmiddleware.Tx) error {
8 changes: 4 additions & 4 deletions warehouse/integrations/redshift/redshift.go
@@ -739,7 +739,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error {
_, identifyStagingTable, err = rs.loadTable(ctx,
warehouseutils.IdentifiesTable,
rs.Uploader.GetTableSchemaInUpload(warehouseutils.IdentifiesTable),
- rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.IdentifiesTable),
+ rs.Uploader.GetTableSchema(warehouseutils.IdentifiesTable),
true,
)
if err != nil {
@@ -760,7 +760,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error {
_, _, err := rs.loadTable(ctx,
warehouseutils.UsersTable,
rs.Uploader.GetTableSchemaInUpload(warehouseutils.UsersTable),
- rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable),
+ rs.Uploader.GetTableSchema(warehouseutils.UsersTable),
false,
)
if err != nil {
@@ -775,7 +775,7 @@ func (rs *Redshift) loadUserTables(ctx context.Context) map[string]error {
}
}

- userColMap := rs.Uploader.GetTableSchemaInWarehouse(warehouseutils.UsersTable)
+ userColMap := rs.Uploader.GetTableSchema(warehouseutils.UsersTable)
for colName := range userColMap {
// do not reference uuid in queries as it can be an autoincrement field set by segment compatible tables
if colName == "id" || colName == "user_id" || colName == "uuid" {
@@ -1311,7 +1311,7 @@ func (rs *Redshift) LoadTable(ctx context.Context, tableName string) (*types.Loa
ctx,
tableName,
rs.Uploader.GetTableSchemaInUpload(tableName),
- rs.Uploader.GetTableSchemaInWarehouse(tableName),
+ rs.Uploader.GetTableSchema(tableName),
false,
)
return loadTableStat, err
2 changes: 1 addition & 1 deletion warehouse/integrations/snowflake/snowflake.go
@@ -905,7 +905,7 @@ func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error {
log.Infow("identifies temp table loaded")
}

- userColMap := sf.Uploader.GetTableSchemaInWarehouse(usersTable)
+ userColMap := sf.Uploader.GetTableSchema(usersTable)
for colName := range userColMap {
if colName == "ID" {
continue
4 changes: 2 additions & 2 deletions warehouse/internal/mocks/utils/mock_uploader.go

Generated mock file; diff not rendered.
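The regenerated mock above is not rendered, but a test stubbing the renamed method would look roughly like the sketch below. This is an approximation: the gomock module (go.uber.org/mock here, possibly the older github.com/golang/mock in the repository), the NewMockUploader constructor, and the import paths are assumptions based on the file path above and on standard gomock output; because the mocks package sits under warehouse/internal, such a test would have to live inside the repository's warehouse tree.

package example_test

import (
	"testing"

	"go.uber.org/mock/gomock"

	mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func TestUploaderSchemaStub(t *testing.T) {
	ctrl := gomock.NewController(t)

	u := mockuploader.NewMockUploader(ctrl)
	// After this commit the expectation is set on GetTableSchema rather than
	// the old GetTableSchemaInWarehouse.
	u.EXPECT().
		GetTableSchema(warehouseutils.UsersTable).
		Return(model.TableSchema{"id": "string", "email": "string"})

	got := u.GetTableSchema(warehouseutils.UsersTable)
	if len(got) != 2 {
		t.Fatalf("unexpected schema: %v", got)
	}
}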

4 changes: 2 additions & 2 deletions warehouse/router/state_export_data.go
@@ -294,7 +294,7 @@ func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error)
return
}

- job.schemaHandle.UpdateWarehouseTableSchema(tName, tableSchemaDiff.UpdatedSchema)
+ job.schemaHandle.UpdateTableSchema(tName, tableSchemaDiff.UpdatedSchema)
alteredSchema = true
}
return
@@ -518,7 +518,7 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE
errorMap := map[string]error{tableName: err}
return job.processLoadTableResponse(errorMap)
}
- job.schemaHandle.UpdateWarehouseTableSchema(tableName, tableSchemaDiff.UpdatedSchema)
+ job.schemaHandle.UpdateTableSchema(tableName, tableSchemaDiff.UpdatedSchema)

status := model.TableUploadUpdatedSchema
_ = job.tableUploadsRepo.Set(job.ctx, job.upload.ID, tableName, repo.TableUploadSetOptions{
4 changes: 2 additions & 2 deletions warehouse/router/upload.go
@@ -845,8 +845,8 @@ func (job *UploadJob) IsSchemaEmpty() bool {
return job.schemaHandle.IsSchemaEmpty()
}

- func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema {
- return job.schemaHandle.GetTableSchemaInWarehouse(tableName)
+ func (job *UploadJob) GetTableSchema(tableName string) model.TableSchema {
+ return job.schemaHandle.GetTableSchema(tableName)
}

func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema {
2 changes: 1 addition & 1 deletion warehouse/router/upload_test.go
@@ -151,7 +151,7 @@ func TestColumnCountStat(t *testing.T) {
statsFactory: statsStore,
schemaHandle: &schema.Schema{}, // TODO use constructor
}
- j.schemaHandle.UpdateWarehouseTableSchema(tableName, model.TableSchema{
+ j.schemaHandle.UpdateTableSchema(tableName, model.TableSchema{
"test-column-1": "string",
"test-column-2": "string",
"test-column-3": "string",
4 changes: 2 additions & 2 deletions warehouse/schema/schema.go
@@ -385,13 +385,13 @@ func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchem
return diff
}

- func (sh *Schema) GetTableSchemaInWarehouse(tableName string) model.TableSchema {
+ func (sh *Schema) GetTableSchema(tableName string) model.TableSchema {
sh.localSchemaMu.RLock()
defer sh.localSchemaMu.RUnlock()
return sh.localSchema[tableName]
}

- func (sh *Schema) UpdateWarehouseTableSchema(tableName string, tableSchema model.TableSchema) {
+ func (sh *Schema) UpdateTableSchema(tableName string, tableSchema model.TableSchema) {
sh.localSchemaMu.RLock()
defer sh.localSchemaMu.RUnlock()
if sh.localSchema == nil {
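The renamed accessor/mutator pair above is exercised directly in the tests further down; a minimal usage sketch follows. The zero-value construction copies the shortcut upload_test.go itself uses, the import paths are assumptions based on the repository layout, and since internal/model is an internal package the snippet would have to live inside the warehouse tree.

package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
	"github.com/rudderlabs/rudder-server/warehouse/schema"
)

func main() {
	// Zero value, as in upload_test.go below; production code goes through a constructor.
	sh := &schema.Schema{}

	sh.UpdateTableSchema("tracks", model.TableSchema{
		"id":          "string",
		"received_at": "datetime",
	})

	// GetTableSchema returns whatever was last stored for the table.
	fmt.Println(sh.GetTableSchema("tracks")) // map[id:string received_at:datetime]
}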
8 changes: 4 additions & 4 deletions warehouse/schema/schema_test.go
@@ -200,7 +200,7 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {

ctx := context.Background()

- require.Empty(t, s.GetTableSchemaInWarehouse(tableName))
+ require.Empty(t, s.GetTableSchema(tableName))
require.True(t, s.IsSchemaEmpty())

_, err := s.FetchSchemaFromWarehouse(ctx, &mockRepo)
@@ -209,15 +209,15 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {
} else {
require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError))
}
- require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchemaInWarehouse(tableName))
+ require.Equal(t, tc.expectedSchema[tableName], s.GetTableSchema(tableName))
require.Equal(t, len(tc.expectedSchema[tableName]), s.GetColumnsCountInSchema(tableName))
if len(tc.expectedSchema) > 0 {
require.False(t, s.IsSchemaEmpty())
} else {
require.True(t, s.IsSchemaEmpty())
}
- s.UpdateWarehouseTableSchema(updatedTable, updatedSchema)
- require.Equal(t, updatedSchema, s.GetTableSchemaInWarehouse(updatedTable))
+ s.UpdateTableSchema(updatedTable, updatedSchema)
+ require.Equal(t, updatedSchema, s.GetTableSchema(updatedTable))
require.False(t, s.IsSchemaEmpty())
})
}
2 changes: 1 addition & 1 deletion warehouse/utils/uploader.go
@@ -11,7 +11,7 @@ type Uploader interface {
IsSchemaEmpty() bool
GetLocalSchema(ctx context.Context) (model.Schema, error)
UpdateLocalSchema(ctx context.Context, schema model.Schema) error
- GetTableSchemaInWarehouse(tableName string) model.TableSchema
+ GetTableSchema(tableName string) model.TableSchema
GetTableSchemaInUpload(tableName string) model.TableSchema
GetLoadFilesMetadata(ctx context.Context, options GetLoadFilesOptions) ([]LoadFile, error)
GetSampleLoadFileLocation(ctx context.Context, tableName string) (string, error)
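The Uploader interface above is what every integration in this diff consumes, and a recurring pattern in the loaders is to read both the upload-time schema and the warehouse schema and compare them. The sketch below illustrates that with a hand-rolled stub (embedding the interface keeps the many unimplemented methods out of the way). The stub type, column names, and missingColumns helper are illustrative only, the import paths are assumptions based on the repository layout, and as with the other sketches the internal model package means this would have to live inside the warehouse tree.

package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// stubUploader embeds the interface so only the two schema accessors need real
// implementations; calling any other method would panic on the nil embedded value.
type stubUploader struct {
	warehouseutils.Uploader
	upload, warehouse model.TableSchema
}

func (s stubUploader) GetTableSchemaInUpload(string) model.TableSchema { return s.upload }
func (s stubUploader) GetTableSchema(string) model.TableSchema         { return s.warehouse }

// missingColumns reports columns present in the staged load files but not yet in
// the warehouse table: the comparison the loaders above make before altering a table.
func missingColumns(u warehouseutils.Uploader, table string) []string {
	var missing []string
	inWarehouse := u.GetTableSchema(table)
	for col := range u.GetTableSchemaInUpload(table) {
		if _, ok := inWarehouse[col]; !ok {
			missing = append(missing, col)
		}
	}
	return missing
}

func main() {
	u := stubUploader{
		upload:    model.TableSchema{"id": "string", "email": "string", "plan": "string"},
		warehouse: model.TableSchema{"id": "string", "email": "string"},
	}
	fmt.Println(missingColumns(u, "users")) // [plan]
}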
