From aac3327938565ae0b2af3ea090d250fb2cc29d59 Mon Sep 17 00:00:00 2001 From: Michael Suchacz Date: Fri, 13 Dec 2024 19:28:12 +0100 Subject: [PATCH] added stream job delete capability --- core/services/job/job_orm_test.go | 12 ++++++++++++ core/services/job/orm.go | 19 ++++++++++++------- core/testdata/testspecs/v2_specs.go | 7 +++++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index fd54a39d431..af96c733123 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -444,6 +444,18 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { cltest.AssertCount(t, db, "jobs", 0) }) + t.Run("it creates and deletes records for stream jobs", func(t *testing.T) { + ctx := testutils.Context(t) + jb, err := testspecs.GenerateStreamSpecJob(testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "Test-stream", StreamID: 1})) + require.NoError(t, err) + err = jobORM.CreateJob(ctx, &jb) + require.NoError(t, err) + cltest.AssertCount(t, db, "jobs", 1) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) + require.NoError(t, err) + cltest.AssertCount(t, db, "jobs", 0) + }) + t.Run("does not allow to delete external initiators if they have referencing external_initiator_webhook_specs", func(t *testing.T) { // create new db because this will rollback transaction and poison it db := pgtest.NewSqlxDB(t) diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 38e3fa492ce..7c7f8e772ed 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -747,6 +747,7 @@ func (o *orm) DeleteJob(ctx context.Context, id int32, jobType Type) error { Workflow: `DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)`, StandardCapabilities: `DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs)`, CCIP: `DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs)`, + Stream: ``, } q, ok := queries[jobType] if !ok { @@ -757,7 +758,7 @@ func (o *orm) DeleteJob(ctx context.Context, id int32, jobType Type) error { // and this query was taking ~40secs. ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute) defer cancel() - query := fmt.Sprintf(` + query := ` WITH deleted_jobs AS ( DELETE FROM jobs WHERE id = $1 RETURNING id, @@ -775,15 +776,19 @@ func (o *orm) DeleteJob(ctx context.Context, id int32, jobType Type) error { gateway_spec_id, workflow_spec_id, standard_capabilities_spec_id, - ccip_spec_id - ), - deleted_specific_specs AS ( - %s - ), + ccip_spec_id, + stream_id + ),` + if len(q) > 0 { + query += fmt.Sprintf(`deleted_specific_specs AS ( + %s + ),)`, q) + } + query += ` deleted_job_pipeline_specs AS ( DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id ) - DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)`, q) + DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)` res, err := o.ds.ExecContext(ctx, query, id) if err != nil { return errors.Wrap(err, "DeleteJob failed to delete job") diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index d519ace6479..823866e1353 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/uuid" + tomlTools "github.com/pelletier/go-toml" "github.com/test-go/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -883,6 +884,12 @@ ds -> ds_parse -> ds_multiply; return StreamSpec{StreamSpecParams: params, toml: toml} } +func GenerateStreamSpecJob(spec StreamSpec) (job.Job, error) { + jb := job.Job{ExternalJobID: uuid.New()} + err := tomlTools.Unmarshal([]byte(spec.Toml()), &jb) + return jb, err +} + // WorkflowJobSpec is a test helper that wraps both the TOML and job.Job representation of a workflow job spec type WorkflowJobSpec struct { toml string