From e242dfb9ae44a36ef05bb7b09f75fce0f415e308 Mon Sep 17 00:00:00 2001 From: Sam Date: Mon, 30 Oct 2023 10:17:18 -0400 Subject: [PATCH] Every instance of mercury transmitter should not load reports for all feeds on startup (#10829) * Every instance of mercury transmitter should not load reports for all feeds on startup * Fix a few persistence manager bugs * Bump Migration version --- core/services/relay/evm/mercury/orm.go | 38 +++++---- core/services/relay/evm/mercury/orm_test.go | 48 +++++++----- .../relay/evm/mercury/persistence_manager.go | 4 +- .../evm/mercury/persistence_manager_test.go | 78 ++++++++++++++----- .../services/relay/evm/mercury/transmitter.go | 2 +- .../relay/evm/mercury/transmitter_test.go | 4 + ...d_feed_id_to_mercury_transmit_requests.sql | 14 ++++ 7 files changed, 130 insertions(+), 58 deletions(-) create mode 100644 core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index dd7d7b33e74..7273519f6b6 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -23,8 +23,8 @@ import ( type ORM interface { InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error - GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) - PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error + GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) + PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) } @@ -49,6 +49,11 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM { // InsertTransmitRequest inserts one transmit request if the payload does not exist already. func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error { + feedID, err := FeedIDFromReport(req.Payload) + if err != nil { + return err + } + q := o.q.WithOpts(qopts...) var wg sync.WaitGroup wg.Add(2) @@ -57,16 +62,12 @@ func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, report go func() { defer wg.Done() err1 = q.ExecQ(` - INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (payload_hash) DO NOTHING - `, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID) + `, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) }() - feedID, err := FeedIDFromReport(req.Payload) - if err != nil { - return err - } go func() { defer wg.Done() err2 = q.ExecQ(` @@ -101,15 +102,16 @@ func (o *orm) DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOp } // GetTransmitRequests returns all transmit requests in chronologically descending order. -func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) { +func (o *orm) GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) { q := o.q.WithOpts(qopts...) // The priority queue uses epoch and round to sort transmissions so order by // the same fields here for optimal insertion into the pq. rows, err := q.QueryContext(q.ParentCtx, ` SELECT payload, config_digest, epoch, round, extra_hash FROM mercury_transmit_requests + WHERE job_id = $1 ORDER BY epoch DESC, round DESC - `) + `, jobID) if err != nil { return nil, err } @@ -142,20 +144,22 @@ func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) { return transmissions, nil } -// PruneTransmitRequests keeps at most maxSize rows in the table, deleting the -// oldest transactions. -func (o *orm) PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error { +// PruneTransmitRequests keeps at most maxSize rows for the given job ID, +// deleting the oldest transactions. +func (o *orm) PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) // Prune the oldest requests by epoch and round. return q.ExecQ(` DELETE FROM mercury_transmit_requests - WHERE payload_hash NOT IN ( + WHERE job_id = $1 AND + payload_hash NOT IN ( SELECT payload_hash FROM mercury_transmit_requests + WHERE job_id = $1 ORDER BY epoch DESC, round DESC - LIMIT $1 + LIMIT $2 ) - `, maxSize) + `, jobID, maxSize) } func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) { diff --git a/core/services/relay/evm/mercury/orm_test.go b/core/services/relay/evm/mercury/orm_test.go index a6a72327677..56dea70417b 100644 --- a/core/services/relay/evm/mercury/orm_test.go +++ b/core/services/relay/evm/mercury/orm_test.go @@ -3,6 +3,7 @@ package mercury import ( "testing" + "github.com/cometbft/cometbft/libs/rand" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,7 +17,7 @@ import ( func TestORM(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) lggr := logger.TestLogger(t) @@ -48,7 +49,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests() + transmissions, err := orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -65,7 +66,7 @@ func TestORM(t *testing.T) { err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: reports[1]}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -80,7 +81,7 @@ func TestORM(t *testing.T) { err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: []byte("does-not-exist")}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -98,7 +99,7 @@ func TestORM(t *testing.T) { require.NoError(t, err) assert.Equal(t, reports[2], l) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Empty(t, transmissions) @@ -106,7 +107,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, @@ -118,7 +119,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, @@ -131,7 +132,7 @@ func TestORM(t *testing.T) { func TestORM_PruneTransmitRequests(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) @@ -157,10 +158,10 @@ func TestORM_PruneTransmitRequests(t *testing.T) { require.NoError(t, err) // Max size greater than table size, expect no-op - err = orm.PruneTransmitRequests(5) + err = orm.PruneTransmitRequests(jobID, 5) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests() + transmissions, err := orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, @@ -168,37 +169,48 @@ func TestORM_PruneTransmitRequests(t *testing.T) { }) // Max size equal to table size, expect no-op - err = orm.PruneTransmitRequests(2) + err = orm.PruneTransmitRequests(jobID, 2) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }) + // Max size is table size + 1, but jobID differs, expect no-op + err = orm.PruneTransmitRequests(-1, 2) + require.NoError(t, err) + + transmissions, err = orm.GetTransmitRequests(jobID) + require.NoError(t, err) + require.Equal(t, []*Transmission{ + {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, + {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, + }, transmissions) + err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) require.NoError(t, err) - // Max size is table size + 1, expect the oldest row to be pruned. - err = orm.PruneTransmitRequests(3) + // Max size is table size - 1, expect the oldest row to be pruned. + err = orm.PruneTransmitRequests(jobID, 3) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) - require.Equal(t, transmissions, []*Transmission{ + require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: makeReportContext(2, 2)}, {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: makeReportContext(2, 1)}, {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, - }) + }, transmissions) } func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index 9e8df72a155..1c8dad45301 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -78,7 +78,7 @@ func (pm *PersistenceManager) AsyncDelete(req *pb.TransmitRequest) { } func (pm *PersistenceManager) Load(ctx context.Context) ([]*Transmission, error) { - return pm.orm.GetTransmitRequests(pg.WithParentCtx(ctx)) + return pm.orm.GetTransmitRequests(pm.jobID, pg.WithParentCtx(ctx)) } func (pm *PersistenceManager) runFlushDeletesLoop() { @@ -118,7 +118,7 @@ func (pm *PersistenceManager) runPruneLoop() { ticker.Stop() return case <-ticker.C: - if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { + if err := pm.orm.PruneTransmitRequests(pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { pm.lggr.Errorw("Failed to prune transmit requests table", "err", err) } else { pm.lggr.Debugw("Pruned transmit requests table") diff --git a/core/services/relay/evm/mercury/persistence_manager_test.go b/core/services/relay/evm/mercury/persistence_manager_test.go index 97628ed9c2b..d185a64a8f1 100644 --- a/core/services/relay/evm/mercury/persistence_manager_test.go +++ b/core/services/relay/evm/mercury/persistence_manager_test.go @@ -5,7 +5,10 @@ import ( "testing" "time" + "github.com/cometbft/cometbft/libs/rand" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/sqlx" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -16,19 +19,22 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) -func bootstrapPersistenceManager(t *testing.T) (*PersistenceManager, *observer.ObservedLogs) { +func bootstrapPersistenceManager(t *testing.T, jobID int32, db *sqlx.DB) (*PersistenceManager, *observer.ObservedLogs) { t.Helper() - db := pgtest.NewSqlxDB(t) - pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) - pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) orm := NewORM(db, lggr, pgtest.NewQConfig(true)) - return NewPersistenceManager(lggr, orm, 0, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs + return NewPersistenceManager(lggr, orm, jobID, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs } func TestPersistenceManager(t *testing.T) { + jobID1 := rand.Int32() + jobID2 := jobID1 + 1 + ctx := context.Background() - pm, _ := bootstrapPersistenceManager(t) + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + pm, _ := bootstrapPersistenceManager(t, jobID1, db) reports := sampleReports @@ -52,11 +58,23 @@ func TestPersistenceManager(t *testing.T) { require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}}, }, transmissions) + + t.Run("scopes load to only transmissions with matching job ID", func(t *testing.T) { + pm2, _ := bootstrapPersistenceManager(t, jobID2, db) + transmissions, err = pm2.Load(ctx) + require.NoError(t, err) + + assert.Len(t, transmissions, 0) + }) } func TestPersistenceManagerAsyncDelete(t *testing.T) { ctx := context.Background() - pm, observedLogs := bootstrapPersistenceManager(t) + jobID := rand.Int32() + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + pm, observedLogs := bootstrapPersistenceManager(t, jobID, db) reports := sampleReports @@ -96,16 +114,32 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) { } func TestPersistenceManagerPrune(t *testing.T) { + jobID1 := rand.Int32() + jobID2 := jobID1 + 1 + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + ctx := context.Background() - pm, observedLogs := bootstrapPersistenceManager(t) - reports := sampleReports + reports := make([][]byte, 25) + for i := 0; i < 25; i++ { + reports[i] = buildSampleV1Report(int64(i)) + } - err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[0]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 1}}) + pm2, _ := bootstrapPersistenceManager(t, jobID2, db) + for i := 0; i < 20; i++ { + err := pm2.Insert(ctx, &pb.TransmitRequest{Payload: reports[i]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: uint32(i)}}) + require.NoError(t, err) + } + + pm, observedLogs := bootstrapPersistenceManager(t, jobID1, db) + + err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[21]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 21}}) require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[1]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[22]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}) require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[2]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[23]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}) require.NoError(t, err) err = pm.Start(ctx) @@ -118,24 +152,28 @@ func TestPersistenceManagerPrune(t *testing.T) { transmissions, err := pm.Load(ctx) require.NoError(t, err) require.Equal(t, []*Transmission{ - {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}}, - {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}}, + {Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}}, + {Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}}, }, transmissions) // Test pruning stops after Close. err = pm.Close() require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[3]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[24]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}}) require.NoError(t, err) - time.Sleep(15 * time.Millisecond) - transmissions, err = pm.Load(ctx) require.NoError(t, err) require.Equal(t, []*Transmission{ - {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}}}, - {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}}, - {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}}, + {Req: &pb.TransmitRequest{Payload: reports[24]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}}}, + {Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}}, + {Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}}, }, transmissions) + + t.Run("prune was scoped to job ID", func(t *testing.T) { + transmissions, err = pm2.Load(ctx) + require.NoError(t, err) + assert.Len(t, transmissions, 20) + }) } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 0c701e3b4b3..0c2721442b4 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -137,7 +137,7 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp jobID, fmt.Sprintf("%x", fromAccount), make(chan (struct{})), - NewTransmitQueue(lggr, feedIDHex, maxTransmitQueueSize, nil, persistenceManager), + nil, sync.WaitGroup{}, transmitSuccessCount.WithLabelValues(feedIDHex), transmitDuplicateCount.WithLabelValues(feedIDHex), diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 6723ffcbcac..c8a68d41a16 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -26,6 +26,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { var jobID int32 pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + q := NewTransmitQueue(lggr, "", 0, nil, nil) t.Run("v1 report transmission successfully enqueued", func(t *testing.T) { report := sampleV1Report @@ -40,6 +41,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) @@ -57,6 +59,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) @@ -74,6 +77,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) diff --git a/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql b/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql new file mode 100644 index 00000000000..04cf5a2571d --- /dev/null +++ b/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql @@ -0,0 +1,14 @@ +-- +goose Up +ALTER TABLE mercury_transmit_requests ADD COLUMN feed_id BYTEA CHECK (feed_id IS NULL OR octet_length(feed_id) = 32); +DROP INDEX idx_mercury_transmission_requests_epoch_round; +CREATE INDEX idx_mercury_transmission_requests_job_id_epoch_round ON mercury_transmit_requests (job_id, epoch DESC, round DESC); +CREATE INDEX idx_mercury_transmit_requests_job_id ON mercury_transmit_requests (job_id); +CREATE INDEX idx_mercury_transmit_requests_feed_id ON mercury_transmit_requests (feed_id); +CREATE INDEX idx_mercury_feed_latest_reports_job_id ON feed_latest_reports (job_id); + +-- +goose Down +ALTER TABLE mercury_transmit_requests DROP COLUMN feed_id; +DROP INDEX idx_mercury_transmit_requests_job_id; +DROP INDEX idx_mercury_feed_latest_reports_job_id; +CREATE INDEX idx_mercury_transmission_requests_epoch_round ON mercury_transmit_requests (epoch DESC, round DESC); +DROP INDEX idx_mercury_transmission_requests_job_id_epoch_round;