Skip to content

Commit

Permalink
Merge branch 'master' into f/chrisma/remove-config-always-schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Sep 23, 2024
2 parents ec09500 + c3d5faf commit 5962003
Show file tree
Hide file tree
Showing 19 changed files with 293 additions and 381 deletions.
114 changes: 106 additions & 8 deletions internal/lookoutingesterv2/lookoutdb/insertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,18 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru

start := time.Now()
// Jobs need to be ingested first as other updates may reference these
l.CreateJobs(ctx, instructions.JobsToCreate)
wgJobIngestion := sync.WaitGroup{}
wgJobIngestion.Add(2)
go func() {
defer wgJobIngestion.Done()
l.CreateJobs(ctx, instructions.JobsToCreate)
}()
go func() {
defer wgJobIngestion.Done()
l.CreateJobSpecs(ctx, instructions.JobsToCreate)
}()

wgJobIngestion.Wait()

// Now we can job updates, annotations and new job runs
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -98,6 +109,22 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model
log.Infof("Inserted %d jobs in %s", len(instructions), taken)
}

func (l *LookoutDb) CreateJobSpecs(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) {
if len(instructions) == 0 {
return
}
start := time.Now()
err := l.CreateJobSpecsBatch(ctx, instructions)
if err != nil {
log.WithError(err).Warn("Creating job specs via batch failed, will attempt to insert serially (this might be slow).")
l.CreateJobSpecsScalar(ctx, instructions)
}
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job_spec", commonmetrics.DBOperationInsert, len(instructions), taken)
l.metrics.RecordRowsChange("job_spec", commonmetrics.DBOperationInsert, len(instructions))
log.Infof("Inserted %d job specs in %s", len(instructions), taken)
}

