From 89bad4455b01032dbf5878ae3be9df92510704eb Mon Sep 17 00:00:00 2001 From: achettyiitr Date: Mon, 6 Jan 2025 02:45:37 +0530 Subject: [PATCH] fix: warehouse router tracker --- warehouse/internal/model/warehouse.go | 4 + warehouse/router/router.go | 43 +- .../router/testdata/sql/seed_tracker_test.sql | 23 - warehouse/router/tracker.go | 253 ++++----- warehouse/router/tracker_test.go | 521 +++++++++++------- 5 files changed, 482 insertions(+), 362 deletions(-) delete mode 100644 warehouse/router/testdata/sql/seed_tracker_test.sql diff --git a/warehouse/internal/model/warehouse.go b/warehouse/internal/model/warehouse.go index 3cfe020c0e..10e7c07377 100644 --- a/warehouse/internal/model/warehouse.go +++ b/warehouse/internal/model/warehouse.go @@ -62,3 +62,7 @@ func (w *Warehouse) GetPreferAppendSetting() bool { } return value } + +func (w *Warehouse) IsEnabled() bool { + return w.Source.Enabled && w.Destination.Enabled +} diff --git a/warehouse/router/router.go b/warehouse/router/router.go index b2e66d249d..20fb4da3b1 100644 --- a/warehouse/router/router.go +++ b/warehouse/router/router.go @@ -105,16 +105,17 @@ type Router struct { stagingFilesBatchSize config.ValueLoader[int] warehouseSyncFreqIgnore config.ValueLoader[bool] cronTrackerRetries config.ValueLoader[int64] + uploadBufferTimeInMin config.ValueLoader[time.Duration] } stats struct { - processingPendingJobsStat stats.Measurement - processingAvailableWorkersStat stats.Measurement - processingPickupLagStat stats.Measurement - processingPickupWaitTimeStat stats.Measurement - - schedulerWarehouseLengthStat stats.Measurement - schedulerTotalSchedulingTimeStat stats.Measurement + processingPendingJobsStat stats.Gauge + processingAvailableWorkersStat stats.Gauge + processingPickupLagStat stats.Timer + processingPickupWaitTimeStat stats.Timer + schedulerWarehouseLengthStat stats.Gauge + schedulerTotalSchedulingTimeStat stats.Timer + cronTrackerExecTimestamp stats.Gauge } } @@ -150,7 +151,7 @@ func New( r.tenantManager = tenantManager r.bcManager = bcManager r.destType = destType - r.now = time.Now + r.now = timeutil.Now r.triggerStore = triggerStore r.createJobMarkerMap = make(map[string]time.Time) r.createUploadAlways = createUploadAlways @@ -200,7 +201,7 @@ func (r *Router) Start(ctx context.Context) error { return nil })) g.Go(crash.NotifyWarehouse(func() error { - return r.CronTracker(gCtx) + return r.cronTracker(gCtx) })) return g.Wait() } @@ -711,14 +712,32 @@ func (r *Router) loadReloadableConfig(whName string) { r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs") r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore") r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries") + r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin") } func (r *Router) loadStats() { - tags := stats.Tags{"destType": r.destType} + tags := stats.Tags{"module": moduleName, "destType": r.destType} r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, tags) r.stats.processingAvailableWorkersStat = r.statsFactory.NewTaggedStat("wh_processing_available_workers", stats.GaugeType, tags) r.stats.processingPickupLagStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_lag", stats.TimerType, tags) r.stats.processingPickupWaitTimeStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_wait_time", 
stats.TimerType, tags) - r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler.warehouse_length", stats.GaugeType, tags) - r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler.total_scheduling_time", stats.TimerType, tags) + r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler_warehouse_length", stats.GaugeType, tags) + r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler_total_scheduling_time", stats.TimerType, tags) + r.stats.cronTrackerExecTimestamp = r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, tags) +} + +func (r *Router) copyWarehouses() []model.Warehouse { + r.configSubscriberLock.RLock() + defer r.configSubscriberLock.RUnlock() + + warehouses := make([]model.Warehouse, len(r.warehouses)) + copy(warehouses, r.warehouses) + return warehouses +} + +func (r *Router) getNowSQL() string { + if r.nowSQL != "" { + return r.nowSQL + } + return "NOW()" } diff --git a/warehouse/router/testdata/sql/seed_tracker_test.sql b/warehouse/router/testdata/sql/seed_tracker_test.sql deleted file mode 100644 index 5e41a5ed1a..0000000000 --- a/warehouse/router/testdata/sql/seed_tracker_test.sql +++ /dev/null @@ -1,23 +0,0 @@ -BEGIN; -INSERT INTO wh_staging_files (id, location, schema, source_id, destination_id, status, total_events, first_event_at, - last_event_at, created_at, updated_at, metadata) -VALUES (1, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:23:37.100685', NOW(), '{}'), - (2, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:24:37.100685', NOW(), '{}'), - (3, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:25:37.100685', NOW(), '{}'), - (4, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:26:37.100685', NOW(), '{}'), - (5, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:27:37.100685', NOW(), '{}'), - (6, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID-1', 'succeeded', 1, NOW(), NOW(), - '2022-12-06 15:27:37.100685', NOW(), '{}'); -INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type, start_staging_file_id, - end_staging_file_id, start_load_file_id, end_load_file_id, status, schema, error, first_event_at, - last_event_at, last_exec_at, timings, created_at, updated_at, metadata, - in_progress, workspace_id) -VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exported_data', '{}', - '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}', TRUE, - 'test-workspaceID'); -COMMIT; diff --git a/warehouse/router/tracker.go b/warehouse/router/tracker.go index 9b00dd06fc..03d6402061 100644 --- a/warehouse/router/tracker.go +++ b/warehouse/router/tracker.go @@ -9,197 +9,170 @@ import ( "time" "github.com/cenkalti/backoff/v4" - - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/rudderlabs/rudder-server/utils/timeutil" "github.com/rudderlabs/rudder-server/warehouse/internal/model" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + whutils 
"github.com/rudderlabs/rudder-server/warehouse/utils" ) -// CronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse +// cronTracker Track the status of the staging file whether it has reached the terminal state or not for every warehouse // we pick the staging file which is oldest within the range NOW() - 2 * syncFrequency and NOW() - 3 * syncFrequency -func (r *Router) CronTracker(ctx context.Context) error { - cronTrackerExecTimestamp := r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{ - "module": moduleName, - "destType": r.destType, - }) +// and checks if the corresponding upload has reached the terminal state or not. +// If the upload has not reached the terminal state, then we send a gauge metric with value 1 else 0 +func (r *Router) cronTracker(ctx context.Context) error { for { - execTime := time.Now() - cronTrackerExecTimestamp.Gauge(execTime.Unix()) - - r.configSubscriberLock.RLock() - warehouses := append([]model.Warehouse{}, r.warehouses...) - r.configSubscriberLock.RUnlock() - - for _, warehouse := range warehouses { - b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(r.config.cronTrackerRetries.Load())), ctx) - err := backoff.Retry(func() error { - return r.Track(ctx, &warehouse, r.conf) - }, b) - if err != nil { - r.logger.Errorn( - "cron tracker failed for", - obskit.SourceID(warehouse.Source.ID), - obskit.DestinationID(warehouse.Destination.ID), - obskit.Error(err), - ) - break + r.stats.cronTrackerExecTimestamp.Gauge(execTime.Unix()) + + for _, warehouse := range r.copyWarehouses() { + if err := r.retryTrackSync(ctx, &warehouse); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return fmt.Errorf("cron tracker: %w", err) } } nextExecTime := execTime.Add(r.config.uploadStatusTrackFrequency) select { case <-ctx.Done(): - r.logger.Infon("context is cancelled, stopped running tracking") + r.logger.Infon("Context cancelled. Exiting cron tracker") return nil case <-time.After(time.Until(nextExecTime)): } } } -// Track tracks the status of the warehouse uploads for the corresponding cases: -// 1. Staging files is not picked. -// 2. 
Upload job is struck -func (r *Router) Track( - ctx context.Context, - warehouse *model.Warehouse, - config *config.Config, -) error { - var ( - createdAt sql.NullTime - exists bool - syncFrequency = "1440" - now = timeutil.Now - nowSQL = "NOW()" - failedStatusRegex = "%_failed" - timeWindow = config.GetDuration("Warehouse.uploadBufferTimeInMin", 180, time.Minute) - source = warehouse.Source - destination = warehouse.Destination - ) - - if r.nowSQL != "" { - nowSQL = r.nowSQL +func (r *Router) retryTrackSync(ctx context.Context, warehouse *model.Warehouse) error { + o := func() error { + return r.trackSync(ctx, warehouse) } - if r.now != nil { - now = r.now - } - - trackUploadMissingStat := r.statsFactory.NewTaggedStat("warehouse_track_upload_missing", stats.GaugeType, stats.Tags{ - "workspaceId": warehouse.WorkspaceID, - "module": moduleName, - "destType": r.destType, - "sourceId": source.ID, - "destinationId": destination.ID, - "warehouseID": misc.GetTagName( - destination.ID, - source.Name, - destination.Name, - misc.TailTruncateStr(source.ID, 6)), - }) - trackUploadMissingStat.Gauge(0) + b := backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewExponentialBackOff(), + uint64(r.config.cronTrackerRetries.Load()), + ), + ctx, + ) + return backoff.Retry(o, b) +} - if !source.Enabled || !destination.Enabled { +func (r *Router) trackSync(ctx context.Context, warehouse *model.Warehouse) error { + if !warehouse.IsEnabled() || r.isWithinExcludeWindow(warehouse) { return nil } - excludeWindow := warehouse.GetMapDestinationConfig(model.ExcludeWindowSetting) - excludeWindowStartTime, excludeWindowEndTime := excludeWindowStartEndTimes(excludeWindow) - if checkCurrentTimeExistsInExcludeWindow(now(), excludeWindowStartTime, excludeWindowEndTime) { + createdAt, err := r.getOldestStagingFile(ctx, warehouse) + if err != nil { + return err + } + if createdAt.IsZero() { return nil } - if sf := warehouse.GetStringDestinationConfig(r.conf, model.SyncFrequencySetting); sf != "" { - syncFrequency = sf - } - if value, err := strconv.Atoi(syncFrequency); err == nil { - timeWindow += time.Duration(value) * time.Minute + exists, err := r.checkUploadStatus(ctx, warehouse, createdAt) + if err != nil { + return err } + r.recordUploadMissingMetric(warehouse, exists) + return nil +} + +func (r *Router) isWithinExcludeWindow(warehouse *model.Warehouse) bool { + excludeWindow := warehouse.GetMapDestinationConfig(model.ExcludeWindowSetting) + startTime, endTime := excludeWindowStartEndTimes(excludeWindow) + return checkCurrentTimeExistsInExcludeWindow(r.now(), startTime, endTime) +} + +func (r *Router) getOldestStagingFile(ctx context.Context, warehouse *model.Warehouse) (time.Time, error) { + nowSQL := r.getNowSQL() + timeWindow := r.calculateTimeWindow(warehouse) + query := fmt.Sprintf(` - SELECT - created_at - FROM - %[1]s - WHERE - source_id = $1 AND - destination_id = $2 AND - created_at > %[2]s - $3 * INTERVAL '1 MIN' AND - created_at < %[2]s - $4 * INTERVAL '1 MIN' - ORDER BY - id DESC - LIMIT - 1; - `, - warehouseutils.WarehouseStagingFilesTable, + SELECT created_at + FROM `+whutils.WarehouseStagingFilesTable+` + WHERE source_id = $1 + AND destination_id = $2 + AND created_at > %[1]s - $3 * INTERVAL '1 MIN' + AND created_at < %[1]s - $4 * INTERVAL '1 MIN' + ORDER BY id DESC + LIMIT 1;`, nowSQL, ) - queryArgs := []interface{}{ - source.ID, - destination.ID, + queryArgs := []any{ + warehouse.Source.ID, + warehouse.Destination.ID, 2 * timeWindow / time.Minute, timeWindow / time.Minute, } + var createdAt 
sql.NullTime err := r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&createdAt) - if errors.Is(err, sql.ErrNoRows) { - return nil + if err != nil && errors.Is(err, sql.ErrNoRows) { + return time.Time{}, nil } if err != nil { - return fmt.Errorf("fetching last upload time for source: %s and destination: %s: %w", source.ID, destination.ID, err) + return time.Time{}, fmt.Errorf("fetching oldest staging file for source %s and destination %s: %w", + warehouse.Source.ID, warehouse.Destination.ID, err) } - if !createdAt.Valid { - return fmt.Errorf("invalid last upload time for source: %s and destination: %s", source.ID, destination.ID) + return time.Time{}, fmt.Errorf("invalid created_at time for source %s and destination %s", + warehouse.Source.ID, warehouse.Destination.ID) + } + return createdAt.Time, nil +} + +func (r *Router) calculateTimeWindow(warehouse *model.Warehouse) time.Duration { + timeWindow := r.config.uploadBufferTimeInMin.Load() + syncFrequency := warehouse.GetStringDestinationConfig(r.conf, model.SyncFrequencySetting) + if syncFrequency != "" { + if value, err := strconv.Atoi(syncFrequency); err == nil { + timeWindow += time.Duration(value) * time.Minute + } } + return timeWindow +} - query = ` - SELECT - EXISTS ( - SELECT - 1 - FROM - ` + warehouseutils.WarehouseUploadsTable + ` - WHERE - source_id = $1 AND - destination_id = $2 AND - ( - status = $3 - OR status = $4 - OR status LIKE $5 - ) AND - updated_at > $6 - ); - ` - queryArgs = []interface{}{ - source.ID, - destination.ID, +func (r *Router) checkUploadStatus(ctx context.Context, warehouse *model.Warehouse, createdAt time.Time) (bool, error) { + query := ` + SELECT EXISTS ( + SELECT 1 + FROM ` + whutils.WarehouseUploadsTable + ` + WHERE source_id = $1 AND destination_id = $2 AND + (status = $3 OR status = $4 OR status LIKE $5) AND + updated_at > $6 + );` + queryArgs := []any{ + warehouse.Source.ID, + warehouse.Destination.ID, model.ExportedData, model.Aborted, - failedStatusRegex, - createdAt.Time.Format(misc.RFC3339Milli), + "%_failed", + createdAt.Format(misc.RFC3339Milli), } - err = r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&exists) + var exists bool + err := r.db.QueryRowContext(ctx, query, queryArgs...).Scan(&exists) if err != nil && !errors.Is(err, sql.ErrNoRows) { - return fmt.Errorf("fetching last upload status for source: %s and destination: %s: %w", source.ID, destination.ID, err) + return false, fmt.Errorf("checking upload status for source %s and destination %s: %w", + warehouse.Source.ID, warehouse.Destination.ID, err) } + return exists, nil +} - if !exists { - r.logger.Warnn("pending staging files not picked", - obskit.SourceID(source.ID), - obskit.SourceType(source.SourceDefinition.Name), - obskit.DestinationID(destination.ID), - obskit.DestinationType(destination.DestinationDefinition.Name), - obskit.WorkspaceID(warehouse.WorkspaceID), - ) - - trackUploadMissingStat.Gauge(1) +func (r *Router) recordUploadMissingMetric(warehouse *model.Warehouse, exists bool) { + metric := r.statsFactory.NewTaggedStat("warehouse_track_upload_missing", stats.GaugeType, stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + }) + if exists { + metric.Gauge(0) + } else { + metric.Gauge(1) } - - return nil } diff --git a/warehouse/router/tracker_test.go b/warehouse/router/tracker_test.go index 5de2942076..755862e28e 100644 --- a/warehouse/router/tracker_test.go +++ 
b/warehouse/router/tracker_test.go @@ -2,230 +2,377 @@ package router import ( "context" - "errors" - "os" + "fmt" "testing" "time" "github.com/ory/dockertest/v3" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/logger/mock_logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/stats/memstats" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + "github.com/stretchr/testify/require" backendconfig "github.com/rudderlabs/rudder-server/backend-config" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" - "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" + sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + "github.com/rudderlabs/rudder-server/warehouse/internal/repo" + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) -func TestRouter_Track(t *testing.T) { - var ( - workspaceID = "test-workspaceID" - sourceID = "test-sourceID" - sourceName = "test-sourceName" - destID = "test-destinationID" - destName = "test-destinationName" - destType = warehouseutils.POSTGRES - ) - - testcases := []struct { - name string - destID string - destDisabled bool - wantErr error - missing bool - NowSQL string - exclusionWindow map[string]any - uploadBufferTime string - }{ - { - name: "unknown destination", - destID: "unknown-destination", - }, - { - name: "disabled destination", - destID: destID, - destDisabled: true, - }, - { - name: "successful upload exists", - destID: destID, - missing: false, - }, - { - name: "successful upload exists with upload buffer time", - destID: destID, - missing: false, - uploadBufferTime: "0m", - }, - { - name: "exclusion window", - destID: destID, - missing: false, - exclusionWindow: map[string]any{ - "excludeWindowStartTime": "05:09", - "excludeWindowEndTime": "09:07", +func TestRouter_CronTrack(t *testing.T) { + t.Run("source / destination disabled", func(t *testing.T) { + ctx := context.Background() + + statsStore, err := memstats.New() + require.NoError(t, err) + + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: false, }, - }, - { - name: "no successful upload exists", - destID: "test-destinationID-1", - missing: true, - }, - { - name: "throw error while fetching last upload time", - destID: destID, - missing: false, - NowSQL: "ABC", - wantErr: errors.New("fetching last upload time for source: test-sourceID and destination: test-destinationID: pq: column \"abc\" does not exist"), - }, - } - - for _, tc := range testcases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - pgResource, err := postgres.Setup(pool, t) - require.NoError(t, err) - - t.Log("db:", pgResource.DBDsn) - - err = (&migrator.Migrator{ - Handle: pgResource.DB, - MigrationsTable: "wh_schema_migrations", - }).Migrate("warehouse") - require.NoError(t, err) - - sqlStatement, err := os.ReadFile("testdata/sql/seed_tracker_test.sql") - require.NoError(t, err) - - _, err = pgResource.DB.Exec(string(sqlStatement)) - 
require.NoError(t, err) - - statsStore, err := memstats.New() - require.NoError(t, err) - - ctx := context.Background() - nowSQL := "'2022-12-06 15:40:00'::timestamp" - - now, err := time.Parse(misc.RFC3339Milli, "2022-12-06T06:19:00.169Z") - require.NoError(t, err) - - conf := config.New() - if tc.uploadBufferTime != "" { - conf.Set("Warehouse.uploadBufferTimeInMin", tc.uploadBufferTime) - } else { - conf.Set("Warehouse.uploadBufferTimeInMin", 0) - } - - warehouse := model.Warehouse{ - WorkspaceID: workspaceID, - Source: backendconfig.SourceT{ - ID: sourceID, - Name: sourceName, - Enabled: true, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: "test-destinationName", + Enabled: false, + Config: map[string]any{ + "syncFrequency": "30", }, - Destination: backendconfig.DestinationT{ - ID: tc.destID, - Name: destName, - Enabled: !tc.destDisabled, - Config: map[string]any{ - "syncFrequency": "10", - "excludeWindow": tc.exclusionWindow, + }, + } + + r := Router{ + conf: config.New(), + destType: whutils.POSTGRES, + now: time.Now, + statsFactory: statsStore, + logger: logger.NOP, + } + + require.NoError(t, r.trackSync(ctx, &warehouse)) + require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + })) + }) + t.Run("exclusion window", func(t *testing.T) { + ctx := context.Background() + + statsStore, err := memstats.New() + require.NoError(t, err) + + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: true, + }, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: "test-destinationName", + Enabled: true, + Config: map[string]any{ + "syncFrequency": "30", + "excludeWindow": map[string]any{ + "excludeWindowStartTime": "05:09", + "excludeWindowEndTime": "09:07", }, }, - } + }, + } + + r := Router{ + conf: config.New(), + destType: whutils.POSTGRES, + now: func() time.Time { + return time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC) + }, + statsFactory: statsStore, + logger: logger.NOP, + } + + require.NoError(t, r.trackSync(ctx, &warehouse)) + require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + })) + }) + t.Run("no staging files present", func(t *testing.T) { + db, ctx := setupDB(t), context.Background() - if tc.NowSQL != "" { - nowSQL = tc.NowSQL - } + statsStore, err := memstats.New() + require.NoError(t, err) - handle := Router{ - conf: config.New(), - destType: destType, - now: func() time.Time { - return now + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: true, + }, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: "test-destinationName", + Enabled: true, + Config: map[string]any{ + "syncFrequency": "30", }, - nowSQL: nowSQL, - statsFactory: statsStore, - db: sqlquerywrapper.New(pgResource.DB), - logger: logger.NOP, - } - - err = handle.Track(ctx, &warehouse, conf) - if tc.wantErr != nil { - require.EqualError(t, err, tc.wantErr.Error()) - return - } - require.NoError(t, err) - - m := 
statsStore.Get("warehouse_track_upload_missing", stats.Tags{ - "module": moduleName, - "workspaceId": warehouse.WorkspaceID, - "destType": handle.destType, - "sourceId": warehouse.Source.ID, - "destinationId": warehouse.Destination.ID, - "warehouseID": misc.GetTagName( - warehouse.Destination.ID, - warehouse.Source.Name, - warehouse.Destination.Name, - misc.TailTruncateStr(warehouse.Source.ID, 6)), + }, + } + + r := Router{ + conf: config.New(), + destType: whutils.POSTGRES, + now: time.Now, + statsFactory: statsStore, + db: db, + logger: logger.NOP, + } + r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute) + + require.NoError(t, r.trackSync(ctx, &warehouse)) + require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + })) + }) + t.Run("staging files without missing uploads", func(t *testing.T) { + testCases := []struct { + name string + status string + }{ + {name: "ExportedData", status: model.ExportedData}, + {name: "Aborted", status: model.Aborted}, + {name: "Failed", status: model.ExportingDataFailed}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + db, ctx := setupDB(t), context.Background() + + now := time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC) + nowSQL := fmt.Sprintf("'%s'::timestamp", now.Format(time.DateTime)) + + repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time { + return now.Add(-time.Hour*1 - time.Minute*30) + })) + repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time { + return now.Add(-time.Hour * 1) + })) + + stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{ + StagingFile: model.StagingFile{ + WorkspaceID: "test-workspaceID", + SourceID: "test-sourceID", + DestinationID: "test-destinationID", + }, + }) + require.NoError(t, err) + + _, err = repoUpload.CreateWithStagingFiles(ctx, model.Upload{ + WorkspaceID: "test-workspaceID", + Namespace: "namespace", + SourceID: "test-sourceID", + DestinationID: "test-destinationID", + DestinationType: whutils.POSTGRES, + Status: tc.status, + }, []*model.StagingFile{{ + ID: stagingID, + WorkspaceID: "test-workspaceID", + SourceID: "test-sourceID", + DestinationID: "test-destinationID", + }}) + require.NoError(t, err) + + statsStore, err := memstats.New() + require.NoError(t, err) + + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: true, + }, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: whutils.POSTGRES, + Enabled: true, + Config: map[string]any{ + "syncFrequency": "30", + }, + }, + } + + r := Router{ + conf: config.New(), + destType: whutils.POSTGRES, + now: func() time.Time { + return now + }, + nowSQL: nowSQL, + statsFactory: statsStore, + db: db, + logger: logger.NOP, + } + r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute) + + require.NoError(t, r.trackSync(ctx, &warehouse)) + + uploadMissingStat := statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + }) + require.NotNil(t, uploadMissingStat) + require.EqualValues(t, 0, uploadMissingStat.LastValue()) }) + } + }) + t.Run("staging files with missing 
uploads", func(t *testing.T) { + db, ctx := setupDB(t), context.Background() + + now := time.Date(2023, 1, 1, 6, 19, 0, 0, time.UTC) + nowSQL := fmt.Sprintf("'%s'::timestamp", now.Format(time.DateTime)) + + repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time { + return now.Add(-time.Hour*1 - time.Minute*30) + })) - if tc.missing { - require.EqualValues(t, m.LastValue(), 1) - } else { - require.EqualValues(t, m.LastValue(), 0) - } + _, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{ + StagingFile: model.StagingFile{ + WorkspaceID: "test-workspaceID", + SourceID: "test-sourceID", + DestinationID: "test-destinationID", + }, }) - } -} + require.NoError(t, err) + + statsStore, err := memstats.New() + require.NoError(t, err) + + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: true, + }, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: whutils.POSTGRES, + Enabled: true, + Config: map[string]any{ + "syncFrequency": "30", + }, + }, + } -func TestRouter_CronTracker(t *testing.T) { + r := Router{ + conf: config.New(), + destType: whutils.POSTGRES, + now: func() time.Time { + return now + }, + nowSQL: nowSQL, + statsFactory: statsStore, + db: db, + logger: logger.NOP, + } + r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute) + + require.NoError(t, r.trackSync(ctx, &warehouse)) + + uploadMissingStat := statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + }) + require.NotNil(t, uploadMissingStat) + require.EqualValues(t, 1, uploadMissingStat.LastValue()) + }) t.Run("context cancelled", func(t *testing.T) { - t.Parallel() + db, ctx := setupDB(t), context.Background() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) cancel() - mockCtrl := gomock.NewController(t) - mockLogger := mock_logger.NewMockLogger(mockCtrl) - statsStore, err := memstats.New() require.NoError(t, err) + warehouse := model.Warehouse{ + WorkspaceID: "test-workspaceID", + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + Enabled: true, + }, + Destination: backendconfig.DestinationT{ + ID: "test-destinationID", + Name: "test-destinationName", + Enabled: true, + Config: map[string]any{ + "syncFrequency": "30", + }, + }, + } + r := Router{ - logger: mockLogger, + conf: config.New(), + destType: whutils.POSTGRES, + now: time.Now, statsFactory: statsStore, - destType: warehouseutils.POSTGRES, + db: db, + logger: logger.NOP, + warehouses: []model.Warehouse{warehouse}, } + r.config.uploadBufferTimeInMin = config.SingleValueLoader(30 * time.Minute) + r.config.cronTrackerRetries = config.SingleValueLoader(int64(5)) + r.stats.cronTrackerExecTimestamp = statsStore.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, stats.Tags{"module": moduleName, "destType": r.destType}) + + require.NoError(t, r.cronTracker(ctx)) + require.Nil(t, statsStore.Get("warehouse_track_upload_missing", stats.Tags{ + "module": moduleName, + "workspaceId": warehouse.WorkspaceID, + "destType": r.destType, + "sourceId": warehouse.Source.ID, + "destinationId": warehouse.Destination.ID, + })) + }) +} - mockLogger.EXPECT().Infon("context is cancelled, stopped running tracking").Times(1) +func setupDB(t 
testing.TB) *sqlmiddleware.DB { + t.Helper() - executionTime := time.Now().Unix() - err = r.CronTracker(ctx) - require.NoError(t, err) + pool, err := dockertest.NewPool("") + require.NoError(t, err) - m := statsStore.GetByName("warehouse_cron_tracker_timestamp_seconds") - require.Equal(t, len(m), 1) - require.Equal(t, m[0].Name, "warehouse_cron_tracker_timestamp_seconds") - require.Equal(t, m[0].Tags, stats.Tags{ - "module": moduleName, - "destType": warehouseutils.POSTGRES, - }) - require.GreaterOrEqual(t, m[0].Value, float64(executionTime)) - }) + pgResource, err := postgres.Setup(pool, t) + require.NoError(t, err) + + require.NoError(t, (&migrator.Migrator{ + Handle: pgResource.DB, + MigrationsTable: "wh_schema_migrations", + }).Migrate("warehouse")) + + return sqlmiddleware.New(pgResource.DB) }
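
A quick illustration of the windowing logic introduced above; this is a reviewer-side sketch, not part of the patch. `trackerWindow` is a hypothetical helper that mirrors `calculateTimeWindow` together with the bounds `getOldestStagingFile` places on `created_at`, using this diff's defaults (`Warehouse.uploadBufferTimeInMin` = 180 minutes, destination `syncFrequency` = "30"):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// trackerWindow mirrors calculateTimeWindow plus the predicate in
// getOldestStagingFile: a staging file is considered when
//   created_at > NOW() - 2*timeWindow AND created_at < NOW() - timeWindow
func trackerWindow(uploadBufferTime time.Duration, syncFrequency string) (oldest, newest time.Duration) {
	timeWindow := uploadBufferTime
	if syncFrequency != "" {
		// Non-numeric syncFrequency values are ignored, as in the patch.
		if value, err := strconv.Atoi(syncFrequency); err == nil {
			timeWindow += time.Duration(value) * time.Minute
		}
	}
	return 2 * timeWindow, timeWindow
}

func main() {
	oldest, newest := trackerWindow(180*time.Minute, "30")
	fmt.Printf("staging files scanned: NOW()-%v .. NOW()-%v\n", oldest, newest)
	// Prints: staging files scanned: NOW()-7h0m0s .. NOW()-3h30m0s
}
```

For a staging file found in that window, checkUploadStatus counts an upload as present only when its status is exported_data, aborted, or matches %_failed, with updated_at newer than the staging file's created_at; otherwise warehouse_track_upload_missing is gauged to 1 for that source/destination pair.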