Skip to content

Commit

Permalink
Every instance of mercury transmitter should not load reports for all…
Browse files Browse the repository at this point in the history
… feeds on startup (#10829)

* Every instance of mercury transmitter should not load reports for all feeds on startup

* Fix a few persistence manager bugs

* Bump Migration version
  • Loading branch information
samsondav authored Oct 30, 2023
1 parent db64df9 commit e242dfb
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 58 deletions.
38 changes: 21 additions & 17 deletions core/services/relay/evm/mercury/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
type ORM interface {
InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error
DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error
GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error)
PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error
GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error)
PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error
LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error)
}

Expand All @@ -49,6 +49,11 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM {

// InsertTransmitRequest inserts one transmit request if the payload does not exist already.
func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error {
feedID, err := FeedIDFromReport(req.Payload)
if err != nil {
return err
}

q := o.q.WithOpts(qopts...)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -57,16 +62,12 @@ func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, report
go func() {
defer wg.Done()
err1 = q.ExecQ(`
INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (payload_hash) DO NOTHING
`, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID)
`, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:])
}()

feedID, err := FeedIDFromReport(req.Payload)
if err != nil {
return err
}
go func() {
defer wg.Done()
err2 = q.ExecQ(`
Expand Down Expand Up @@ -101,15 +102,16 @@ func (o *orm) DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOp
}

// GetTransmitRequests returns all transmit requests in chronologically descending order.
func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) {
func (o *orm) GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) {
q := o.q.WithOpts(qopts...)
// The priority queue uses epoch and round to sort transmissions so order by
// the same fields here for optimal insertion into the pq.
rows, err := q.QueryContext(q.ParentCtx, `
SELECT payload, config_digest, epoch, round, extra_hash
FROM mercury_transmit_requests
WHERE job_id = $1
ORDER BY epoch DESC, round DESC
`)
`, jobID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,20 +144,22 @@ func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) {
return transmissions, nil
}

// PruneTransmitRequests keeps at most maxSize rows in the table, deleting the
// oldest transactions.
func (o *orm) PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error {
// PruneTransmitRequests keeps at most maxSize rows for the given job ID,
// deleting the oldest transactions.
func (o *orm) PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)
// Prune the oldest requests by epoch and round.
return q.ExecQ(`
DELETE FROM mercury_transmit_requests
WHERE payload_hash NOT IN (
WHERE job_id = $1 AND
payload_hash NOT IN (
SELECT payload_hash
FROM mercury_transmit_requests
WHERE job_id = $1
ORDER BY epoch DESC, round DESC
LIMIT $1
LIMIT $2
)
`, maxSize)
`, jobID, maxSize)
}

func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) {
Expand Down
48 changes: 30 additions & 18 deletions core/services/relay/evm/mercury/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mercury
import (
"testing"

"github.com/cometbft/cometbft/libs/rand"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -16,7 +17,7 @@ import (
func TestORM(t *testing.T) {
db := pgtest.NewSqlxDB(t)

var jobID int32 // foreign key constraints disabled so can leave as 0
jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
lggr := logger.TestLogger(t)
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestORM(t *testing.T) {
err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2])
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests()
transmissions, err := orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]},
Expand All @@ -65,7 +66,7 @@ func TestORM(t *testing.T) {
err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: reports[1]}})
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]},
Expand All @@ -80,7 +81,7 @@ func TestORM(t *testing.T) {
err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: []byte("does-not-exist")}})
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]},
Expand All @@ -98,15 +99,15 @@ func TestORM(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, reports[2], l)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Empty(t, transmissions)

// More inserts.
err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]},
Expand All @@ -118,7 +119,7 @@ func TestORM(t *testing.T) {
err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]},
Expand All @@ -131,7 +132,7 @@ func TestORM(t *testing.T) {

func TestORM_PruneTransmitRequests(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32 // foreign key constraints disabled so can leave as 0
jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)

Expand All @@ -157,48 +158,59 @@ func TestORM_PruneTransmitRequests(t *testing.T) {
require.NoError(t, err)

// Max size greater than table size, expect no-op
err = orm.PruneTransmitRequests(5)
err = orm.PruneTransmitRequests(jobID, 5)
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests()
transmissions, err := orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)},
{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)},
})

// Max size equal to table size, expect no-op
err = orm.PruneTransmitRequests(2)
err = orm.PruneTransmitRequests(jobID, 2)
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)},
{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)},
})

// Max size is table size + 1, but jobID differs, expect no-op
err = orm.PruneTransmitRequests(-1, 2)
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)},
{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)},
}, transmissions)

err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2))
require.NoError(t, err)

// Max size is table size + 1, expect the oldest row to be pruned.
err = orm.PruneTransmitRequests(3)
// Max size is table size - 1, expect the oldest row to be pruned.
err = orm.PruneTransmitRequests(jobID, 3)
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests()
transmissions, err = orm.GetTransmitRequests(jobID)
require.NoError(t, err)
require.Equal(t, transmissions, []*Transmission{
require.Equal(t, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: makeReportContext(2, 2)},
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: makeReportContext(2, 1)},
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)},
})
}, transmissions)
}

