Skip to content

Commit

Permalink
fix: warehouse router tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jan 5, 2025
1 parent 18f4bdf commit 89bad44
Show file tree
Hide file tree
Showing 5 changed files with 482 additions and 362 deletions.
4 changes: 4 additions & 0 deletions warehouse/internal/model/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (w *Warehouse) GetPreferAppendSetting() bool {
}
return value
}

func (w *Warehouse) IsEnabled() bool {
return w.Source.Enabled && w.Destination.Enabled
}
43 changes: 31 additions & 12 deletions warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ type Router struct {
stagingFilesBatchSize config.ValueLoader[int]
warehouseSyncFreqIgnore config.ValueLoader[bool]
cronTrackerRetries config.ValueLoader[int64]
uploadBufferTimeInMin config.ValueLoader[time.Duration]
}

stats struct {
processingPendingJobsStat stats.Measurement
processingAvailableWorkersStat stats.Measurement
processingPickupLagStat stats.Measurement
processingPickupWaitTimeStat stats.Measurement

schedulerWarehouseLengthStat stats.Measurement
schedulerTotalSchedulingTimeStat stats.Measurement
processingPendingJobsStat stats.Gauge
processingAvailableWorkersStat stats.Gauge
processingPickupLagStat stats.Timer
processingPickupWaitTimeStat stats.Timer
schedulerWarehouseLengthStat stats.Gauge
schedulerTotalSchedulingTimeStat stats.Timer
cronTrackerExecTimestamp stats.Gauge
}
}

Expand Down Expand Up @@ -150,7 +151,7 @@ func New(
r.tenantManager = tenantManager
r.bcManager = bcManager
r.destType = destType
r.now = time.Now
r.now = timeutil.Now
r.triggerStore = triggerStore
r.createJobMarkerMap = make(map[string]time.Time)
r.createUploadAlways = createUploadAlways
Expand Down Expand Up @@ -200,7 +201,7 @@ func (r *Router) Start(ctx context.Context) error {
return nil
}))
g.Go(crash.NotifyWarehouse(func() error {
return r.CronTracker(gCtx)
return r.cronTracker(gCtx)
}))
return g.Wait()
}
Expand Down Expand Up @@ -711,14 +712,32 @@ func (r *Router) loadReloadableConfig(whName string) {
r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs")
r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")
r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin")
}

func (r *Router) loadStats() {
tags := stats.Tags{"destType": r.destType}
tags := stats.Tags{"module": moduleName, "destType": r.destType}
r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, tags)
r.stats.processingAvailableWorkersStat = r.statsFactory.NewTaggedStat("wh_processing_available_workers", stats.GaugeType, tags)
r.stats.processingPickupLagStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_lag", stats.TimerType, tags)
r.stats.processingPickupWaitTimeStat = r.statsFactory.NewTaggedStat("wh_processing_pickup_wait_time", stats.TimerType, tags)
r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler.warehouse_length", stats.GaugeType, tags)
r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler.total_scheduling_time", stats.TimerType, tags)
r.stats.schedulerWarehouseLengthStat = r.statsFactory.NewTaggedStat("wh_scheduler_warehouse_length", stats.GaugeType, tags)
r.stats.schedulerTotalSchedulingTimeStat = r.statsFactory.NewTaggedStat("wh_scheduler_total_scheduling_time", stats.TimerType, tags)
r.stats.cronTrackerExecTimestamp = r.statsFactory.NewTaggedStat("warehouse_cron_tracker_timestamp_seconds", stats.GaugeType, tags)
}

func (r *Router) copyWarehouses() []model.Warehouse {
r.configSubscriberLock.RLock()
defer r.configSubscriberLock.RUnlock()

warehouses := make([]model.Warehouse, len(r.warehouses))
copy(warehouses, r.warehouses)
return warehouses
}

func (r *Router) getNowSQL() string {
if r.nowSQL != "" {
return r.nowSQL
}
return "NOW()"
}
23 changes: 0 additions & 23 deletions warehouse/router/testdata/sql/seed_tracker_test.sql

This file was deleted.

Loading

0 comments on commit 89bad44

Please sign in to comment.