Skip to content

Commit

Permalink
Remove Repeatable From Lookout queries (#3780)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 and d80tb7 authored Jul 11, 2024
1 parent c4af2fc commit f6f44b0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 70 deletions.
2 changes: 1 addition & 1 deletion internal/armada/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (q *QueryApi) GetJobDetails(ctx context.Context, req *api.JobDetailsRequest
detailsById := make(map[string]*api.JobDetails)

err := pgx.BeginTxFunc(ctx, q.db, pgx.TxOptions{
IsoLevel: pgx.RepeatableRead,
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadOnly,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
Expand Down
106 changes: 49 additions & 57 deletions internal/lookoutv2/repository/getjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"k8s.io/utils/clock"

Expand Down Expand Up @@ -69,66 +68,59 @@ func (r *SqlGetJobsRepository) getJobs(ctx *armadacontext.Context, filters []*mo
}
logQuery(query, "GetJobs")
var jobs []*model.Job
if err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{
IsoLevel: pgx.RepeatableRead,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, query.Sql, query.Args...)
if err != nil {
return err

rows, err := r.db.Query(ctx, query.Sql, query.Args...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var row jobRow
var annotations sql.NullString
var runs sql.NullString
if err := rows.Scan(
&row.jobId,
&row.queue,
&row.owner,
&row.namespace,
&row.jobSet,
&row.cpu,
&row.memory,
&row.ephemeralStorage,
&row.gpu,
&row.priority,
&row.submitted,
&row.cancelled,
&row.state,
&row.lastTransitionTime,
&row.duplicate,
&row.priorityClass,
&row.latestRunId,
&row.cancelReason,
&annotations,
&runs,
); err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var row jobRow
var annotations sql.NullString
var runs sql.NullString
if err := rows.Scan(
&row.jobId,
&row.queue,
&row.owner,
&row.namespace,
&row.jobSet,
&row.cpu,
&row.memory,
&row.ephemeralStorage,
&row.gpu,
&row.priority,
&row.submitted,
&row.cancelled,
&row.state,
&row.lastTransitionTime,
&row.duplicate,
&row.priorityClass,
&row.latestRunId,
&row.cancelReason,
&annotations,
&runs,
); err != nil {
return err
}
job := jobRowToModel(&row)
if annotations.Valid {
if err := json.Unmarshal([]byte(annotations.String), &job.Annotations); err != nil {
return err
}
job := jobRowToModel(&row)
if annotations.Valid {
if err := json.Unmarshal([]byte(annotations.String), &job.Annotations); err != nil {
return nil, err
}
if runs.Valid {
if err := json.Unmarshal([]byte(runs.String), &job.Runs); err != nil {
return err
}
}
if len(job.Runs) > 0 {
lastRun := job.Runs[len(job.Runs)-1] // Get the last run
job.Node = lastRun.Node
job.Cluster = lastRun.Cluster
job.ExitCode = lastRun.ExitCode
job.RuntimeSeconds = calculateJobRuntime(lastRun.Started, lastRun.Finished, r.clock)
}
if runs.Valid {
if err := json.Unmarshal([]byte(runs.String), &job.Runs); err != nil {
return nil, err
}
jobs = append(jobs, job)
}
return nil
}); err != nil {
return nil, err
if len(job.Runs) > 0 {
lastRun := job.Runs[len(job.Runs)-1] // Get the last run
job.Node = lastRun.Node
job.Cluster = lastRun.Cluster
job.ExitCode = lastRun.ExitCode
job.RuntimeSeconds = calculateJobRuntime(lastRun.Started, lastRun.Finished, r.clock)
}
jobs = append(jobs, job)
}
return &GetJobsResult{Jobs: jobs}, nil
}
Expand Down
18 changes: 6 additions & 12 deletions internal/lookoutv2/repository/groupjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,12 @@ func (r *SqlGroupJobsRepository) GroupBy(

var groups []*model.JobGroup

if err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{
IsoLevel: pgx.RepeatableRead,
AccessMode: pgx.ReadOnly,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
groupRows, err := tx.Query(ctx, query.Sql, query.Args...)
if err != nil {
return err
}
groups, err = rowsToGroups(groupRows, groupedField, aggregates, filters)
return err
}); err != nil {
groupRows, err := r.db.Query(ctx, query.Sql, query.Args...)
if err != nil {
return nil, err
}
groups, err = rowsToGroups(groupRows, groupedField, aggregates, filters)
if err != nil {
return nil, err
}

Expand Down

0 comments on commit f6f44b0

Please sign in to comment.