Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Jan 8, 2025
1 parent f74c0de commit f8bfa49
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU

discardsChannel, err := m.initializeChannelWithSchema(ctx, asyncDest.Destination.ID, &destConf, discardsTable(), discardsSchema())
if err != nil {
if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) {
if errors.Is(err, errAuthz) {
m.setBackOff()
}
switch {
case errors.Is(err, errAuthz):
m.setBackOff()
return m.failedJobs(asyncDest, err.Error())
case errors.Is(err, errBackoff):
return m.failedJobs(asyncDest, err.Error())

Check warning on line 181 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go#L180-L181

Added lines #L180 - L181 were not covered by tests
default:
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
m.logger.Infon("Prepared discards channel")

Expand Down Expand Up @@ -216,11 +218,15 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
for _, info := range uploadInfos {
imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
if err != nil {
if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) {
switch {
case errors.Is(err, errAuthz):
shouldResetBackoff = false
if errors.Is(err, errAuthz) && !isBackoffSet {
if !isBackoffSet {
isBackoffSet = true
m.setBackOff()
}
case errors.Is(err, errBackoff):
shouldResetBackoff = false
}
m.logger.Warnn("Failed to send events to Snowpipe",
logger.NewStringField("table", info.tableName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,11 @@ func TestSnowpipeStreaming(t *testing.T) {
}
managerCreatorCallCount := 0
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
sf := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
mockManager := newMockManager(sm)
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
return mockManager, nil
mm := newMockManager(sf)
mm.createSchemaErr = fmt.Errorf("failed to create schema")
return mm, nil
}
sm.config.backoff.initialInterval = config.SingleValueLoader(time.Second * 10)
asyncDestStruct := &common.AsyncDestinationStruct{
Expand Down Expand Up @@ -390,9 +390,9 @@ func TestSnowpipeStreaming(t *testing.T) {
require.True(t, sm.isInBackoff())

sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
sf := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
return newMockManager(sm), nil
return newMockManager(sf), nil
}
sm.now = func() time.Time {
return timeutil.Now().Add(time.Second * 50)
Expand All @@ -415,10 +415,10 @@ func TestSnowpipeStreaming(t *testing.T) {
},
}
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
mockManager := newMockManager(sm)
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
return mockManager, nil
sf := snowflake.New(config.New(), logger.NOP, stats.NOP)
mm := newMockManager(sf)
mm.createSchemaErr = fmt.Errorf("failed to create schema")
return mm, nil
}
output := sm.Upload(&common.AsyncDestinationStruct{
ImportingJobIDs: []int64{1},
Expand Down

0 comments on commit f8bfa49

Please sign in to comment.