Skip to content

Commit

Permalink
Use uint64 for stream IDs (#11901)
Browse files Browse the repository at this point in the history
* Use uint64 for stream IDs

* Reduce size of index
  • Loading branch information
samsondav authored Jan 26, 2024
1 parent c76a01e commit f4a17af
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 48 deletions.
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ var (
type Job struct {
ID int32 `toml:"-"`
ExternalJobID uuid.UUID `toml:"externalJobID"`
StreamID *uint64 `toml:"streamID"`
OCROracleSpecID *int32
OCROracleSpec *OCROracleSpec
OCR2OracleSpecID *int32
Expand Down
8 changes: 4 additions & 4 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,18 +530,18 @@ func (o *orm) InsertJob(job *Job, qopts ...pg.QOpt) error {

// if job has id, emplace otherwise insert with a new id.
if job.ID == 0 {
query = `INSERT INTO jobs (pipeline_spec_id, name, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
query = `INSERT INTO jobs (pipeline_spec_id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:pipeline_spec_id, :name, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
VALUES (:pipeline_spec_id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
} else {
query = `INSERT INTO jobs (id, pipeline_spec_id, name, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
query = `INSERT INTO jobs (id, pipeline_spec_id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:id, :pipeline_spec_id, :name, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
VALUES (:id, :pipeline_spec_id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
Expand Down
14 changes: 7 additions & 7 deletions core/services/streams/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Delegate struct {
var _ job.Delegate = (*Delegate)(nil)

func NewDelegate(lggr logger.Logger, registry Registry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate {
return &Delegate{lggr, registry, runner, cfg}
return &Delegate{lggr.Named("StreamsDelegate"), registry, runner, cfg}
}

func (d *Delegate) JobType() job.Type {
Expand All @@ -44,11 +44,11 @@ func (d *Delegate) BeforeJobDeleted(jb job.Job) {}
func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { return nil }

func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) {
if !jb.Name.Valid {
return nil, errors.New("job name is required to be present for stream specs")
if jb.StreamID == nil {
return nil, errors.New("streamID is required to be present for stream specs")
}
id := jb.Name.String
lggr := d.lggr.Named(id).With("streamID", id)
id := *jb.StreamID
lggr := d.lggr.With("streamID", id)

rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth())
services = append(services, rrs, &StreamService{
Expand Down Expand Up @@ -102,8 +102,8 @@ func ValidatedStreamSpec(tomlString string) (job.Job, error) {
return jb, errors.Errorf("unsupported type: %q", jb.Type)
}

if !jb.Name.Valid {
return jb, errors.New("jobs of type 'stream' require a non-blank name as stream ID")
if jb.StreamID == nil {
return jb, errors.New("jobs of type 'stream' require streamID to be specified")
}

return jb, nil
Expand Down
35 changes: 28 additions & 7 deletions core/services/streams/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"
)

type mockRegistry struct{}
Expand All @@ -35,11 +34,11 @@ func Test_Delegate(t *testing.T) {

t.Run("ServicesForSpec", func(t *testing.T) {
jb := job.Job{PipelineSpec: &pipeline.Spec{ID: 1}}
t.Run("errors if job is missing name", func(t *testing.T) {
t.Run("errors if job is missing streamID", func(t *testing.T) {
_, err := d.ServicesForSpec(jb)
assert.EqualError(t, err, "job name is required to be present for stream specs")
assert.EqualError(t, err, "streamID is required to be present for stream specs")
})
jb.Name = null.StringFrom("jobname")
jb.StreamID = ptr(uint64(42))
t.Run("returns services", func(t *testing.T) {
srvs, err := d.ServicesForSpec(jb)
require.NoError(t, err)
Expand All @@ -49,7 +48,7 @@ func Test_Delegate(t *testing.T) {

strmSrv := srvs[1].(*StreamService)
assert.Equal(t, registry, strmSrv.registry)
assert.Equal(t, StreamID("jobname"), strmSrv.id)
assert.Equal(t, StreamID(42), strmSrv.id)
assert.Equal(t, jb.PipelineSpec, strmSrv.spec)
assert.NotNil(t, strmSrv.lggr)
assert.Equal(t, srvs[0], strmSrv.rrs)
Expand All @@ -67,6 +66,7 @@ func Test_ValidatedStreamSpec(t *testing.T) {
name: "minimal stream spec",
toml: `
type = "stream"
streamID = 12345
name = "voter-turnout"
schemaVersion = 1
observationSource = """
Expand All @@ -82,6 +82,8 @@ answer1 [type=median index=0];
assert.Equal(t, job.Type("stream"), jb.Type)
assert.Equal(t, uint32(1), jb.SchemaVersion)
assert.True(t, jb.Name.Valid)
require.NotNil(t, jb.StreamID)
assert.Equal(t, uint64(12345), *jb.StreamID)
assert.Equal(t, "voter-turnout", jb.Name.String)
},
},
Expand Down Expand Up @@ -134,10 +136,11 @@ answer1 [type=median index=0];
},
},
{
name: "error if missing name",
name: "no error if missing name",
toml: `
type = "stream"
schemaVersion = 1
streamID = 12345
observationSource = """
ds1 [type=bridge name=voter_turnout];
ds1_parse [type=jsonparse path="one,two"];
Expand All @@ -147,7 +150,24 @@ answer1 [type=median index=0];
"""
`,
assertion: func(t *testing.T, jb job.Job, err error) {
assert.EqualError(t, err, "jobs of type 'stream' require a non-blank name as stream ID")
require.NoError(t, err)
},
},
{
name: "error if missing streamID",
toml: `
type = "stream"
schemaVersion = 1
observationSource = """
ds1 [type=bridge name=voter_turnout];
ds1_parse [type=jsonparse path="one,two"];
ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
`,
assertion: func(t *testing.T, jb job.Job, err error) {
assert.EqualError(t, err, "jobs of type 'stream' require streamID to be specified")
},
},
}
Expand All @@ -159,3 +179,4 @@ answer1 [type=median index=0];
})
}
}
func ptr[T any](t T) *T { return &t }
4 changes: 2 additions & 2 deletions core/services/streams/stream_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

type StreamID = string
type StreamID = uint64

type Registry interface {
Get(streamID StreamID) (strm Stream, exists bool)
Expand Down Expand Up @@ -47,7 +47,7 @@ func (s *streamRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs Res
s.Lock()
defer s.Unlock()
if _, exists := s.streams[streamID]; exists {
return fmt.Errorf("stream already registered for id: %q", streamID)
return fmt.Errorf("stream already registered for id: %d", streamID)
}
s.streams[streamID] = NewStream(s.lggr, streamID, spec, s.runner, rrs)
return nil
Expand Down
48 changes: 24 additions & 24 deletions core/services/streams/stream_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ func Test_Registry(t *testing.T) {
t.Run("Get", func(t *testing.T) {
sr := newRegistry(lggr, runner)

sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}}
sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}}

v, exists := sr.Get("foo")
v, exists := sr.Get(1)
assert.True(t, exists)
assert.Equal(t, sr.streams["foo"], v)
assert.Equal(t, sr.streams[1], v)

v, exists = sr.Get("bar")
v, exists = sr.Get(2)
assert.True(t, exists)
assert.Equal(t, sr.streams["bar"], v)
assert.Equal(t, sr.streams[2], v)

v, exists = sr.Get("baz")
v, exists = sr.Get(3)
assert.True(t, exists)
assert.Equal(t, sr.streams["baz"], v)
assert.Equal(t, sr.streams[3], v)

v, exists = sr.Get("qux")
v, exists = sr.Get(4)
assert.Nil(t, v)
assert.False(t, exists)
})
Expand All @@ -53,54 +53,54 @@ func Test_Registry(t *testing.T) {

t.Run("registers new stream", func(t *testing.T) {
assert.Len(t, sr.streams, 0)
err := sr.Register("foo", pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
require.NoError(t, err)
assert.Len(t, sr.streams, 1)

v, exists := sr.Get("foo")
v, exists := sr.Get(1)
require.True(t, exists)
strm := v.(*stream)
assert.Equal(t, StreamID("foo"), strm.id)
assert.Equal(t, StreamID(1), strm.id)
assert.Equal(t, int32(32), strm.spec.ID)
})

t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) {
assert.Len(t, sr.streams, 1)
err := sr.Register("foo", pipeline.Spec{ID: 33, DotDagSource: "source"}, nil)
err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil)
require.Error(t, err)
assert.Len(t, sr.streams, 1)
assert.EqualError(t, err, "stream already registered for id: \"foo\"")
assert.EqualError(t, err, "stream already registered for id: 1")

v, exists := sr.Get("foo")
v, exists := sr.Get(1)
require.True(t, exists)
strm := v.(*stream)
assert.Equal(t, StreamID("foo"), strm.id)
assert.Equal(t, StreamID(1), strm.id)
assert.Equal(t, int32(32), strm.spec.ID)
})
})
t.Run("Unregister", func(t *testing.T) {
sr := newRegistry(lggr, runner)

sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}}
sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}}

t.Run("unregisters a stream", func(t *testing.T) {
assert.Len(t, sr.streams, 3)

sr.Unregister("foo")
sr.Unregister(1)

assert.Len(t, sr.streams, 2)
_, exists := sr.streams["foo"]
_, exists := sr.streams[1]
assert.False(t, exists)
})
t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) {
assert.Len(t, sr.streams, 2)

sr.Unregister("foo")
sr.Unregister(1)

assert.Len(t, sr.streams, 2)
_, exists := sr.streams["foo"]
_, exists := sr.streams[1]
assert.False(t, exists)
})
})
Expand Down
2 changes: 1 addition & 1 deletion core/services/streams/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_Stream(t *testing.T) {
lggr := logger.TestLogger(t)
runner := &mockRunner{}
spec := pipeline.Spec{}
id := StreamID("stream-id-foo")
id := StreamID(123)
ctx := testutils.Context(t)

t.Run("Run", func(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions core/store/migrate/migrations/0222_jobs_stream_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +goose Up
ALTER TABLE jobs ADD COLUMN stream_id BIGINT;
CREATE UNIQUE INDEX idx_jobs_unique_stream_id ON jobs(stream_id) WHERE stream_id IS NOT NULL;

-- +goose Down
ALTER TABLE jobs DROP COLUMN stream_id;

6 changes: 4 additions & 2 deletions core/testdata/testspecs/v2_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,8 @@ storeBlockhashesBatchSize = %d
}

type StreamSpecParams struct {
Name string
Name string
StreamID uint64
}

type StreamSpec struct {
Expand All @@ -848,6 +849,7 @@ func GenerateStreamSpec(params StreamSpecParams) StreamSpec {
type = "stream"
schemaVersion = 1
name = "%s"
streamID = %d
observationSource = """
ds [type=http method=GET url="https://chain.link/ETH-USD"];
ds_parse [type=jsonparse path="data,price"];
Expand All @@ -856,6 +858,6 @@ ds -> ds_parse -> ds_multiply;
"""
`

toml := fmt.Sprintf(template, params.Name)
toml := fmt.Sprintf(template, params.Name, params.StreamID)
return StreamSpec{StreamSpecParams: params, toml: toml}
}
3 changes: 2 additions & 1 deletion core/web/jobs_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func TestJobController_Create_HappyPath(t *testing.T) {
{
name: "stream",
tomlTemplate: func(_ string) string {
return testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "ETH/USD"}).Toml()
return testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "ETH/USD", StreamID: 32}).Toml()
},
assertion: func(t *testing.T, nameAndExternalJobID string, r *http.Response) {
require.Equal(t, http.StatusOK, r.StatusCode)
Expand All @@ -384,6 +384,7 @@ func TestJobController_Create_HappyPath(t *testing.T) {

assert.NotNil(t, resource.PipelineSpec.DotDAGSource)
assert.Equal(t, jb.Name.ValueOrZero(), resource.Name)
assert.Equal(t, jb.StreamID, resource.StreamID)
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions core/web/presenters/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func NewJobError(e job.SpecError) JobError {
type JobResource struct {
JAID
Name string `json:"name"`
StreamID *uint64 `json:"streamID,omitempty"`
Type JobSpecType `json:"type"`
SchemaVersion uint32 `json:"schemaVersion"`
GasLimit clnull.Uint32 `json:"gasLimit"`
Expand Down Expand Up @@ -479,6 +480,7 @@ func NewJobResource(j job.Job) *JobResource {
resource := &JobResource{
JAID: NewJAIDInt32(j.ID),
Name: j.Name.ValueOrZero(),
StreamID: j.StreamID,
Type: JobSpecType(j.Type),
SchemaVersion: j.SchemaVersion,
GasLimit: j.GasLimit,
Expand Down

0 comments on commit f4a17af

Please sign in to comment.