func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) {
if len(instructions) == 0 {
return
Expand Down Expand Up @@ -185,7 +212,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
state smallint,
last_transition_time timestamp,
last_transition_time_seconds bigint,
job_spec bytea,
priority_class varchar(63),
annotations jsonb
) ON COMMIT DROP;`, tmpTable))
Expand Down Expand Up @@ -213,7 +239,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
"state",
"last_transition_time",
"last_transition_time_seconds",
"job_spec",
"priority_class",
"annotations",
},
Expand All @@ -233,7 +258,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
instructions[i].State,
instructions[i].LastTransitionTime,
instructions[i].LastTransitionTimeSeconds,
instructions[i].JobProto,
instructions[i].PriorityClass,
instructions[i].Annotations,
}, nil
Expand Down Expand Up @@ -261,7 +285,6 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
) SELECT * from %s
Expand Down Expand Up @@ -294,11 +317,10 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT DO NOTHING`
for _, i := range instructions {
err := l.withDatabaseRetryInsert(func() error {
Expand All @@ -317,7 +339,6 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
i.State,
i.LastTransitionTime,
i.LastTransitionTimeSeconds,
i.JobProto,
i.PriorityClass,
i.Annotations,
)
Expand Down Expand Up @@ -446,6 +467,83 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions []
}
}

func (l *LookoutDb) CreateJobSpecsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error {
return l.withDatabaseRetryInsert(func() error {
tmpTable := "job_spec_create_tmp"

createTmp := func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, fmt.Sprintf(`
CREATE TEMPORARY TABLE %s (
job_id varchar(32),
job_spec bytea
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}

insertTmp := func(tx pgx.Tx) error {
_, err := tx.CopyFrom(ctx,
pgx.Identifier{tmpTable},
[]string{
"job_id",
"job_spec",
},
pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) {
return []interface{}{
instructions[i].JobId,
instructions[i].JobProto,
}, nil
}),
)
return err
}

copyToDest := func(tx pgx.Tx) error {
_, err := tx.Exec(
ctx,
fmt.Sprintf(`
INSERT INTO job_spec (
job_id,
job_spec
) SELECT * from %s
ON CONFLICT DO NOTHING`, tmpTable),
)
if err != nil {
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
}

return batchInsert(ctx, l.db, createTmp, insertTmp, copyToDest)
})
}

func (l *LookoutDb) CreateJobSpecsScalar(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) {
sqlStatement := `INSERT INTO job_spec (
job_id,
job_spec
)
VALUES ($1, $2)
ON CONFLICT DO NOTHING`
for _, i := range instructions {
err := l.withDatabaseRetryInsert(func() error {
_, err := l.db.Exec(ctx, sqlStatement,
i.JobId,
i.JobProto,
)
if err != nil {
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
})
if err != nil {
log.WithError(err).Warnf("Create job spec for job %s, jobset %s failed", i.JobId, i.JobSet)
}
}
}

func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) error {
return l.withDatabaseRetryInsert(func() error {
tmpTable := "job_run_create_tmp"
Expand Down
30 changes: 26 additions & 4 deletions internal/lookoutingesterv2/lookoutdb/insertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type JobRow struct {
Annotations map[string]string
}

type JobSpecRow struct {
JobId string
JobProto []byte
}

type JobRunRow struct {
RunId string
JobId string
Expand Down Expand Up @@ -147,7 +152,7 @@ var expectedJobAfterSubmit = JobRow{
State: lookout.JobQueuedOrdinal,
LastTransitionTime: baseTime,
LastTransitionTimeSeconds: baseTime.Unix(),
JobProto: []byte(jobProto),
JobProto: []byte(nil),
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
Expand All @@ -167,7 +172,7 @@ var expectedJobAfterUpdate = JobRow{
State: lookout.JobFailedOrdinal,
LastTransitionTime: updateTime,
LastTransitionTimeSeconds: updateTime.Unix(),
JobProto: []byte(jobProto),
JobProto: []byte(nil),
Duplicate: false,
PriorityClass: priorityClass,
Annotations: annotations,
Expand Down Expand Up @@ -841,10 +846,10 @@ func TestStoreNullValue(t *testing.T) {
err := ldb.Store(armadacontext.Background(), instructions)
assert.NoError(t, err)

job := getJob(t, ldb.db, jobIdString)
jobSpec := getJobSpec(t, ldb.db, jobIdString)
jobRun := getJobRun(t, ldb.db, runIdString)

assert.Equal(t, jobProto, job.JobProto)
assert.Equal(t, jobProto, jobSpec.JobProto)
assert.Equal(t, errorMsg, jobRun.Error)
assert.Equal(t, debugMsg, jobRun.Debug)
return nil
Expand Down Expand Up @@ -988,6 +993,23 @@ func getJob(t *testing.T, db *pgxpool.Pool, jobId string) JobRow {
return job
}

func getJobSpec(t *testing.T, db *pgxpool.Pool, jobId string) JobSpecRow {
jobSpec := JobSpecRow{}
r := db.QueryRow(
armadacontext.Background(),
`SELECT
job_id,
job_spec
FROM job_spec WHERE job_id = $1`,
jobId)
err := r.Scan(
&jobSpec.JobId,
&jobSpec.JobProto,
)
assert.Nil(t, err)
return jobSpec
}

func getJobRun(t *testing.T, db *pgxpool.Pool, runId string) JobRunRow {
run := JobRunRow{}
r := db.QueryRow(
Expand Down
1 change: 1 addition & 0 deletions internal/lookoutv2/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func deleteBatch(ctx *armadacontext.Context, tx pgx.Tx, batchLimit int) (int, er
}
_, err = tx.Exec(ctx, `
DELETE FROM job WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_spec WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_run WHERE job_id in (SELECT job_id from batch);
DELETE FROM job_ids_to_delete WHERE job_id in (SELECT job_id from batch);
TRUNCATE TABLE batch;`)
Expand Down
1 change: 1 addition & 0 deletions internal/lookoutv2/pruner/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func TestPruneDb(t *testing.T) {

queriedJobIdsPerTable := []map[string]bool{
selectStringSet(t, db, "SELECT job_id FROM job"),
selectStringSet(t, db, "SELECT job_id FROM job_spec"),
selectStringSet(t, db, "SELECT DISTINCT job_id FROM job_run"),
}
for _, queriedJobs := range queriedJobIdsPerTable {
Expand Down
11 changes: 9 additions & 2 deletions internal/lookoutv2/repository/getjobspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ func NewSqlGetJobSpecRepository(db *pgxpool.Pool, decompressor compress.Decompre

func (r *SqlGetJobSpecRepository) GetJobSpec(ctx *armadacontext.Context, jobId string) (*api.Job, error) {
var rawBytes []byte
err := r.db.QueryRow(ctx, "SELECT job_spec FROM job WHERE job_id = $1", jobId).Scan(&rawBytes)

err := r.db.QueryRow(
ctx, `
SELECT
COALESCE(job_spec.job_spec, job.job_spec)
FROM job LEFT JOIN job_spec
ON job.job_id = job_spec.job_id
WHERE job.job_id = $1`, jobId).Scan(&rawBytes)
if err != nil {
if err == pgx.ErrNoRows {
return nil, errors.Errorf("job with id %s not found", jobId)
return nil, errors.Errorf("job_spec with job id %s not found", jobId)
}
return nil, err
}
Expand Down
79 changes: 79 additions & 0 deletions internal/lookoutv2/repository/getjobspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repository
import (
"testing"

"github.com/gogo/protobuf/proto"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -50,6 +51,84 @@ func TestGetJobSpec(t *testing.T) {
assert.NoError(t, err)
}

func TestMIGRATEDGetJobSpec(t *testing.T) {
var migratedResult *api.Job
err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{})
store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10)

_ = NewJobSimulator(converter, store).
Submit(queue, jobSet, owner, namespace, baseTime, &JobOptions{
JobId: jobId,
Priority: priority,
PriorityClass: "other-default",
Cpu: cpu,
Memory: memory,
EphemeralStorage: ephemeralStorage,
Gpu: gpu,
Annotations: map[string]string{
"step_path": "/1/2/3",
"hello": "world",
},
}).
Pending(runId, cluster, baseTime).
Running(runId, node, baseTime).
RunSucceeded(runId, baseTime).
Succeeded(baseTime).
Build().
ApiJob()

repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
var err error
migratedResult, err = repo.GetJobSpec(armadacontext.TODO(), jobId)
assert.NoError(t, err)
return nil
})
assert.NoError(t, err)

var result *api.Job
err = lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
bytes, err := proto.Marshal(migratedResult)
assert.NoError(t, err)

_, err = db.Exec(armadacontext.Background(),
`INSERT INTO job (
job_id, queue, owner, namespace, jobset,
cpu,
memory,
ephemeral_storage,
gpu,
priority,
submitted,
state,
last_transition_time,
last_transition_time_seconds,
job_spec,
priority_class,
annotations
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT DO NOTHING`,
jobId, queue, owner, namespace, jobSet,
int64(15), int64(48*1024*1024*1024), int64(100*1024*1024*1024), 8,
priority, baseTime, 1, baseTime, baseTime.Unix(), bytes, "other-default",
map[string]string{
"step_path": "/1/2/3",
"hello": "world",
})
assert.NoError(t, err)

repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
result, err = repo.GetJobSpec(armadacontext.TODO(), jobId)
assert.NoError(t, err)

return nil
})
assert.NoError(t, err)

assertApiJobsEquivalent(t, migratedResult, result)
}

func TestGetJobSpecError(t *testing.T) {
err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
repo := NewSqlGetJobSpecRepository(db, &compress.NoOpDecompressor{})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS job_spec (
job_id varchar(32) NOT NULL PRIMARY KEY,
job_spec bytea NOT NULL
);
ALTER TABLE job_spec ALTER COLUMN job_spec SET STORAGE EXTERNAL;
ALTER TABLE job ALTER COLUMN job_spec DROP NOT NULL;
Loading

0 comments on commit 5962003

Please sign in to comment.