refactor armadacontext to implement a FieldLogger
d80tb7 committed Sep 13, 2023
1 parent c17dce2 commit 6a90c09
Showing 17 changed files with 111 additions and 110 deletions.
12 changes: 6 additions & 6 deletions internal/armada/server/lease.go
@@ -344,7 +344,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
lastSeen,
)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf(
logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
)
continue
@@ -566,7 +566,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
if q.SchedulingContextRepository != nil {
sctx.ClearJobSpecs()
if err := q.SchedulingContextRepository.AddSchedulingContext(sctx); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to store scheduling context")
logging.WithStacktrace(ctx, err).Error("failed to store scheduling context")
}
}

@@ -641,7 +641,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
jobIdsToDelete := util.Map(jobsToDelete, func(job *api.Job) string { return job.Id })
log.Infof("deleting preempted jobs: %v", jobIdsToDelete)
if deletionResult, err := q.jobRepository.DeleteJobs(jobsToDelete); err != nil {
logging.WithStacktrace(ctx.Log, err).Error("failed to delete preempted jobs from Redis")
logging.WithStacktrace(ctx, err).Error("failed to delete preempted jobs from Redis")
} else {
deleteErrorByJobId := armadamaps.MapKeys(deletionResult, func(job *api.Job) string { return job.Id })
for jobId := range preemptedApiJobsById {
@@ -704,7 +704,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
}
if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, currentExecutorReport); err != nil {
logging.WithStacktrace(ctx.Log, err).Errorf("failed to update cluster usage")
logging.WithStacktrace(ctx, err).Errorf("failed to update cluster usage")
}

allocatedByQueueAndPriorityClassForPool = q.aggregateAllocationAcrossExecutor(reportsByExecutor, req.Pool)
@@ -728,7 +728,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node id selector on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
v := node.Labels[q.schedulingConfig.Preemption.NodeIdLabel]
@@ -764,7 +764,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
}
node, err := nodeDb.GetNode(nodeId)
if err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
logging.WithStacktrace(ctx, err).Warnf("failed to set node name on job %s: node with id %s not found", apiJob.Id, nodeId)
continue
}
podSpec.NodeName = node.Name
8 changes: 4 additions & 4 deletions internal/armada/server/submit_from_log.go
@@ -125,12 +125,12 @@ func (srv *SubmitFromLog) Run(ctx *armadacontext.Context) error {
sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
srv.ack(ctx, msg)
logging.WithStacktrace(ctxWithLogger.Log, err).Warnf("processing message failed; ignoring")
logging.WithStacktrace(ctxWithLogger, err).Warnf("processing message failed; ignoring")
numErrored++
break
}

ctxWithLogger.Log.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
ctxWithLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.ack(ctx, msg)
@@ -155,11 +155,11 @@ func (srv *SubmitFromLog) ProcessSequence(ctx *armadacontext.Context, sequence *
for i < len(sequence.Events) && time.Since(lastProgress) < timeout {
j, err := srv.ProcessSubSequence(ctx, i, sequence)
if err != nil {
logging.WithStacktrace(ctx.Log, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
logging.WithStacktrace(ctx, err).WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Warnf("processing subsequence failed; ignoring")
}

if j == i {
ctx.Log.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")
ctx.WithFields(logrus.Fields{"lowerIndex": i, "upperIndex": j}).Info("made no progress")

// We should only get here if a transient error occurs.
// Sleep for a bit before retrying.
38 changes: 19 additions & 19 deletions internal/common/armadacontext/armada_context.go
@@ -13,22 +13,22 @@ import (
// while retaining type-safety
type Context struct {
context.Context
Log *logrus.Entry
logrus.FieldLogger
}

// Background creates an empty context with a default logger. It is analogous to context.Background()
func Background() *Context {
return &Context{
Context: context.Background(),
Log: logrus.NewEntry(logrus.New()),
Context: context.Background(),
FieldLogger: logrus.NewEntry(logrus.New()),
}
}

// TODO creates an empty context with a default logger. It is analogous to context.TODO()
func TODO() *Context {
return &Context{
Context: context.TODO(),
Log: logrus.NewEntry(logrus.New()),
Context: context.TODO(),
FieldLogger: logrus.NewEntry(logrus.New()),
}
}

@@ -42,17 +42,17 @@ func FromGrpcCtx(ctx context.Context) *Context {
// New returns an armada context that encapsulates both a go context and a logger
func New(ctx context.Context, log *logrus.Entry) *Context {
return &Context{
Context: ctx,
Log: log,
Context: ctx,
FieldLogger: log,
}
}

// WithCancel returns a copy of parent with a new Done channel. It is analogous to context.WithCancel()
func WithCancel(parent *Context) (*Context, context.CancelFunc) {
c, cancel := context.WithCancel(parent.Context)
return &Context{
Context: c,
Log: parent.Log,
Context: c,
FieldLogger: parent.FieldLogger,
}, cancel
}

@@ -61,8 +61,8 @@ func WithCancel(parent *Context) (*Context, context.CancelFunc) {
func WithDeadline(parent *Context, d time.Time) (*Context, context.CancelFunc) {
c, cancel := context.WithDeadline(parent.Context, d)
return &Context{
Context: c,
Log: parent.Log,
Context: c,
FieldLogger: parent.FieldLogger,
}, cancel
}

@@ -74,25 +74,25 @@ func WithTimeout(parent *Context, timeout time.Duration) (*Context, context.Canc
// WithLogField returns a copy of parent with the supplied key-value added to the logger
func WithLogField(parent *Context, key string, val interface{}) *Context {
return &Context{
Context: parent.Context,
Log: parent.Log.WithField(key, val),
Context: parent.Context,
FieldLogger: parent.FieldLogger.WithField(key, val),
}
}

// WithLogFields returns a copy of parent with the supplied key-values added to the logger
func WithLogFields(parent *Context, fields logrus.Fields) *Context {
return &Context{
Context: parent.Context,
Log: parent.Log.WithFields(fields),
Context: parent.Context,
FieldLogger: parent.FieldLogger.WithFields(fields),
}
}

// WithValue returns a copy of parent in which the value associated with key is
// val. It is analogous to context.WithValue()
func WithValue(parent *Context, key, val any) *Context {
return &Context{
Context: context.WithValue(parent, key, val),
Log: parent.Log,
Context: context.WithValue(parent, key, val),
FieldLogger: parent.FieldLogger,
}
}

@@ -101,7 +101,7 @@ func WithValue(parent *Context, key, val any) *Context {
func ErrGroup(ctx *Context) (*errgroup.Group, *Context) {
group, goctx := errgroup.WithContext(ctx)
return group, &Context{
Context: goctx,
Log: ctx.Log,
Context: goctx,
FieldLogger: ctx.FieldLogger,
}
}
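
The struct change above is the heart of the commit: the named Log *logrus.Entry field is replaced by an embedded logrus.FieldLogger, so the logger's methods are promoted onto Context and call sites drop the ".Log". Below is a minimal, self-contained sketch of that pattern; the Context type and WithLogField helper here are local stand-ins that mirror the diff, not the real armadacontext package.

// Sketch only: a stand-in Context that embeds logrus.FieldLogger, mirroring
// internal/common/armadacontext after this commit.
package main

import (
	"context"

	"github.com/sirupsen/logrus"
)

// Context embeds the logger instead of holding it in a named Log field,
// so Infof, WithField, etc. are promoted onto the context itself.
type Context struct {
	context.Context
	logrus.FieldLogger
}

// WithLogField mirrors armadacontext.WithLogField: the copy's embedded logger
// carries the extra key-value pair.
func WithLogField(parent *Context, key string, val interface{}) *Context {
	return &Context{
		Context:     parent.Context,
		FieldLogger: parent.FieldLogger.WithField(key, val),
	}
}

func main() {
	ctx := &Context{
		Context:     context.Background(),
		FieldLogger: logrus.NewEntry(logrus.New()),
	}

	// Before the refactor this would have been ctx.Log.Infof(...).
	ctx.Infof("starting up")

	// Scoped fields still work through the promoted methods.
	ctx = WithLogField(ctx, "jobId", "job-123")
	ctx.WithField("queue", "default").Info("leased job")
}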
8 changes: 4 additions & 4 deletions internal/common/armadacontext/armada_context_test.go
@@ -15,15 +15,15 @@ var defaultLogger = logrus.WithField("foo", "bar")

func TestNew(t *testing.T) {
ctx := New(context.Background(), defaultLogger)
require.Equal(t, defaultLogger, ctx.Log)
require.Equal(t, defaultLogger, ctx.FieldLogger)
require.Equal(t, context.Background(), ctx.Context)
}

func TestFromGrpcContext(t *testing.T) {
grpcCtx := ctxlogrus.ToContext(context.Background(), defaultLogger)
ctx := FromGrpcCtx(grpcCtx)
require.Equal(t, grpcCtx, ctx.Context)
require.Equal(t, defaultLogger, ctx.Log)
require.Equal(t, defaultLogger, ctx.FieldLogger)
}

func TestBackground(t *testing.T) {
@@ -39,13 +39,13 @@ func TestTODO(t *testing.T) {
func TestWithLogField(t *testing.T) {
ctx := WithLogField(Background(), "fish", "chips")
require.Equal(t, context.Background(), ctx.Context)
require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.Log.Data)
require.Equal(t, logrus.Fields{"fish": "chips"}, ctx.FieldLogger.(*logrus.Entry).Data)
}

func TestWithLogFields(t *testing.T) {
ctx := WithLogFields(Background(), logrus.Fields{"fish": "chips", "salt": "pepper"})
require.Equal(t, context.Background(), ctx.Context)
require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.Log.Data)
require.Equal(t, logrus.Fields{"fish": "chips", "salt": "pepper"}, ctx.FieldLogger.(*logrus.Entry).Data)
}

func TestWithTimeout(t *testing.T) {
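
A side effect visible in the test changes above: because the embedded field is the logrus.FieldLogger interface, tests that inspect the accumulated log fields now downcast to the concrete *logrus.Entry to reach its Data map. A stand-alone sketch of that assertion pattern (not the actual test file):

// Sketch only: downcasting an embedded FieldLogger to *logrus.Entry to inspect fields.
package sketch

import (
	"testing"

	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/require"
)

func TestFieldLoggerData(t *testing.T) {
	var logger logrus.FieldLogger = logrus.NewEntry(logrus.New())
	logger = logger.WithField("fish", "chips")

	// WithField returns a *logrus.Entry behind the interface; the concrete type
	// is needed to read Entry.Data, just as ctx.FieldLogger.(*logrus.Entry) is
	// used in armada_context_test.go above.
	entry, ok := logger.(*logrus.Entry)
	require.True(t, ok)
	require.Equal(t, logrus.Fields{"fish": "chips"}, entry.Data)
}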
6 changes: 3 additions & 3 deletions internal/common/logging/stacktrace.go
@@ -10,9 +10,9 @@ type stackTracer interface {
StackTrace() errors.StackTrace
}

// WithStacktrace returns a new logrus.Entry obtained by adding error information and, if available, a stack trace
// as fields to the provided logrus.Entry.
func WithStacktrace(logger *logrus.Entry, err error) *logrus.Entry {
// WithStacktrace returns a new logrus.FieldLogger obtained by adding error information and, if available, a stack trace
// as fields to the provided logrus.FieldLogger.
func WithStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
logger = logger.WithError(err)
if stackErr, ok := err.(stackTracer); ok {
return logger.WithField("stacktrace", stackErr.StackTrace())
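
With the parameter widened from *logrus.Entry to the logrus.FieldLogger interface, an *armadacontext.Context can be passed to WithStacktrace directly, which is what the call-site changes elsewhere in this commit do (logging.WithStacktrace(ctx, err) instead of ctx.Log). Below is a small stand-alone sketch of that usage, with a local copy of the helper rather than the real logging package.

// Sketch only: a local copy of the widened WithStacktrace helper and one call.
package main

import (
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

type stackTracer interface {
	StackTrace() errors.StackTrace
}

// withStacktrace mirrors logging.WithStacktrace after this commit: it accepts
// and returns the logrus.FieldLogger interface rather than *logrus.Entry.
func withStacktrace(logger logrus.FieldLogger, err error) logrus.FieldLogger {
	logger = logger.WithError(err)
	if stackErr, ok := err.(stackTracer); ok {
		return logger.WithField("stacktrace", stackErr.StackTrace())
	}
	return logger
}

func main() {
	// Any FieldLogger works; in Armada the *armadacontext.Context itself is
	// now passed, e.g. logging.WithStacktrace(ctx, err).Warnf(...).
	var logger logrus.FieldLogger = logrus.NewEntry(logrus.New())
	err := errors.New("node lookup failed") // pkg/errors attaches a stack trace

	withStacktrace(logger, err).Warnf("skipping node %s", "node-1")
}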
4 changes: 2 additions & 2 deletions internal/scheduler/api.go
@@ -103,7 +103,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
if err != nil {
return err
}
ctx.Log.Infof(
ctx.Infof(
"executor currently has %d job runs; sending %d cancellations and %d new runs",
len(requestRuns), len(runsToCancel), len(newRuns),
)
@@ -226,7 +226,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
now := srv.clock.Now().UTC()
for _, nodeInfo := range req.Nodes {
if node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, now); err != nil {
logging.WithStacktrace(ctx.Log, err).Warnf(
logging.WithStacktrace(ctx, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
)
} else {
8 changes: 4 additions & 4 deletions internal/scheduler/database/db_pruner.go
@@ -40,11 +40,11 @@ func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfter
return errors.WithStack(err)
}
if totalJobsToDelete == 0 {
ctx.Log.Infof("Found no jobs to be deleted. Exiting")
ctx.Infof("Found no jobs to be deleted. Exiting")
return nil
}

ctx.Log.Infof("Found %d jobs to be deleted", totalJobsToDelete)
ctx.Infof("Found %d jobs to be deleted", totalJobsToDelete)

// create temp table to hold a batch of results
_, err = db.Exec(ctx, "CREATE TEMP TABLE batch (job_id TEXT);")
@@ -93,10 +93,10 @@ func PruneDb(ctx *armadacontext.Context, db *pgx.Conn, batchLimit int, keepAfter

taken := time.Now().Sub(batchStart)
jobsDeleted += batchSize
ctx.Log.
ctx.
Infof("Deleted %d jobs in %s. Deleted %d jobs out of %d", batchSize, taken, jobsDeleted, totalJobsToDelete)
}
taken := time.Now().Sub(start)
ctx.Log.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
ctx.Infof("Deleted %d jobs in %s", jobsDeleted, taken)
return nil
}
2 changes: 1 addition & 1 deletion internal/scheduler/database/util.go
@@ -24,7 +24,7 @@ func Migrate(ctx *armadacontext.Context, db database.Querier) error {
if err != nil {
return err
}
ctx.Log.Infof("Updated scheduler database in %s", time.Now().Sub(start))
ctx.Infof("Updated scheduler database in %s", time.Now().Sub(start))
return nil
}

8 changes: 4 additions & 4 deletions internal/scheduler/leader.go
@@ -145,7 +145,7 @@ func (lc *KubernetesLeaderController) Run(ctx *armadacontext.Context) error {
return ctx.Err()
default:
lock := lc.getNewLock()
ctx.Log.Infof("attempting to become leader")
ctx.Infof("attempting to become leader")
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
@@ -154,14 +154,14 @@
RetryPeriod: lc.config.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(c context.Context) {
ctx.Log.Infof("I am now leader")
ctx.Infof("I am now leader")
lc.token.Store(NewLeaderToken())
for _, listener := range lc.listeners {
listener.onStartedLeading(ctx)
}
},
OnStoppedLeading: func() {
ctx.Log.Infof("I am no longer leader")
ctx.Infof("I am no longer leader")
lc.token.Store(InvalidLeaderToken())
for _, listener := range lc.listeners {
listener.onStoppedLeading()
@@ -174,7 +174,7 @@
},
},
})
ctx.Log.Infof("leader election round finished")
ctx.Infof("leader election round finished")
}
}
}
15 changes: 7 additions & 8 deletions internal/scheduler/metrics.go
@@ -78,17 +78,17 @@ func NewMetricsCollector(
// Run enters a loop which updates the metrics every refreshPeriod until the supplied context is cancelled
func (c *MetricsCollector) Run(ctx *armadacontext.Context) error {
ticker := c.clock.NewTicker(c.refreshPeriod)
ctx.Log.Infof("Will update metrics every %s", c.refreshPeriod)
ctx.Infof("Will update metrics every %s", c.refreshPeriod)
for {
select {
case <-ctx.Done():
ctx.Log.Debugf("Context cancelled, returning..")
ctx.Debugf("Context cancelled, returning..")
return nil
case <-ticker.C():
err := c.refresh(ctx)
if err != nil {
logging.
WithStacktrace(ctx.Log, err).
WithStacktrace(ctx, err).
Warnf("error refreshing metrics state")
}
}
@@ -111,7 +111,7 @@ func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric) {
}

func (c *MetricsCollector) refresh(ctx *armadacontext.Context) error {
ctx.Log.Debugf("Refreshing prometheus metrics")
ctx.Debugf("Refreshing prometheus metrics")
start := time.Now()
queueMetrics, err := c.updateQueueMetrics(ctx)
if err != nil {
@@ -123,7 +123,7 @@ func (c *MetricsCollector) refresh(ctx *armadacontext.Context) error {
}
allMetrics := append(queueMetrics, clusterMetrics...)
c.state.Store(allMetrics)
ctx.Log.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
ctx.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
return nil
}

@@ -156,8 +156,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
}
qs, ok := provider.queueStates[job.Queue()]
if !ok {
ctx.Log.
Warnf("job %s is in queue %s, but this queue does not exist; skipping", job.Id(), job.Queue())
ctx.Warnf("job %s is in queue %s, but this queue does not exist; skipping", job.Id(), job.Queue())
continue
}

Expand All @@ -184,7 +183,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
timeInState = currentTime.Sub(time.Unix(0, run.Created()))
recorder = qs.runningJobRecorder
} else {
ctx.Log.Warnf("Job %s is marked as leased but has no runs", job.Id())
ctx.Warnf("Job %s is marked as leased but has no runs", job.Id())
}
recorder.RecordJobRuntime(pool, priorityClass, timeInState)
recorder.RecordResources(pool, priorityClass, jobResources)