func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32 // foreign key constraints disabled so can leave as 0
jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)

Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/persistence_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (pm *PersistenceManager) AsyncDelete(req *pb.TransmitRequest) {
}

func (pm *PersistenceManager) Load(ctx context.Context) ([]*Transmission, error) {
return pm.orm.GetTransmitRequests(pg.WithParentCtx(ctx))
return pm.orm.GetTransmitRequests(pm.jobID, pg.WithParentCtx(ctx))
}

func (pm *PersistenceManager) runFlushDeletesLoop() {
Expand Down Expand Up @@ -118,7 +118,7 @@ func (pm *PersistenceManager) runPruneLoop() {
ticker.Stop()
return
case <-ticker.C:
if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil {
if err := pm.orm.PruneTransmitRequests(pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil {
pm.lggr.Errorw("Failed to prune transmit requests table", "err", err)
} else {
pm.lggr.Debugw("Pruned transmit requests table")
Expand Down
78 changes: 58 additions & 20 deletions core/services/relay/evm/mercury/persistence_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/rand"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand All @@ -16,19 +19,22 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
)

func bootstrapPersistenceManager(t *testing.T) (*PersistenceManager, *observer.ObservedLogs) {
func bootstrapPersistenceManager(t *testing.T, jobID int32, db *sqlx.DB) (*PersistenceManager, *observer.ObservedLogs) {
t.Helper()
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel)
orm := NewORM(db, lggr, pgtest.NewQConfig(true))
return NewPersistenceManager(lggr, orm, 0, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs
return NewPersistenceManager(lggr, orm, jobID, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs
}

func TestPersistenceManager(t *testing.T) {
jobID1 := rand.Int32()
jobID2 := jobID1 + 1

ctx := context.Background()
pm, _ := bootstrapPersistenceManager(t)
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
pm, _ := bootstrapPersistenceManager(t, jobID1, db)

reports := sampleReports

Expand All @@ -52,11 +58,23 @@ func TestPersistenceManager(t *testing.T) {
require.Equal(t, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[1]}},
}, transmissions)

t.Run("scopes load to only transmissions with matching job ID", func(t *testing.T) {
pm2, _ := bootstrapPersistenceManager(t, jobID2, db)
transmissions, err = pm2.Load(ctx)
require.NoError(t, err)

assert.Len(t, transmissions, 0)
})
}

func TestPersistenceManagerAsyncDelete(t *testing.T) {
ctx := context.Background()
pm, observedLogs := bootstrapPersistenceManager(t)
jobID := rand.Int32()
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
pm, observedLogs := bootstrapPersistenceManager(t, jobID, db)

reports := sampleReports

Expand Down Expand Up @@ -96,16 +114,32 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) {
}

func TestPersistenceManagerPrune(t *testing.T) {
jobID1 := rand.Int32()
jobID2 := jobID1 + 1
db := pgtest.NewSqlxDB(t)
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)

ctx := context.Background()
pm, observedLogs := bootstrapPersistenceManager(t)

reports := sampleReports
reports := make([][]byte, 25)
for i := 0; i < 25; i++ {
reports[i] = buildSampleV1Report(int64(i))
}

err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[0]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 1}})
pm2, _ := bootstrapPersistenceManager(t, jobID2, db)
for i := 0; i < 20; i++ {
err := pm2.Insert(ctx, &pb.TransmitRequest{Payload: reports[i]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: uint32(i)}})
require.NoError(t, err)
}

pm, observedLogs := bootstrapPersistenceManager(t, jobID1, db)

err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[21]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 21}})
require.NoError(t, err)
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[1]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}})
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[22]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}})
require.NoError(t, err)
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[2]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}})
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[23]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}})
require.NoError(t, err)

err = pm.Start(ctx)
Expand All @@ -118,24 +152,28 @@ func TestPersistenceManagerPrune(t *testing.T) {
transmissions, err := pm.Load(ctx)
require.NoError(t, err)
require.Equal(t, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}},
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}},
{Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}},
{Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}},
}, transmissions)

// Test pruning stops after Close.
err = pm.Close()
require.NoError(t, err)

err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[3]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}})
err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[24]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}})
require.NoError(t, err)

time.Sleep(15 * time.Millisecond)

transmissions, err = pm.Load(ctx)
require.NoError(t, err)
require.Equal(t, []*Transmission{
{Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}}},
{Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}},
{Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}},
{Req: &pb.TransmitRequest{Payload: reports[24]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}}},
{Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}},
{Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}},
}, transmissions)

t.Run("prune was scoped to job ID", func(t *testing.T) {
transmissions, err = pm2.Load(ctx)
require.NoError(t, err)
assert.Len(t, transmissions, 20)
})
}
Loading

0 comments on commit e242dfb

Please sign in to comment.