diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go index fc8f710e63..4393395127 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go @@ -173,13 +173,15 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU discardsChannel, err := m.initializeChannelWithSchema(ctx, asyncDest.Destination.ID, &destConf, discardsTable(), discardsSchema()) if err != nil { - if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) { - if errors.Is(err, errAuthz) { - m.setBackOff() - } + switch { + case errors.Is(err, errAuthz): + m.setBackOff() + return m.failedJobs(asyncDest, err.Error()) + case errors.Is(err, errBackoff): return m.failedJobs(asyncDest, err.Error()) + default: + return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error()) } - return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error()) } m.logger.Infon("Prepared discards channel") @@ -216,11 +218,15 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU for _, info := range uploadInfos { imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info) if err != nil { - if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) { + switch { + case errors.Is(err, errAuthz): shouldResetBackoff = false - if errors.Is(err, errAuthz) && !isBackoffSet { + if !isBackoffSet { + isBackoffSet = true m.setBackOff() } + case errors.Is(err, errBackoff): + shouldResetBackoff = false } m.logger.Warnn("Failed to send events to Snowpipe", logger.NewStringField("table", info.tableName), diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go index fe50fa88ef..3826cb62c7 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go @@ -353,11 +353,11 @@ func TestSnowpipeStreaming(t *testing.T) { } managerCreatorCallCount := 0 sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) { - sm := snowflake.New(config.New(), logger.NOP, stats.NOP) + sf := snowflake.New(config.New(), logger.NOP, stats.NOP) managerCreatorCallCount++ - mockManager := newMockManager(sm) - mockManager.createSchemaErr = fmt.Errorf("failed to create schema") - return mockManager, nil + mm := newMockManager(sf) + mm.createSchemaErr = fmt.Errorf("failed to create schema") + return mm, nil } sm.config.backoff.initialInterval = config.SingleValueLoader(time.Second * 10) asyncDestStruct := &common.AsyncDestinationStruct{ @@ -390,9 +390,9 @@ func TestSnowpipeStreaming(t *testing.T) { require.True(t, sm.isInBackoff()) sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) { - sm := snowflake.New(config.New(), logger.NOP, stats.NOP) + sf := snowflake.New(config.New(), logger.NOP, stats.NOP) managerCreatorCallCount++ - return newMockManager(sm), nil + return newMockManager(sf), nil } sm.now = func() time.Time { return timeutil.Now().Add(time.Second * 50) @@ -415,10 +415,10 @@ func TestSnowpipeStreaming(t *testing.T) { }, } sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) { - sm := snowflake.New(config.New(), logger.NOP, stats.NOP) - mockManager := newMockManager(sm) - mockManager.createSchemaErr = fmt.Errorf("failed to create schema") - return mockManager, nil + sf := snowflake.New(config.New(), logger.NOP, stats.NOP) + mm := newMockManager(sf) + mm.createSchemaErr = fmt.Errorf("failed to create schema") + return mm, nil } output := sm.Upload(&common.AsyncDestinationStruct{ ImportingJobIDs: []int64{1},