From f4a17af9ca27c80c44bc49334d9bf34a352bafdf Mon Sep 17 00:00:00 2001 From: Sam Date: Fri, 26 Jan 2024 11:23:37 -0500 Subject: [PATCH] Use uint64 for stream IDs (#11901) * Use uint64 for stream IDs * Reduce size of index --- core/services/job/models.go | 1 + core/services/job/orm.go | 8 ++-- core/services/streams/delegate.go | 14 +++--- core/services/streams/delegate_test.go | 35 +++++++++++--- core/services/streams/stream_registry.go | 4 +- core/services/streams/stream_registry_test.go | 48 +++++++++---------- core/services/streams/stream_test.go | 2 +- .../migrations/0222_jobs_stream_id.sql | 7 +++ core/testdata/testspecs/v2_specs.go | 6 ++- core/web/jobs_controller_test.go | 3 +- core/web/presenters/job.go | 2 + 11 files changed, 82 insertions(+), 48 deletions(-) create mode 100644 core/store/migrate/migrations/0222_jobs_stream_id.sql diff --git a/core/services/job/models.go b/core/services/job/models.go index 17bac545be1..cde1e7a740d 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -126,6 +126,7 @@ var ( type Job struct { ID int32 `toml:"-"` ExternalJobID uuid.UUID `toml:"externalJobID"` + StreamID *uint64 `toml:"streamID"` OCROracleSpecID *int32 OCROracleSpec *OCROracleSpec OCR2OracleSpecID *int32 diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 82c6be2963c..475b749dfc5 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -530,18 +530,18 @@ func (o *orm) InsertJob(job *Job, qopts ...pg.QOpt) error { // if job has id, emplace otherwise insert with a new id. if job.ID == 0 { - query = `INSERT INTO jobs (pipeline_spec_id, name, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, + query = `INSERT INTO jobs (pipeline_spec_id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) - VALUES (:pipeline_spec_id, :name, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, + VALUES (:pipeline_spec_id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) RETURNING *;` } else { - query = `INSERT INTO jobs (id, pipeline_spec_id, name, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, + query = `INSERT INTO jobs (id, pipeline_spec_id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) - VALUES (:id, :pipeline_spec_id, :name, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, + VALUES (:id, :pipeline_spec_id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) RETURNING *;` diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index b62ceb9857b..4e83aed2b94 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -31,7 +31,7 @@ type Delegate struct { var _ job.Delegate = (*Delegate)(nil) func NewDelegate(lggr logger.Logger, registry Registry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { - return &Delegate{lggr, registry, runner, cfg} + return &Delegate{lggr.Named("StreamsDelegate"), registry, runner, cfg} } func (d *Delegate) JobType() job.Type { @@ -44,11 +44,11 @@ func (d *Delegate) BeforeJobDeleted(jb job.Job) {} func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { return nil } func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) { - if !jb.Name.Valid { - return nil, errors.New("job name is required to be present for stream specs") + if jb.StreamID == nil { + return nil, errors.New("streamID is required to be present for stream specs") } - id := jb.Name.String - lggr := d.lggr.Named(id).With("streamID", id) + id := *jb.StreamID + lggr := d.lggr.With("streamID", id) rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) services = append(services, rrs, &StreamService{ @@ -102,8 +102,8 @@ func ValidatedStreamSpec(tomlString string) (job.Job, error) { return jb, errors.Errorf("unsupported type: %q", jb.Type) } - if !jb.Name.Valid { - return jb, errors.New("jobs of type 'stream' require a non-blank name as stream ID") + if jb.StreamID == nil { + return jb, errors.New("jobs of type 'stream' require streamID to be specified") } return jb, nil diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go index 77b10260375..d6d1ca68876 100644 --- a/core/services/streams/delegate_test.go +++ b/core/services/streams/delegate_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v4" ) type mockRegistry struct{} @@ -35,11 +34,11 @@ func Test_Delegate(t *testing.T) { t.Run("ServicesForSpec", func(t *testing.T) { jb := job.Job{PipelineSpec: &pipeline.Spec{ID: 1}} - t.Run("errors if job is missing name", func(t *testing.T) { + t.Run("errors if job is missing streamID", func(t *testing.T) { _, err := d.ServicesForSpec(jb) - assert.EqualError(t, err, "job name is required to be present for stream specs") + assert.EqualError(t, err, "streamID is required to be present for stream specs") }) - jb.Name = null.StringFrom("jobname") + jb.StreamID = ptr(uint64(42)) t.Run("returns services", func(t *testing.T) { srvs, err := d.ServicesForSpec(jb) require.NoError(t, err) @@ -49,7 +48,7 @@ func Test_Delegate(t *testing.T) { strmSrv := srvs[1].(*StreamService) assert.Equal(t, registry, strmSrv.registry) - assert.Equal(t, StreamID("jobname"), strmSrv.id) + assert.Equal(t, StreamID(42), strmSrv.id) assert.Equal(t, jb.PipelineSpec, strmSrv.spec) assert.NotNil(t, strmSrv.lggr) assert.Equal(t, srvs[0], strmSrv.rrs) @@ -67,6 +66,7 @@ func Test_ValidatedStreamSpec(t *testing.T) { name: "minimal stream spec", toml: ` type = "stream" +streamID = 12345 name = "voter-turnout" schemaVersion = 1 observationSource = """ @@ -82,6 +82,8 @@ answer1 [type=median index=0]; assert.Equal(t, job.Type("stream"), jb.Type) assert.Equal(t, uint32(1), jb.SchemaVersion) assert.True(t, jb.Name.Valid) + require.NotNil(t, jb.StreamID) + assert.Equal(t, uint64(12345), *jb.StreamID) assert.Equal(t, "voter-turnout", jb.Name.String) }, }, @@ -134,10 +136,11 @@ answer1 [type=median index=0]; }, }, { - name: "error if missing name", + name: "no error if missing name", toml: ` type = "stream" schemaVersion = 1 +streamID = 12345 observationSource = """ ds1 [type=bridge name=voter_turnout]; ds1_parse [type=jsonparse path="one,two"]; @@ -147,7 +150,24 @@ answer1 [type=median index=0]; """ `, assertion: func(t *testing.T, jb job.Job, err error) { - assert.EqualError(t, err, "jobs of type 'stream' require a non-blank name as stream ID") + require.NoError(t, err) + }, + }, + { + name: "error if missing streamID", + toml: ` +type = "stream" +schemaVersion = 1 +observationSource = """ +ds1 [type=bridge name=voter_turnout]; +ds1_parse [type=jsonparse path="one,two"]; +ds1_multiply [type=multiply times=1.23]; +ds1 -> ds1_parse -> ds1_multiply -> answer1; +answer1 [type=median index=0]; +""" +`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "jobs of type 'stream' require streamID to be specified") }, }, } @@ -159,3 +179,4 @@ answer1 [type=median index=0]; }) } } +func ptr[T any](t T) *T { return &t } diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index c79c6c4e043..0c822010a53 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -8,7 +8,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) -type StreamID = string +type StreamID = uint64 type Registry interface { Get(streamID StreamID) (strm Stream, exists bool) @@ -47,7 +47,7 @@ func (s *streamRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs Res s.Lock() defer s.Unlock() if _, exists := s.streams[streamID]; exists { - return fmt.Errorf("stream already registered for id: %q", streamID) + return fmt.Errorf("stream already registered for id: %d", streamID) } s.streams[streamID] = NewStream(s.lggr, streamID, spec, s.runner, rrs) return nil diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 2c7c2bd6ecc..738b68f5d4d 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -28,23 +28,23 @@ func Test_Registry(t *testing.T) { t.Run("Get", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} + sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} + sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} - v, exists := sr.Get("foo") + v, exists := sr.Get(1) assert.True(t, exists) - assert.Equal(t, sr.streams["foo"], v) + assert.Equal(t, sr.streams[1], v) - v, exists = sr.Get("bar") + v, exists = sr.Get(2) assert.True(t, exists) - assert.Equal(t, sr.streams["bar"], v) + assert.Equal(t, sr.streams[2], v) - v, exists = sr.Get("baz") + v, exists = sr.Get(3) assert.True(t, exists) - assert.Equal(t, sr.streams["baz"], v) + assert.Equal(t, sr.streams[3], v) - v, exists = sr.Get("qux") + v, exists = sr.Get(4) assert.Nil(t, v) assert.False(t, exists) }) @@ -53,54 +53,54 @@ func Test_Registry(t *testing.T) { t.Run("registers new stream", func(t *testing.T) { assert.Len(t, sr.streams, 0) - err := sr.Register("foo", pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) require.NoError(t, err) assert.Len(t, sr.streams, 1) - v, exists := sr.Get("foo") + v, exists := sr.Get(1) require.True(t, exists) strm := v.(*stream) - assert.Equal(t, StreamID("foo"), strm.id) + assert.Equal(t, StreamID(1), strm.id) assert.Equal(t, int32(32), strm.spec.ID) }) t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) { assert.Len(t, sr.streams, 1) - err := sr.Register("foo", pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) + err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) require.Error(t, err) assert.Len(t, sr.streams, 1) - assert.EqualError(t, err, "stream already registered for id: \"foo\"") + assert.EqualError(t, err, "stream already registered for id: 1") - v, exists := sr.Get("foo") + v, exists := sr.Get(1) require.True(t, exists) strm := v.(*stream) - assert.Equal(t, StreamID("foo"), strm.id) + assert.Equal(t, StreamID(1), strm.id) assert.Equal(t, int32(32), strm.spec.ID) }) }) t.Run("Unregister", func(t *testing.T) { sr := newRegistry(lggr, runner) - sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} - sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} - sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}} + sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}} + sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}} + sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}} t.Run("unregisters a stream", func(t *testing.T) { assert.Len(t, sr.streams, 3) - sr.Unregister("foo") + sr.Unregister(1) assert.Len(t, sr.streams, 2) - _, exists := sr.streams["foo"] + _, exists := sr.streams[1] assert.False(t, exists) }) t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) { assert.Len(t, sr.streams, 2) - sr.Unregister("foo") + sr.Unregister(1) assert.Len(t, sr.streams, 2) - _, exists := sr.streams["foo"] + _, exists := sr.streams[1] assert.False(t, exists) }) }) diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go index 3a556411bc6..3c0b4d0721f 100644 --- a/core/services/streams/stream_test.go +++ b/core/services/streams/stream_test.go @@ -59,7 +59,7 @@ func Test_Stream(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} spec := pipeline.Spec{} - id := StreamID("stream-id-foo") + id := StreamID(123) ctx := testutils.Context(t) t.Run("Run", func(t *testing.T) { diff --git a/core/store/migrate/migrations/0222_jobs_stream_id.sql b/core/store/migrate/migrations/0222_jobs_stream_id.sql new file mode 100644 index 00000000000..5732011cb61 --- /dev/null +++ b/core/store/migrate/migrations/0222_jobs_stream_id.sql @@ -0,0 +1,7 @@ +-- +goose Up +ALTER TABLE jobs ADD COLUMN stream_id BIGINT; +CREATE UNIQUE INDEX idx_jobs_unique_stream_id ON jobs(stream_id) WHERE stream_id IS NOT NULL; + +-- +goose Down +ALTER TABLE jobs DROP COLUMN stream_id; + diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index e66971a7a11..a78f5d8530c 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -830,7 +830,8 @@ storeBlockhashesBatchSize = %d } type StreamSpecParams struct { - Name string + Name string + StreamID uint64 } type StreamSpec struct { @@ -848,6 +849,7 @@ func GenerateStreamSpec(params StreamSpecParams) StreamSpec { type = "stream" schemaVersion = 1 name = "%s" +streamID = %d observationSource = """ ds [type=http method=GET url="https://chain.link/ETH-USD"]; ds_parse [type=jsonparse path="data,price"]; @@ -856,6 +858,6 @@ ds -> ds_parse -> ds_multiply; """ ` - toml := fmt.Sprintf(template, params.Name) + toml := fmt.Sprintf(template, params.Name, params.StreamID) return StreamSpec{StreamSpecParams: params, toml: toml} } diff --git a/core/web/jobs_controller_test.go b/core/web/jobs_controller_test.go index eec58c30571..0dc04ff34e8 100644 --- a/core/web/jobs_controller_test.go +++ b/core/web/jobs_controller_test.go @@ -369,7 +369,7 @@ func TestJobController_Create_HappyPath(t *testing.T) { { name: "stream", tomlTemplate: func(_ string) string { - return testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "ETH/USD"}).Toml() + return testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "ETH/USD", StreamID: 32}).Toml() }, assertion: func(t *testing.T, nameAndExternalJobID string, r *http.Response) { require.Equal(t, http.StatusOK, r.StatusCode) @@ -384,6 +384,7 @@ func TestJobController_Create_HappyPath(t *testing.T) { assert.NotNil(t, resource.PipelineSpec.DotDAGSource) assert.Equal(t, jb.Name.ValueOrZero(), resource.Name) + assert.Equal(t, jb.StreamID, resource.StreamID) }, }, } diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index d0a6cfb5ca9..c97314495b2 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -452,6 +452,7 @@ func NewJobError(e job.SpecError) JobError { type JobResource struct { JAID Name string `json:"name"` + StreamID *uint64 `json:"streamID,omitempty"` Type JobSpecType `json:"type"` SchemaVersion uint32 `json:"schemaVersion"` GasLimit clnull.Uint32 `json:"gasLimit"` @@ -479,6 +480,7 @@ func NewJobResource(j job.Job) *JobResource { resource := &JobResource{ JAID: NewJAIDInt32(j.ID), Name: j.Name.ValueOrZero(), + StreamID: j.StreamID, Type: JobSpecType(j.Type), SchemaVersion: j.SchemaVersion, GasLimit: j.GasLimit,