diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go
index 9a776d0e15f..2b9d7bb753e 100644
--- a/internal/armada/server/lease.go
+++ b/internal/armada/server/lease.go
@@ -344,7 +344,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 			lastSeen,
 		)
 		if err != nil {
-			logging.WithStacktrace(ctx.Log, err).Warnf(
+			logging.WithStacktrace(ctx, err).Warnf(
 				"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
 			)
 			continue
@@ -566,7 +566,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 	if q.SchedulingContextRepository != nil {
 		sctx.ClearJobSpecs()
 		if err := q.SchedulingContextRepository.AddSchedulingContext(sctx); err != nil {
-			logging.WithStacktrace(ctx.Log, err).Error("failed to store scheduling context")
+			logging.WithStacktrace(ctx, err).Error("failed to store scheduling context")
 		}
 	}
 
@@ -641,7 +641,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 		jobIdsToDelete := util.Map(jobsToDelete, func(job *api.Job) string { return job.Id })
 		log.Infof("deleting preempted jobs: %v", jobIdsToDelete)
 		if deletionResult, err := q.jobRepository.DeleteJobs(jobsToDelete); err != nil {
-			logging.WithStacktrace(ctx.Log, err).Error("failed to delete preempted jobs from Redis")
+			logging.WithStacktrace(ctx, err).Error("failed to delete preempted jobs from Redis")
 		} else {
 			deleteErrorByJobId := armadamaps.MapKeys(deletionResult, func(job *api.Job) string { return job.Id })
 			for jobId := range preemptedApiJobsById {
@@ -704,7 +704,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 		}
 	}
 	if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
-		logging.WithStacktrace(ctx.Log, err).Errorf("failed to update cluster usage")
+		logging.WithStacktrace(ctx, err).Errorf("failed to update cluster usage")
 	}
 
 	allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
@@ -728,7 +728,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 			}
 			node, err := nodeDb.GetNode(nodeId)
 			if err != nil {
-				logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
+				logging.WithStacktrace(ctx, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
 				continue
 			}
 			v := node.Labels[q.schedulingConfig.Preemption.NodeIdLabel]
@@ -764,7 +764,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
 			}
 			node, err := nodeDb.GetNode(nodeId)
 			if err != nil {
-				logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
+				logging.WithStacktrace(ctx, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
 				continue
 			}
 			podSpec.NodeName = node.Name
diff --git a/internal/armada/server/submit_from_log.go b/internal/armada/server/submit_from_log.go
index 90b5ece3553..995e9785d5b 100644
--- a/internal/armada/server/submit_from_log.go
+++ b/internal/armada/server/submit_from_log.go
@@ -125,12 +125,12 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
 			sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
 			if err != nil {
 				srv.ack(ctx, msg)
-				logging.WithStacktrace(ctxWithLogger.Log, err).Warnf("processing message failed; ignoring")
+				logging.WithStacktrace(ctxWithLogger, err).Warnf("processing message failed; ignoring")
 				numErrored++
 				break
 			}
-			ctxWithLogger.Log.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
+			ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
 
 			// TODO: Improve retry logic.
 			srv.ProcessSequence(ctxWithLogger, sequence)
 			srv.ack(ctx, msg)
@@ -155,11 +155,11 @@ func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *
 	for i < len(sequence.Events) && time.Since(lastProgress) < timeout {
 		j, err := srv.ProcessSubSequence(ctx, i, sequence)
 		if err != nil {
-			logging.WithStacktrace(ctx.Log, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
+			logging.WithStacktrace(ctx, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
 		}
 
 		if j == i {
-			ctx.Log.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
+			ctx.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
 
 			// We should only get here if a transient error occurs.
 			// Sleep for a bit before retrying.
diff --git a/internal/common/armadacontext/armada_context.go b/internal/common/armadacontext/armada_context.go
index a6985ee5df7..0e41a66a1e4 100644
--- a/internal/common/armadacontext/armada_context.go
+++ b/internal/common/armadacontext/armada_context.go
@@ -13,22 +13,22 @@ import (
 // while retaining type-safety
 type Context struct {
 	context.Context
-	Log *logrus.Entry
+	logrus.FieldLogger
 }
 
 // Background creates an empty context with a default logger. It is analogous to context.Background()
 func Background() *Context {
 	return &Context{
-		Context: context.Background(),
-		Log:     logrus.NewEntry(logrus.New()),
+		Context:     context.Background(),
+		FieldLogger: logrus.NewEntry(logrus.New()),
 	}
 }
 
 // TODO creates an empty context with a default logger. It is analogous to context.TODO()
 func TODO() *Context {
 	return &Context{
-		Context: context.TODO(),
-		Log:     logrus.NewEntry(logrus.New()),
+		Context:     context.TODO(),
+		FieldLogger: logrus.NewEntry(logrus.New()),
 	}
 }
 
@@ -42,8 +42,8 @@ func FromGrpcCtx(ctx context.Context) *Context {
 // New returns an armada context that encapsulates both a go context and a logger
 func New(ctx context.Context, log *logrus.Entry) *Context {
 	return &Context{
-		Context: ctx,
-		Log:     log,
+		Context:     ctx,
+		FieldLogger: log,
 	}
 }
 
@@ -51,8 +51,8 @@ func New(ctx context.Context, log *logrus.Entry) *Context {
 func WithCancel(parent *Context) (*Context, context.CancelFunc) {
 	c, cancel := context.WithCancel(parent.Context)
 	return &Context{
-		Context: c,
-		Log:     parent.Log,
+		Context:     c,
+		FieldLogger: parent.FieldLogger,
 	}, cancel
 }
 
@@ -61,8 +61,8 @@ func WithCancel(parent *Context) (*Context, context.CancelFunc) {
 func WithDeadline(parent *Context, d time.Time) (*Context, context.CancelFunc) {
 	c, cancel := context.WithDeadline(parent.Context, d)
 	return &Context{
-		Context: c,
-		Log:     parent.Log,
+		Context:     c,
+		FieldLogger: parent.FieldLogger,
 	}, cancel
 }
 
@@ -74,16 +74,16 @@ func WithTimeout(parent *Context, timeout time.Duration) (*Context, context.Canc
 // WithLogField returns a copy of parent with the supplied key-value added to the logger
 func WithLogField(parent *Context, key string, val interface{}) *Context {
 	return &Context{
-		Context: parent.Context,
-		Log:     parent.Log.WithField(key, val),
+		Context:     parent.Context,
+		FieldLogger: parent.FieldLogger.WithField(key, val),
 	}
 }
 
 // WithLogFields returns a copy of parent with the supplied key-values added to the logger
 func WithLogFields(parent *Context, fields logrus.Fields) *Context {
 	return &Context{
-		Context: parent.Context,
-		Log:     parent.Log.WithFields(fields),
+		Context:     parent.Context,
+		FieldLogger: parent.FieldLogger.WithFields(fields),
 	}
 }
 
@@ -91,8 +91,8 @@ func WithLogFields(parent *Context, fields logrus.Fields) *Context {
 // val. It is analogous to context.WithValue()
 func WithValue(parent *Context, key, val any) *Context {
 	return &Context{
-		Context: context.WithValue(parent, key, val),
-		Log:     parent.Log,
+		Context:     context.WithValue(parent, key, val),
+		FieldLogger: parent.FieldLogger,
 	}
 }
 
@@ -101,7 +101,7 @@ func WithValue(parent *Context, key, val any) *Context {
 func ErrGroup(ctx *Context) (*errgroup.Group, *Context) {
 	group, goctx := errgroup.WithContext(ctx)
 	return group, &Context{
-		Context: goctx,
-		Log:     ctx.Log,
+		Context:     goctx,
+		FieldLogger: ctx.FieldLogger,
 	}
 }
diff --git a/internal/common/armadacontext/armada_context_test.go b/internal/common/armadacontext/armada_context_test.go
index a98d7b611df..4cda401c1b1 100644
--- a/internal/common/armadacontext/armada_context_test.go
+++ b/internal/common/armadacontext/armada_context_test.go
@@ -15,7 +15,7 @@ var defaultLogger = logrus.WithField("foo", "bar")
 
 func TestNew(t *testing.T) {
 	ctx := New(context.Background(), defaultLogger)
-	require.Equal(t, defaultLogger, ctx.Log)
+	require.Equal(t, defaultLogger, ctx.FieldLogger)
 	require.Equal(t, context.Background(), ctx.Context)
 }
 
@@ -23,7 +23,7 @@ func TestFromGrpcContext(t *testing.T) {
 	grpcCtx := ctxlogrus.ToContext(context.Background(), defaultLogger)
 	ctx := FromGrpcCtx(grpcCtx)
 	require.Equal(t, grpcCtx, ctx.Context)
-	require.Equal(t, defaultLogger, ctx.Log)
+	require.Equal(t, defaultLogger, ctx.FieldLogger)
 }
 
 func TestBackground(t *testing.T) {
@@ -39,13 +39,13 @@ func TestTODO(t *testing.T) {
 func TestWithLogField(t *testing.T) {
 	ctx := WithLogField(Background(), "fish", "chips")
 	require.Equal(t, context.Background(), ctx.Context)
-	require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.Log.Data)
+	require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.FieldLogger.(*logrus.Entry).Data)
 }
 
 func TestWithLogFields(t *testing.T) {
 	ctx := WithLogFields(Background(), logrus.Fields{"fish": "chips", "salt": "pepper"})
 	require.Equal(t, context.Background(), ctx.Context)
-	require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.Log.Data)
+	require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.FieldLogger.(*logrus.Entry).Data)
 }
 
 func TestWithTimeout(t *testing.T) {
diff --git a/internal/common/logging/stacktrace.go b/internal/common/logging/stacktrace.go
index cdcf4aef525..7d546915b31 100644
--- a/internal/common/logging/stacktrace.go
+++ b/internal/common/logging/stacktrace.go
@@ -10,9 +10,9 @@ type stackTracer interface {
 	StackTrace() errors.StackTrace
 }
 
-// WithStacktrace returns a new logrus.Entry obtained by adding error information and, if available, a stack trace
-// as fields to the provided logrus.Entry.
-func WithStacktrace(logger *logrus.Entry, err error) *logrus.Entry {
+// WithStacktrace returns a new logrus.FieldLogger obtained by adding error information and, if available, a stack trace
+// as fields to the provided logrus.FieldLogger.
+func WithStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
 	logger = logger.WithError(err)
 	if stackErr, ok := err.(stackTracer); ok {
 		return logger.WithField("stacktrace", stackErr.StackTrace())
diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go
index a31eba85f5e..533abc4b728 100644
--- a/internal/scheduler/api.go
+++ b/internal/scheduler/api.go
@@ -103,7 +103,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
 	if err != nil {
 		return err
 	}
-	ctx.Log.Infof(
+	ctx.Infof(
 		"executor currently has %d job runs; sending %d cancellations and %d new runs",
 		len(requestRuns), len(runsToCancel), len(newRuns),
 	)
@@ -226,7 +226,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
 	now := srv.clock.Now().UTC()
 	for _, nodeInfo := range req.Nodes {
 		if node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, now); err != nil {
-			logging.WithStacktrace(ctx.Log, err).Warnf(
+			logging.WithStacktrace(ctx, err).Warnf(
 				"skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
 			)
 		} else {
diff --git a/internal/scheduler/database/db_pruner.go b/internal/scheduler/database/db_pruner.go
index 1448122f60a..8da7dd7935d 100644
--- a/internal/scheduler/database/db_pruner.go
+++ b/internal/scheduler/database/db_pruner.go
@@ -40,11 +40,11 @@ func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfter
 		return errors.WithStack(err)
 	}
 	if totalJobsToDelete == 0 {
-		ctx.Log.Infof("Found no jobs to be deleted. Exiting")
+		ctx.Infof("Found no jobs to be deleted. Exiting")
 		return nil
 	}
 
-	ctx.Log.Infof("Found %d jobs to be deleted", totalJobsToDelete)
+	ctx.Infof("Found %d jobs to be deleted", totalJobsToDelete)
 
 	// create temp table to hold a batch of results
 	_, err = db.Exec(ctx, "CREATE TEMP TABLE batch (job_id TEXT);")
@@ -93,10 +93,10 @@ func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfter
 
 		taken := time.Now().Sub(batchStart)
 		jobsDeleted += batchSize
-		ctx.Log.
+		ctx.
 			Infof("Deleted %d jobs in %s. Deleted %d jobs out of %d", batchSize, taken, jobsDeleted, totalJobsToDelete)
 	}
 	taken := time.Now().Sub(start)
-	ctx.Log.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
+	ctx.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
 	return nil
 }
diff --git a/internal/scheduler/database/util.go b/internal/scheduler/database/util.go
index 7f4ba26244c..af338ee3b42 100644
--- a/internal/scheduler/database/util.go
+++ b/internal/scheduler/database/util.go
@@ -24,7 +24,7 @@ func Migrate(ctx *armadacontext.Context, db database.Querier) error {
 	if err != nil {
 		return err
 	}
-	ctx.Log.Infof("Updated scheduler database in %s", time.Now().Sub(start))
+	ctx.Infof("Updated scheduler database in %s", time.Now().Sub(start))
 	return nil
 }
 
diff --git a/internal/scheduler/leader.go b/internal/scheduler/leader.go
index 0482184a7a8..714cf243f52 100644
--- a/internal/scheduler/leader.go
+++ b/internal/scheduler/leader.go
@@ -145,7 +145,7 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
 			return ctx.Err()
 		default:
 			lock := lc.getNewLock()
-			ctx.Log.Infof("attempting to become leader")
+			ctx.Infof("attempting to become leader")
 			leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
 				Lock:            lock,
 				ReleaseOnCancel: true,
@@ -154,14 +154,14 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
 				RetryPeriod:     lc.config.RetryPeriod,
 				Callbacks: leaderelection.LeaderCallbacks{
 					OnStartedLeading: func(c context.Context) {
-						ctx.Log.Infof("I am now leader")
+						ctx.Infof("I am now leader")
 						lc.token.Store(NewLeaderToken())
 						for _, listener := range lc.listeners {
 							listener.onStartedLeading(ctx)
 						}
 					},
 					OnStoppedLeading: func() {
-						ctx.Log.Infof("I am no longer leader")
+						ctx.Infof("I am no longer leader")
 						lc.token.Store(InvalidLeaderToken())
 						for _, listener := range lc.listeners {
 							listener.onStoppedLeading()
@@ -174,7 +174,7 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
 					},
 				},
 			})
-			ctx.Log.Infof("leader election round finished")
+			ctx.Infof("leader election round finished")
 		}
 	}
 }
diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go
index 6526f0e358a..168295ff91f 100644
--- a/internal/scheduler/metrics.go
+++ b/internal/scheduler/metrics.go
@@ -78,17 +78,17 @@ func NewMetricsCollector(
 // Run enters s a loop which updates the metrics every refreshPeriod until the supplied context is cancelled
 func (c *MetricsCollector) Run(ctx *armadacontext.Context) error {
 	ticker := c.clock.NewTicker(c.refreshPeriod)
-	ctx.Log.Infof("Will update metrics every %s", c.refreshPeriod)
+	ctx.Infof("Will update metrics every %s", c.refreshPeriod)
 	for {
 		select {
 		case <-ctx.Done():
-			ctx.Log.Debugf("Context cancelled, returning..")
+			ctx.Debugf("Context cancelled, returning..")
 			return nil
 		case <-ticker.C():
 			err := c.refresh(ctx)
 			if err != nil {
 				logging.
-					WithStacktrace(ctx.Log, err).
+					WithStacktrace(ctx, err).
 					Warnf("error refreshing metrics state")
 			}
 		}
@@ -111,7 +111,7 @@ func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric) {
 }
 
 func (c *MetricsCollector) refresh(ctx *armadacontext.Context) error {
-	ctx.Log.Debugf("Refreshing prometheus metrics")
+	ctx.Debugf("Refreshing prometheus metrics")
 	start := time.Now()
 	queueMetrics, err := c.updateQueueMetrics(ctx)
 	if err != nil {
@@ -123,7 +123,7 @@ func (c *MetricsCollector) refresh(ctx *armadacontext.Context) error {
 	}
 	allMetrics := append(queueMetrics, clusterMetrics...)
 	c.state.Store(allMetrics)
-	ctx.Log.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
+	ctx.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
 	return nil
 }
 
@@ -156,8 +156,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
 		}
 		qs, ok := provider.queueStates[job.Queue()]
 		if !ok {
-			ctx.Log.
-				Warnf("job %s is in queue %s, but this queue does not exist; skipping", job.Id(), job.Queue())
+			ctx.Warnf("job %s is in queue %s, but this queue does not exist; skipping", job.Id(), job.Queue())
 			continue
 		}
 
@@ -184,7 +183,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
 			timeInState = currentTime.Sub(time.Unix(0, run.Created()))
 			recorder = qs.runningJobRecorder
 		} else {
-			ctx.Log.Warnf("Job %s is marked as leased but has no runs", job.Id())
+			ctx.Warnf("Job %s is marked as leased but has no runs", job.Id())
 		}
 		recorder.RecordJobRuntime(pool, priorityClass, timeInState)
 		recorder.RecordResources(pool, priorityClass, jobResources)
diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index fd0c0d9e079..adfbe4d86b5 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -129,11 +129,11 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
 		sch.nodeEvictionProbability,
 		func(ctx *armadacontext.Context, job interfaces.LegacySchedulerJob) bool {
 			if job.GetAnnotations() == nil {
-				ctx.Log.Errorf("can't evict job %s: annotations not initialised", job.GetId())
+				ctx.Errorf("can't evict job %s: annotations not initialised", job.GetId())
 				return false
 			}
 			if job.GetNodeSelector() == nil {
-				ctx.Log.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId())
+				ctx.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId())
 				return false
 			}
 			if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
@@ -241,10 +241,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
 		return nil, err
 	}
 	if s := JobsSummary(preemptedJobs); s != "" {
-		ctx.Log.Infof("preempting running jobs; %s", s)
+		ctx.Infof("preempting running jobs; %s", s)
 	}
 	if s := JobsSummary(scheduledJobs); s != "" {
-		ctx.Log.Infof("scheduling new jobs; %s", s)
+		ctx.Infof("scheduling new jobs; %s", s)
 	}
 	if sch.enableAssertions {
 		err := sch.assertions(
@@ -805,7 +805,7 @@ func NewOversubscribedEvictor(
 		},
 		jobFilter: func(ctx *armadacontext.Context, job interfaces.LegacySchedulerJob) bool {
 			if job.GetAnnotations() == nil {
-				ctx.Log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
+				ctx.Warnf("can't evict job %s: annotations not initialised", job.GetId())
 				return false
 			}
 			priorityClassName := job.GetPriorityClassName()
@@ -884,7 +884,7 @@ func defaultPostEvictFunc(ctx *armadacontext.Context, job interfaces.LegacySched
 	// Add annotation indicating to the scheduler this this job was evicted.
 	annotations := job.GetAnnotations()
 	if annotations == nil {
-		ctx.Log.Errorf("error evicting job %s: annotations not initialised", job.GetId())
+		ctx.Errorf("error evicting job %s: annotations not initialised", job.GetId())
 	} else {
 		annotations[schedulerconfig.IsEvictedAnnotation] = "true"
 	}
@@ -892,7 +892,7 @@ func defaultPostEvictFunc(ctx *armadacontext.Context, job interfaces.LegacySched
 	// Add node selector ensuring this job is only re-scheduled onto the node it was evicted from.
 	nodeSelector := job.GetNodeSelector()
 	if nodeSelector == nil {
-		ctx.Log.Errorf("error evicting job %s: nodeSelector not initialised", job.GetId())
+		ctx.Errorf("error evicting job %s: nodeSelector not initialised", job.GetId())
 	} else {
 		nodeSelector[schedulerconfig.NodeIdLabel] = node.Id
 	}
diff --git a/internal/scheduler/publisher.go b/internal/scheduler/publisher.go
index 4c30717ac62..598a00fc755 100644
--- a/internal/scheduler/publisher.go
+++ b/internal/scheduler/publisher.go
@@ -103,14 +103,14 @@ func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*
 
 	// Send messages
 	if shouldPublish() {
-		ctx.Log.Debugf("Am leader so will publish")
+		ctx.Debugf("Am leader so will publish")
 		sendCtx, cancel := armadacontext.WithTimeout(ctx, p.pulsarSendTimeout)
 		errored := false
 		for _, msg := range msgs {
 			p.producer.SendAsync(sendCtx, msg, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
 				if err != nil {
 					logging.
-						WithStacktrace(ctx.Log, err).
+						WithStacktrace(ctx, err).
 						Error("error sending message to Pulsar")
 					errored = true
 				}
@@ -123,7 +123,7 @@ func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*
 			return errors.New("One or more messages failed to send to Pulsar")
 		}
 	} else {
-		ctx.Log.Debugf("No longer leader so not publishing")
+		ctx.Debugf("No longer leader so not publishing")
 	}
 	return nil
 }
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 97585f8c6cf..4084ae726a8 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -117,37 +117,37 @@ func NewScheduler(
 
 // Run enters the scheduling loop, which will continue until ctx is cancelled.
 func (s *Scheduler) Run(ctx *armadacontext.Context) error {
-	ctx.Log.Infof("starting scheduler with cycle time %s", s.cyclePeriod)
-	defer ctx.Log.Info("scheduler stopped")
+	ctx.Infof("starting scheduler with cycle time %s", s.cyclePeriod)
+	defer ctx.Info("scheduler stopped")
 
 	// JobDb initialisation.
 	start := s.clock.Now()
 	if err := s.initialise(ctx); err != nil {
 		return err
 	}
-	ctx.Log.Infof("JobDb initialised in %s", s.clock.Since(start))
+	ctx.Infof("JobDb initialised in %s", s.clock.Since(start))
 
 	ticker := s.clock.NewTicker(s.cyclePeriod)
 	prevLeaderToken := InvalidLeaderToken()
 	for {
 		select {
 		case <-ctx.Done():
-			ctx.Log.Infof("context cancelled; returning.")
+			ctx.Infof("context cancelled; returning.")
 			return ctx.Err()
 		case <-ticker.C():
 			start := s.clock.Now()
 			ctx := armadacontext.WithLogField(ctx, "cycleId", shortuuid.New())
 			leaderToken := s.leaderController.GetToken()
 			fullUpdate := false
-			ctx.Log.Infof("received leaderToken; leader status is %t", leaderToken.leader)
+			ctx.Infof("received leaderToken; leader status is %t", leaderToken.leader)
 
 			// If we are becoming leader then we must ensure we have caught up to all Pulsar messages
 			if leaderToken.leader && leaderToken != prevLeaderToken {
-				ctx.Log.Infof("becoming leader")
+				ctx.Infof("becoming leader")
 				syncContext, cancel := armadacontext.WithTimeout(ctx, 5*time.Minute)
 				err := s.ensureDbUpToDate(syncContext, 1*time.Second)
 				if err != nil {
-					logging.WithStacktrace(ctx.Log, err).Error("could not become leader")
+					logging.WithStacktrace(ctx, err).Error("could not become leader")
 					leaderToken = InvalidLeaderToken()
 				} else {
 					fullUpdate = true
@@ -167,7 +167,7 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
 
 			result, err := s.cycle(ctx, fullUpdate, leaderToken, shouldSchedule)
 			if err != nil {
-				logging.WithStacktrace(ctx.Log, err).Error("scheduling cycle failure")
+				logging.WithStacktrace(ctx, err).Error("scheduling cycle failure")
 				leaderToken = InvalidLeaderToken()
 			}
 
@@ -179,10 +179,10 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
 				// Only the leader does real scheduling rounds.
 				s.metrics.ReportScheduleCycleTime(cycleTime)
 				s.metrics.ReportSchedulerResult(ctx, result)
-				ctx.Log.Infof("scheduling cycle completed in %s", cycleTime)
+				ctx.Infof("scheduling cycle completed in %s", cycleTime)
 			} else {
 				s.metrics.ReportReconcileCycleTime(cycleTime)
-				ctx.Log.Infof("reconciliation cycle completed in %s", cycleTime)
+				ctx.Infof("reconciliation cycle completed in %s", cycleTime)
 			}
 
 			prevLeaderToken = leaderToken
@@ -258,7 +258,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 	if err = s.publisher.PublishMessages(ctx, events, isLeader); err != nil {
 		return
 	}
-	ctx.Log.Infof("published %d events to pulsar in %s", len(events), s.clock.Since(start))
+	ctx.Infof("published %d events to pulsar in %s", len(events), s.clock.Since(start))
 	txn.Commit()
 	return
 }
@@ -270,7 +270,7 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context) ([]*jobdb.Job, error)
 	if err != nil {
 		return nil, err
 	}
-	ctx.Log.Infof("received %d updated jobs and %d updated job runs in %s", len(updatedJobs), len(updatedRuns), s.clock.Since(start))
+	ctx.Infof("received %d updated jobs and %d updated job runs in %s", len(updatedJobs), len(updatedRuns), s.clock.Since(start))
 
 	txn := s.jobDb.WriteTxn()
 	defer txn.Abort()
@@ -314,7 +314,7 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context) ([]*jobdb.Job, error)
 			// If the job is nil or terminal at this point then it cannot be active.
 			// In this case we can ignore the run.
 			if job == nil || job.InTerminalState() {
-				ctx.Log.Debugf("job %s is not active; ignoring update for run %s", jobId, dbRun.RunID)
+				ctx.Debugf("job %s is not active; ignoring update for run %s", jobId, dbRun.RunID)
 				continue
 			}
 		}
@@ -716,14 +716,14 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb
 	// has been completely removed
 	for executor, heartbeat := range heartbeatTimes {
 		if heartbeat.Before(cutOff) {
-			ctx.Log.Warnf("Executor %s has not reported a hearbeart since %v. Will expire all jobs running on this executor", executor, heartbeat)
+			ctx.Warnf("Executor %s has not reported a hearbeart since %v. Will expire all jobs running on this executor", executor, heartbeat)
 			staleExecutors[executor] = true
 		}
 	}
 
 	// All clusters have had a heartbeat recently. No need to expire any jobs
 	if len(staleExecutors) == 0 {
-		ctx.Log.Infof("No stale executors found. No jobs need to be expired")
+		ctx.Infof("No stale executors found. No jobs need to be expired")
 		return nil, nil
 	}
 
@@ -740,7 +740,7 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb
 
 		run := job.LatestRun()
 		if run != nil && !job.Queued() && staleExecutors[run.Executor()] {
-			ctx.Log.Warnf("Cancelling job %s as it is running on lost executor %s", job.Id(), run.Executor())
+			ctx.Warnf("Cancelling job %s as it is running on lost executor %s", job.Id(), run.Executor())
 			jobsToUpdate = append(jobsToUpdate, job.WithQueued(false).WithFailed(true).WithUpdatedRun(run.WithFailed(true)))
 
 			jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id())
@@ -805,7 +805,7 @@ func (s *Scheduler) initialise(ctx *armadacontext.Context) error {
 			return nil
 		default:
 			if _, err := s.syncState(ctx); err != nil {
-				logging.WithStacktrace(ctx.Log, err).Error("failed to initialise; trying again in 1 second")
+				logging.WithStacktrace(ctx, err).Error("failed to initialise; trying again in 1 second")
 				time.Sleep(1 * time.Second)
 			} else {
 				// Initialisation succeeded.
@@ -832,7 +832,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx *armadacontext.Context, pollInterval ti
 		default:
 			numSent, err = s.publisher.PublishMarkers(ctx, groupId)
 			if err != nil {
-				logging.WithStacktrace(ctx.Log, err).Error("Error sending marker messages to pulsar")
+				logging.WithStacktrace(ctx, err).Error("Error sending marker messages to pulsar")
 				s.clock.Sleep(pollInterval)
 			} else {
 				messagesSent = true
@@ -849,14 +849,14 @@ func (s *Scheduler) ensureDbUpToDate(ctx *armadacontext.Context, pollInterval ti
 			numReceived, err := s.jobRepository.CountReceivedPartitions(ctx, groupId)
 			if err != nil {
 				logging.
-					WithStacktrace(ctx.Log, err).
+					WithStacktrace(ctx, err).
 					Error("Error querying the database or marker messages")
 			}
 			if numSent == numReceived {
-				ctx.Log.Infof("Successfully ensured that database state is up to date")
+				ctx.Infof("Successfully ensured that database state is up to date")
 				return nil
 			}
-			ctx.Log.Infof("Recevied %d partitions, still waiting on %d", numReceived, numSent-numReceived)
+			ctx.Infof("Recevied %d partitions, still waiting on %d", numReceived, numSent-numReceived)
 			s.clock.Sleep(pollInterval)
 		}
 	}
diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go
index 4e39bdc3668..3ba197ebeba 100644
--- a/internal/scheduler/scheduler_metrics.go
+++ b/internal/scheduler/scheduler_metrics.go
@@ -209,7 +209,7 @@ func observeJobAggregates(ctx *armadacontext.Context, metric prometheus.CounterV
 		if err != nil {
 			// A metric failure isn't reason to kill the programme.
-			ctx.Log.Errorf("error reteriving considered jobs observer for queue %s, priorityClass %s", queue, priorityClassName)
+			ctx.Errorf("error reteriving considered jobs observer for queue %s, priorityClass %s", queue, priorityClassName)
 		} else {
 			observer.Add(float64(count))
 		}
 	}
@@ -224,7 +224,7 @@ func (metrics *SchedulerMetrics) reportNumberOfJobsConsidered(ctx *armadacontext
 		observer, err := metrics.consideredJobs.GetMetricWithLabelValues(queue, pool)
 
 		if err != nil {
-			ctx.Log.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool)
+			ctx.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool)
 		} else {
 			observer.Add(float64(count))
 		}
@@ -243,7 +243,7 @@ func (metrics *SchedulerMetrics) reportQueueShares(ctx *armadacontext.Context, s
 		observer, err := metrics.fairSharePerQueue.GetMetricWithLabelValues(queue, pool)
 
 		if err != nil {
-			ctx.Log.Errorf("error retrieving considered jobs observer for queue %s, pool %s", queue, pool)
+			ctx.Errorf("error retrieving considered jobs observer for queue %s, pool %s", queue, pool)
 		} else {
 			observer.Set(fairShare)
 		}
@@ -252,7 +252,7 @@ func (metrics *SchedulerMetrics) reportQueueShares(ctx *armadacontext.Context, s
 		observer, err = metrics.actualSharePerQueue.GetMetricWithLabelValues(queue, pool)
 
 		if err != nil {
-			ctx.Log.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool)
+			ctx.Errorf("error reteriving considered jobs observer for queue %s, pool %s", queue, pool)
 		} else {
 			observer.Set(actualShare)
 		}
diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go
index e5a735481a3..c045591b175 100644
--- a/internal/scheduler/schedulerapp.go
+++ b/internal/scheduler/schedulerapp.go
@@ -55,7 +55,7 @@ func Run(config schedulerconfig.Configuration) error {
 	//////////////////////////////////////////////////////////////////////////
 	// Database setup (postgres and redis)
 	//////////////////////////////////////////////////////////////////////////
-	ctx.Log.Infof("Setting up database connections")
+	ctx.Infof("Setting up database connections")
 	db, err := dbcommon.OpenPgxPool(config.Postgres)
 	if err != nil {
 		return errors.WithMessage(err, "Error opening connection to postgres")
@@ -69,7 +69,7 @@ func Run(config schedulerconfig.Configuration) error {
 		err := redisClient.Close()
 		if err != nil {
 			logging.
-				WithStacktrace(ctx.Log, err).
+				WithStacktrace(ctx, err).
 				Warnf("Redis client didn't close down cleanly")
 		}
 	}()
@@ -79,7 +79,7 @@ func Run(config schedulerconfig.Configuration) error {
 	//////////////////////////////////////////////////////////////////////////
 	// Pulsar
 	//////////////////////////////////////////////////////////////////////////
-	ctx.Log.Infof("Setting up Pulsar connectivity")
+	ctx.Infof("Setting up Pulsar connectivity")
 	pulsarClient, err := pulsarutils.NewPulsarClient(&config.Pulsar)
 	if err != nil {
 		return errors.WithMessage(err, "Error creating pulsar client")
@@ -108,7 +108,7 @@ func Run(config schedulerconfig.Configuration) error {
 	//////////////////////////////////////////////////////////////////////////
 	// Executor Api
 	//////////////////////////////////////////////////////////////////////////
-	ctx.Log.Infof("Setting up executor api")
+	ctx.Infof("Setting up executor api")
 	apiProducer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
 		Name:             fmt.Sprintf("armada-executor-api-%s", uuid.NewString()),
 		CompressionType:  config.Pulsar.CompressionType,
@@ -146,7 +146,7 @@ func Run(config schedulerconfig.Configuration) error {
 	}
 	executorapi.RegisterExecutorApiServer(grpcServer, executorServer)
 	services = append(services, func() error {
-		ctx.Log.Infof("Executor api listening on %s", lis.Addr())
+		ctx.Infof("Executor api listening on %s", lis.Addr())
 		return grpcServer.Serve(lis)
 	})
 	services = append(services, grpcCommon.CreateShutdownHandler(ctx, 5*time.Second, grpcServer))
@@ -154,7 +154,7 @@ func Run(config schedulerconfig.Configuration) error {
 	//////////////////////////////////////////////////////////////////////////
 	// Scheduling
 	//////////////////////////////////////////////////////////////////////////
-	ctx.Log.Infof("setting up scheduling loop")
+	ctx.Infof("setting up scheduling loop")
 	stringInterner, err := stringinterner.New(config.InternedStringsCacheSize)
 	if err != nil {
 		return errors.WithMessage(err, "error creating string interner")
@@ -243,10 +243,10 @@ func Run(config schedulerconfig.Configuration) error {
 func createLeaderController(ctx *armadacontext.Context, config schedulerconfig.LeaderConfig) (LeaderController, error) {
 	switch mode := strings.ToLower(config.Mode); mode {
 	case "standalone":
-		ctx.Log.Infof("Scheduler will run in standalone mode")
+		ctx.Infof("Scheduler will run in standalone mode")
 		return NewStandaloneLeaderController(), nil
 	case "kubernetes":
-		ctx.Log.Infof("Scheduler will run kubernetes mode")
+		ctx.Infof("Scheduler will run kubernetes mode")
 		clusterConfig, err := loadClusterConfig(ctx)
 		if err != nil {
 			return nil, errors.Wrapf(err, "Error creating kubernetes client")
@@ -268,11 +268,11 @@ func createLeaderController(ctx *armadacontext.Context, config schedulerconfig.L
 func loadClusterConfig(ctx *armadacontext.Context) (*rest.Config, error) {
 	config, err := rest.InClusterConfig()
 	if err == rest.ErrNotInCluster {
-		ctx.Log.Info("Running with default client configuration")
+		ctx.Info("Running with default client configuration")
 		rules := clientcmd.NewDefaultClientConfigLoadingRules()
 		overrides := &clientcmd.ConfigOverrides{}
 		return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
 	}
-	ctx.Log.Info("Running with in cluster client configuration")
+	ctx.Info("Running with in cluster client configuration")
 	return config, err
 }
diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go
index a1865d1601b..563ab93f000 100644
--- a/internal/scheduler/scheduling_algo.go
+++ b/internal/scheduler/scheduling_algo.go
@@ -99,7 +99,7 @@ func (l *FairSchedulingAlgo) Schedule(
 
 	// Exit immediately if scheduling is disabled.
 	if l.schedulingConfig.DisableScheduling {
-		ctx.Log.Info("skipping scheduling - scheduling disabled")
+		ctx.Info("skipping scheduling - scheduling disabled")
 		return overallSchedulerResult, nil
 	}
 
@@ -121,7 +121,7 @@ func (l *FairSchedulingAlgo) Schedule(
 		select {
 		case <-ctxWithTimeout.Done():
 			// We've reached the scheduling time limit; exit gracefully.
-			ctx.Log.Info("ending scheduling round early as we have hit the maximum scheduling duration")
+			ctx.Info("ending scheduling round early as we have hit the maximum scheduling duration")
 			return overallSchedulerResult, nil
 		default:
 		}
@@ -140,7 +140,7 @@ func (l *FairSchedulingAlgo) Schedule(
 		// Assume pool and minimumJobSize are consistent within the group.
 		pool := executorGroup[0].Pool
 		minimumJobSize := executorGroup[0].MinimumJobSize
-		ctx.Log.Infof(
+		ctx.Infof(
 			"scheduling on executor group %s with capacity %s",
 			executorGroupLabel, fsctx.totalCapacityByPool[pool].CompactString(),
 		)
@@ -156,14 +156,14 @@ func (l *FairSchedulingAlgo) Schedule(
 			// add the executorGroupLabel back to l.executorGroupsToSchedule such that we try it again next time,
 			// and exit gracefully.
 			l.executorGroupsToSchedule = append(l.executorGroupsToSchedule, executorGroupLabel)
-			ctx.Log.Info("stopped scheduling early as we have hit the maximum scheduling duration")
+			ctx.Info("stopped scheduling early as we have hit the maximum scheduling duration")
 			break
 		} else if err != nil {
 			return nil, err
 		}
 		if l.schedulingContextRepository != nil {
 			if err := l.schedulingContextRepository.AddSchedulingContext(sctx); err != nil {
-				logging.WithStacktrace(ctx.Log, err).Error("failed to add scheduling context")
+				logging.WithStacktrace(ctx, err).Error("failed to add scheduling context")
 			}
 		}
 
@@ -563,7 +563,9 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors(
 		leasedJobs := leasedJobsByExecutor[executor.Id]
 		executorRuns, err := executor.AllRuns()
 		if err != nil {
-			logging.WithStacktrace(ctx.Log, err).Errorf("failed to retrieve runs for executor %s; will not be considered for scheduling", executor.Id)
+			logging.
+				WithStacktrace(ctx, err).
+				Errorf("failed to retrieve runs for executor %s; will not be considered for scheduling", executor.Id)
 			continue
 		}
 		executorRunIds := make(map[uuid.UUID]bool, len(executorRuns))
@@ -582,7 +584,7 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors(
 		if numUnacknowledgedJobs <= l.schedulingConfig.MaxUnacknowledgedJobsPerExecutor {
 			activeExecutors = append(activeExecutors, executor)
 		} else {
-			ctx.Log.Warnf(
+			ctx.Warnf(
 				"%d unacknowledged jobs on executor %s exceeds limit of %d; executor will not be considered for scheduling",
 				numUnacknowledgedJobs, executor.Id, l.schedulingConfig.MaxUnacknowledgedJobsPerExecutor,
 			)
diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go
index 5eefdb8f970..d390f3c5037 100644
--- a/internal/scheduler/submitcheck.go
+++ b/internal/scheduler/submitcheck.go
@@ -102,7 +102,7 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
 	executors, err := srv.executorRepository.GetExecutors(ctx)
 	if err != nil {
 		logging.
-			WithStacktrace(ctx.Log, err).
+			WithStacktrace(ctx, err).
 			Error("Error fetching executors")
 		return
 	}
@@ -117,12 +117,12 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
 			srv.mu.Unlock()
 			if err != nil {
 				logging.
-					WithStacktrace(ctx.Log, err).
+					WithStacktrace(ctx, err).
 					Errorf("Error constructing node db for executor %s", executor.Id)
 			}
 		} else {
 			logging.
-				WithStacktrace(ctx.Log, err).
+				WithStacktrace(ctx, err).
 				Warnf("Error clearing nodedb for executor %s", executor.Id)
 		}
 	}
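
Reviewer note: the sketch below is a condensed, self-contained illustration of the pattern this patch introduces; it is not part of the patch, and the package/function layout here (a single main package) is hypothetical. It shows why call sites can drop ".Log": Context embeds the logrus.FieldLogger interface, so its logging methods are promoted onto the context, and WithStacktrace is broadened from *logrus.Entry to logrus.FieldLogger so a *Context can be passed to it directly.

package main

import (
	"context"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Context mirrors armadacontext.Context after this patch: the logger is embedded
// rather than held in a Log field, so Infof, Warnf, WithField, etc. are promoted
// onto the context itself.
type Context struct {
	context.Context
	logrus.FieldLogger
}

// Background returns an empty context with a default logger, as in the patched package.
func Background() *Context {
	return &Context{
		Context:     context.Background(),
		FieldLogger: logrus.NewEntry(logrus.New()),
	}
}

// stackTracer is the interface implemented by errors created with github.com/pkg/errors.
type stackTracer interface {
	StackTrace() errors.StackTrace
}

// WithStacktrace accepts any logrus.FieldLogger, so a *Context satisfies it via the
// embedded interface and can be passed directly instead of ctx.Log.
func WithStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
	logger = logger.WithError(err)
	if stackErr, ok := err.(stackTracer); ok {
		return logger.WithField("stacktrace", stackErr.StackTrace())
	}
	return logger
}

func main() {
	ctx := Background()
	ctx.Infof("scheduler starting")               // previously ctx.Log.Infof(...)
	err := errors.New("boom")                     // carries a stack trace via pkg/errors
	WithStacktrace(ctx, err).Warn("cycle failed") // previously WithStacktrace(ctx.Log, err)
}

Because context.Context and logrus.FieldLogger have no overlapping method names, embedding both keeps the struct a valid context while exposing the logger's methods, which is what lets every ctx.Log.X call in this diff become ctx.X with no other changes at the call site.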