From 0dd74edad034f13675e180264ef8a8e7bb9e4240 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Mon, 13 Nov 2023 13:44:19 -0800 Subject: [PATCH] [Functions] Offchain heartbeat support in Listener and OCR2 plugin 1. Add new API structs 2. Mark offchain requests in Listener 3. Introduce a simple OffchainTransmitter to pass responses from the plugin --- core/services/functions/listener.go | 42 +++++++++++++ core/services/functions/listener_test.go | 40 +++++++++++++ .../functions/mocks/offchain_transmitter.go | 59 ++++++++++++++++++ .../functions/offchain_transmitter.go | 39 ++++++++++++ .../functions/offchain_transmitter_test.go | 31 ++++++++++ core/services/functions/request.go | 15 +++++ .../services/ocr2/plugins/functions/plugin.go | 17 +++--- .../ocr2/plugins/functions/reporting.go | 48 +++++++++------ .../ocr2/plugins/functions/reporting_test.go | 60 +++++++++++++------ 9 files changed, 308 insertions(+), 43 deletions(-) create mode 100644 core/services/functions/mocks/offchain_transmitter.go create mode 100644 core/services/functions/offchain_transmitter.go create mode 100644 core/services/functions/offchain_transmitter_test.go diff --git a/core/services/functions/listener.go b/core/services/functions/listener.go index 773ae610408..5614c5331d4 100644 --- a/core/services/functions/listener.go +++ b/core/services/functions/listener.go @@ -111,6 +111,9 @@ const ( DefaultPruneCheckFrequencySec uint32 = 60 * 10 DefaultPruneBatchSize uint32 = 500 + // Used in place of OnchainMetadata for all offchain requests. + OffchainRequestMarker string = "OFFCHAIN_REQUEST" + FlagCBORMaxSize uint32 = 1 FlagSecretsMaxSize uint32 = 2 ) @@ -280,6 +283,45 @@ func (l *FunctionsListener) getMaxSecretsSize(flags RequestFlags) uint32 { return l.pluginConfig.MaxSecretsSizesList[idx] } +func (l *FunctionsListener) HandleOffchainRequest(ctx context.Context, request *OffchainRequest) error { + if request == nil { + return errors.New("HandleOffchainRequest: received nil request") + } + if len(request.RequestId) != RequestIDLength { + return fmt.Errorf("HandleOffchainRequest: invalid request ID length %d", len(request.RequestId)) + } + if len(request.SubscriptionOwner) != common.AddressLength || len(request.RequestInitiator) != common.AddressLength { + return fmt.Errorf("HandleOffchainRequest: SubscriptionOwner and RequestInitiator must be set to valid addresses") + } + + var requestId RequestID + copy(requestId[:], request.RequestId[:32]) + subscriptionOwner := common.BytesToAddress(request.SubscriptionOwner) + senderAddr := common.BytesToAddress(request.RequestInitiator) + emptyTxHash := common.Hash{} + zeroCallbackGasLimit := uint32(0) + newReq := &Request{ + RequestID: requestId, + RequestTxHash: &emptyTxHash, + ReceivedAt: time.Now(), + Flags: []byte{}, + CallbackGasLimit: &zeroCallbackGasLimit, + // use sender address in place of coordinator contract to keep batches uniform + CoordinatorContractAddress: &senderAddr, + OnchainMetadata: []byte(OffchainRequestMarker), + } + if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil { + if errors.Is(err, ErrDuplicateRequestID) { + l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err) + } else { + l.logger.Errorw("HandleOffchainRequest: failed to create a DB entry for new request", "requestID", formatRequestId(requestId), "err", err) + } + return err + } + l.handleRequest(ctx, requestId, request.SubscriptionId, subscriptionOwner, RequestFlags{}, &request.Data) + return nil +} + func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleRequest) { defer l.shutdownWaitGroup.Done() l.logger.Infow("handleOracleRequestV1: oracle request v1 received", "requestID", formatRequestId(request.RequestId)) diff --git a/core/services/functions/listener_test.go b/core/services/functions/listener_test.go index 3b7ed46988d..ac2bc64184d 100644 --- a/core/services/functions/listener_test.go +++ b/core/services/functions/listener_test.go @@ -179,6 +179,46 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) { uni.service.Close() } +func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) { + testutils.SkipShortDB(t) + t.Parallel() + + uni := NewFunctionsListenerUniverse(t, 0, 1_000_000) + + uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil) + uni.bridgeAccessor.On("NewExternalAdapterClient").Return(uni.eaClient, nil) + uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil) + uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil) + + request := &functions_service.OffchainRequest{ + RequestId: RequestID[:], + RequestInitiator: SubscriptionOwner.Bytes(), + SubscriptionId: uint64(SubscriptionID), + SubscriptionOwner: SubscriptionOwner.Bytes(), + Data: functions_service.RequestData{}, + } + require.NoError(t, uni.service.HandleOffchainRequest(testutils.Context(t), request)) +} + +func TestFunctionsListener_HandleOffchainRequest_Invalid(t *testing.T) { + testutils.SkipShortDB(t) + t.Parallel() + uni := NewFunctionsListenerUniverse(t, 0, 1_000_000) + + request := &functions_service.OffchainRequest{ + RequestId: RequestID[:], + RequestInitiator: []byte("invalid_address"), + SubscriptionId: uint64(SubscriptionID), + SubscriptionOwner: SubscriptionOwner.Bytes(), + Data: functions_service.RequestData{}, + } + require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request)) + + request.RequestInitiator = SubscriptionOwner.Bytes() + request.SubscriptionOwner = []byte("invalid_address") + require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request)) +} + func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T) { testutils.SkipShortDB(t) t.Parallel() diff --git a/core/services/functions/mocks/offchain_transmitter.go b/core/services/functions/mocks/offchain_transmitter.go new file mode 100644 index 00000000000..d9a7be04dd4 --- /dev/null +++ b/core/services/functions/mocks/offchain_transmitter.go @@ -0,0 +1,59 @@ +// Code generated by mockery v2.35.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + functions "github.com/smartcontractkit/chainlink/v2/core/services/functions" + mock "github.com/stretchr/testify/mock" +) + +// OffchainTransmitter is an autogenerated mock type for the OffchainTransmitter type +type OffchainTransmitter struct { + mock.Mock +} + +// ReportChannel provides a mock function with given fields: +func (_m *OffchainTransmitter) ReportChannel() chan *functions.OffchainResponse { + ret := _m.Called() + + var r0 chan *functions.OffchainResponse + if rf, ok := ret.Get(0).(func() chan *functions.OffchainResponse); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(chan *functions.OffchainResponse) + } + } + + return r0 +} + +// TransmitReport provides a mock function with given fields: ctx, report +func (_m *OffchainTransmitter) TransmitReport(ctx context.Context, report *functions.OffchainResponse) error { + ret := _m.Called(ctx, report) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *functions.OffchainResponse) error); ok { + r0 = rf(ctx, report) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewOffchainTransmitter creates a new instance of OffchainTransmitter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOffchainTransmitter(t interface { + mock.TestingT + Cleanup(func()) +}) *OffchainTransmitter { + mock := &OffchainTransmitter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/functions/offchain_transmitter.go b/core/services/functions/offchain_transmitter.go new file mode 100644 index 00000000000..63527937f92 --- /dev/null +++ b/core/services/functions/offchain_transmitter.go @@ -0,0 +1,39 @@ +package functions + +import ( + "context" + + "github.com/pkg/errors" +) + +// Simple wrapper around a channel to transmit offchain reports between +// OCR plugin and Gateway connector +// +//go:generate mockery --quiet --name OffchainTransmitter --output ./mocks/ --case=underscore +type OffchainTransmitter interface { + TransmitReport(ctx context.Context, report *OffchainResponse) error + ReportChannel() chan *OffchainResponse +} + +type offchainTransmitter struct { + reportCh chan *OffchainResponse +} + +func NewOffchainTransmitter(chanSize uint32) OffchainTransmitter { + return &offchainTransmitter{ + reportCh: make(chan *OffchainResponse, chanSize), + } +} + +func (t *offchainTransmitter) TransmitReport(ctx context.Context, report *OffchainResponse) error { + select { + case t.reportCh <- report: + return nil + case <-ctx.Done(): + return errors.New("context cancelled") + } +} + +func (t *offchainTransmitter) ReportChannel() chan *OffchainResponse { + return t.reportCh +} diff --git a/core/services/functions/offchain_transmitter_test.go b/core/services/functions/offchain_transmitter_test.go new file mode 100644 index 00000000000..bec639bf27d --- /dev/null +++ b/core/services/functions/offchain_transmitter_test.go @@ -0,0 +1,31 @@ +package functions_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/services/functions" +) + +func TestOffchainTransmitter(t *testing.T) { + t.Parallel() + + transmitter := functions.NewOffchainTransmitter(1) + ch := transmitter.ReportChannel() + report := &functions.OffchainResponse{RequestId: []byte("testID")} + ctx := testutils.Context(t) + + require.NoError(t, transmitter.TransmitReport(ctx, report)) + require.Equal(t, report, <-ch) + + require.NoError(t, transmitter.TransmitReport(ctx, report)) + + ctxTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*20) + defer cancel() + // should not freeze + require.Error(t, transmitter.TransmitReport(ctxTimeout, report)) +} diff --git a/core/services/functions/request.go b/core/services/functions/request.go index 181058b83d0..14c0b0d0e5a 100644 --- a/core/services/functions/request.go +++ b/core/services/functions/request.go @@ -9,6 +9,14 @@ const ( type RequestFlags [32]byte +type OffchainRequest struct { + RequestId []byte `json:"requestId"` + RequestInitiator []byte `json:"requestInitiator"` + SubscriptionId uint64 `json:"subscriptionId"` + SubscriptionOwner []byte `json:"subscriptionOwner"` + Data RequestData `json:"data"` +} + type RequestData struct { Source string `json:"source" cbor:"source"` Language int `json:"language" cbor:"language"` @@ -19,6 +27,13 @@ type RequestData struct { BytesArgs [][]byte `json:"bytesArgs,omitempty" cbor:"bytesArgs"` } +// NOTE: to be extended with raw report and signatures when needed +type OffchainResponse struct { + RequestId []byte `json:"requestId"` + Result []byte `json:"result"` + Error []byte `json:"error"` +} + type DONHostedSecrets struct { SlotID uint `json:"slotId" cbor:"slotId"` Version uint64 `json:"version" cbor:"version"` diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 26cffac5abf..8597b8ad4cc 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -50,9 +50,10 @@ type FunctionsServicesConfig struct { } const ( - FunctionsBridgeName string = "ea_bridge" - FunctionsS4Namespace string = "functions" - MaxAdapterResponseBytes int64 = 1_000_000 + FunctionsBridgeName string = "ea_bridge" + FunctionsS4Namespace string = "functions" + MaxAdapterResponseBytes int64 = 1_000_000 + DefaultOffchainTransmitterChannelSize uint32 = 1000 ) // Create all OCR2 plugin Oracles and all extra services needed to run a Functions job. @@ -100,6 +101,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs s4Storage = s4.NewStorage(conf.Logger, *pluginConfig.S4Constraints, s4ORM, utils.NewRealClock()) } + offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize) listenerLogger := conf.Logger.Named("FunctionsListener") bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes) functionsListener := functions.NewFunctionsListener( @@ -118,10 +120,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs allServices = append(allServices, functionsListener) functionsOracleArgs.ReportingPluginFactory = FunctionsReportingPluginFactory{ - Logger: functionsOracleArgs.Logger, - PluginORM: pluginORM, - JobID: conf.Job.ExternalJobID, - ContractVersion: pluginConfig.ContractVersion, + Logger: functionsOracleArgs.Logger, + PluginORM: pluginORM, + JobID: conf.Job.ExternalJobID, + ContractVersion: pluginConfig.ContractVersion, + OffchainTransmitter: offchainTransmitter, } functionsReportingPluginOracle, err := libocr2.NewOracle(*functionsOracleArgs) if err != nil { diff --git a/core/services/ocr2/plugins/functions/reporting.go b/core/services/ocr2/plugins/functions/reporting.go index dfa8f575d1b..36e8a882734 100644 --- a/core/services/ocr2/plugins/functions/reporting.go +++ b/core/services/ocr2/plugins/functions/reporting.go @@ -1,6 +1,7 @@ package functions import ( + "bytes" "context" "fmt" @@ -21,22 +22,24 @@ import ( ) type FunctionsReportingPluginFactory struct { - Logger commontypes.Logger - PluginORM functions.ORM - JobID uuid.UUID - ContractVersion uint32 + Logger commontypes.Logger + PluginORM functions.ORM + JobID uuid.UUID + ContractVersion uint32 + OffchainTransmitter functions.OffchainTransmitter } var _ types.ReportingPluginFactory = (*FunctionsReportingPluginFactory)(nil) type functionsReporting struct { - logger commontypes.Logger - pluginORM functions.ORM - jobID uuid.UUID - reportCodec encoding.ReportCodec - genericConfig *types.ReportingPluginConfig - specificConfig *config.ReportingPluginConfigWrapper - contractVersion uint32 + logger commontypes.Logger + pluginORM functions.ORM + jobID uuid.UUID + reportCodec encoding.ReportCodec + genericConfig *types.ReportingPluginConfig + specificConfig *config.ReportingPluginConfigWrapper + contractVersion uint32 + offchainTransmitter functions.OffchainTransmitter } var _ types.ReportingPlugin = &functionsReporting{} @@ -112,13 +115,14 @@ func (f FunctionsReportingPluginFactory) NewReportingPlugin(rpConfig types.Repor }, } plugin := functionsReporting{ - logger: f.Logger, - pluginORM: f.PluginORM, - jobID: f.JobID, - reportCodec: codec, - genericConfig: &rpConfig, - specificConfig: pluginConfig, - contractVersion: f.ContractVersion, + logger: f.Logger, + pluginORM: f.PluginORM, + jobID: f.JobID, + reportCodec: codec, + genericConfig: &rpConfig, + specificConfig: pluginConfig, + contractVersion: f.ContractVersion, + offchainTransmitter: f.OffchainTransmitter, } promReportingPlugins.WithLabelValues(f.JobID.String()).Inc() return &plugin, info, nil @@ -437,6 +441,14 @@ func (r *functionsReporting) ShouldAcceptFinalizedReport(ctx context.Context, ts r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: state couldn't be changed to FINALIZED. Not transmitting.", commontypes.LogFields{"requestID": reqIdStr, "err": err}) continue } + if bytes.Equal(item.OnchainMetadata, []byte(functions.OffchainRequestMarker)) { + r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: transmitting offchain", commontypes.LogFields{"requestID": reqIdStr}) + result := functions.OffchainResponse{RequestId: item.RequestID, Result: item.Result, Error: item.Error} + if err := r.offchainTransmitter.TransmitReport(ctx, &result); err != nil { + r.logger.Error("FunctionsReporting ShouldAcceptFinalizedReport: unable to transmit offchain", commontypes.LogFields{"requestID": reqIdStr, "err": err}) + } + continue // doesn't need onchain transmission + } needTransmissionIds = append(needTransmissionIds, reqIdStr) } r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport end", commontypes.LogFields{ diff --git a/core/services/ocr2/plugins/functions/reporting_test.go b/core/services/ocr2/plugins/functions/reporting_test.go index 24b430abd93..860492bfc52 100644 --- a/core/services/ocr2/plugins/functions/reporting_test.go +++ b/core/services/ocr2/plugins/functions/reporting_test.go @@ -22,14 +22,16 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/encoding" ) -func preparePlugin(t *testing.T, batchSize uint32, maxTotalGasLimit uint32) (types.ReportingPlugin, *functions_mocks.ORM, encoding.ReportCodec) { +func preparePlugin(t *testing.T, batchSize uint32, maxTotalGasLimit uint32) (types.ReportingPlugin, *functions_mocks.ORM, encoding.ReportCodec, *functions_mocks.OffchainTransmitter) { lggr := logger.TestLogger(t) ocrLogger := relaylogger.NewOCRWrapper(lggr, true, func(msg string) {}) orm := functions_mocks.NewORM(t) + offchainTransmitter := functions_mocks.NewOffchainTransmitter(t) factory := functions.FunctionsReportingPluginFactory{ - Logger: ocrLogger, - PluginORM: orm, - ContractVersion: 1, + Logger: ocrLogger, + PluginORM: orm, + ContractVersion: 1, + OffchainTransmitter: offchainTransmitter, } pluginConfig := config.ReportingPluginConfigWrapper{ @@ -48,7 +50,7 @@ func preparePlugin(t *testing.T, batchSize uint32, maxTotalGasLimit uint32) (typ require.NoError(t, err) codec, err := encoding.NewReportCodec(1) require.NoError(t, err) - return plugin, orm, codec + return plugin, orm, codec, offchainTransmitter } func newRequestID() functions_srv.RequestID { @@ -130,7 +132,7 @@ func newObservation(t *testing.T, observerId uint8, requests ...*encoding.Proces func TestFunctionsReporting_Query(t *testing.T) { t.Parallel() const batchSize = 10 - plugin, orm, _ := preparePlugin(t, batchSize, 0) + plugin, orm, _, _ := preparePlugin(t, batchSize, 0) reqs := []functions_srv.Request{newRequest(), newRequest()} orm.On("FindOldestEntriesByState", functions_srv.RESULT_READY, uint32(batchSize), mock.Anything).Return(reqs, nil) @@ -148,7 +150,7 @@ func TestFunctionsReporting_Query(t *testing.T) { func TestFunctionsReporting_Query_HandleCoordinatorMismatch(t *testing.T) { t.Parallel() const batchSize = 10 - plugin, orm, _ := preparePlugin(t, batchSize, 1000000) + plugin, orm, _, _ := preparePlugin(t, batchSize, 1000000) reqs := []functions_srv.Request{newRequest(), newRequest()} reqs[0].CoordinatorContractAddress = &common.Address{1} reqs[1].CoordinatorContractAddress = &common.Address{2} @@ -167,7 +169,7 @@ func TestFunctionsReporting_Query_HandleCoordinatorMismatch(t *testing.T) { func TestFunctionsReporting_Observation(t *testing.T) { t.Parallel() - plugin, orm, _ := preparePlugin(t, 10, 0) + plugin, orm, _, _ := preparePlugin(t, 10, 0) req1 := newRequestWithResult([]byte("abc")) req2 := newRequest() @@ -202,7 +204,7 @@ func TestFunctionsReporting_Observation(t *testing.T) { func TestFunctionsReporting_Observation_IncorrectQuery(t *testing.T) { t.Parallel() - plugin, orm, _ := preparePlugin(t, 10, 0) + plugin, orm, _, _ := preparePlugin(t, 10, 0) req1 := newRequestWithResult([]byte("abc")) invalidId := []byte("invalid") @@ -229,7 +231,7 @@ func TestFunctionsReporting_Observation_IncorrectQuery(t *testing.T) { func TestFunctionsReporting_Report(t *testing.T) { t.Parallel() - plugin, _, codec := preparePlugin(t, 10, 1000000) + plugin, _, codec, _ := preparePlugin(t, 10, 1000000) reqId1, reqId2, reqId3 := newRequestID(), newRequestID(), newRequestID() compResult := []byte("aaa") procReq1 := newProcessedRequest(reqId1, compResult, []byte{}) @@ -266,7 +268,7 @@ func TestFunctionsReporting_Report(t *testing.T) { func TestFunctionsReporting_Report_WithGasLimitAndMetadata(t *testing.T) { t.Parallel() - plugin, _, codec := preparePlugin(t, 10, 300000) + plugin, _, codec, _ := preparePlugin(t, 10, 300000) reqId1, reqId2, reqId3 := newRequestID(), newRequestID(), newRequestID() compResult := []byte("aaa") gasLimit1, gasLimit2 := uint32(100_000), uint32(200_000) @@ -307,7 +309,7 @@ func TestFunctionsReporting_Report_WithGasLimitAndMetadata(t *testing.T) { func TestFunctionsReporting_Report_HandleCoordinatorMismatch(t *testing.T) { t.Parallel() - plugin, _, codec := preparePlugin(t, 10, 300000) + plugin, _, codec, _ := preparePlugin(t, 10, 300000) reqId1, reqId2, reqId3 := newRequestID(), newRequestID(), newRequestID() compResult, meta := []byte("aaa"), []byte("meta") coordinatorContractA, coordinatorContractB := common.Address{1}, common.Address{2} @@ -337,7 +339,7 @@ func TestFunctionsReporting_Report_HandleCoordinatorMismatch(t *testing.T) { func TestFunctionsReporting_Report_CallbackGasLimitExceeded(t *testing.T) { t.Parallel() - plugin, _, codec := preparePlugin(t, 10, 200000) + plugin, _, codec, _ := preparePlugin(t, 10, 200000) reqId1, reqId2 := newRequestID(), newRequestID() compResult := []byte("aaa") gasLimit1, gasLimit2 := uint32(100_000), uint32(200_000) @@ -368,7 +370,7 @@ func TestFunctionsReporting_Report_CallbackGasLimitExceeded(t *testing.T) { func TestFunctionsReporting_Report_DeterministicOrderOfRequests(t *testing.T) { t.Parallel() - plugin, _, codec := preparePlugin(t, 10, 0) + plugin, _, codec, _ := preparePlugin(t, 10, 0) reqId1, reqId2, reqId3 := newRequestID(), newRequestID(), newRequestID() compResult := []byte("aaa") @@ -397,7 +399,7 @@ func TestFunctionsReporting_Report_DeterministicOrderOfRequests(t *testing.T) { func TestFunctionsReporting_Report_IncorrectObservation(t *testing.T) { t.Parallel() - plugin, _, _ := preparePlugin(t, 10, 0) + plugin, _, _, _ := preparePlugin(t, 10, 0) reqId1 := newRequestID() compResult := []byte("aaa") @@ -416,7 +418,14 @@ func getReportBytes(t *testing.T, codec encoding.ReportCodec, reqs ...functions_ var report []*encoding.ProcessedRequest for _, req := range reqs { req := req - report = append(report, &encoding.ProcessedRequest{RequestID: req.RequestID[:], Result: req.Result}) + report = append(report, &encoding.ProcessedRequest{ + RequestID: req.RequestID[:], + Result: req.Result, + Error: req.Error, + CallbackGasLimit: *req.CallbackGasLimit, + CoordinatorContract: req.CoordinatorContractAddress[:], + OnchainMetadata: req.OnchainMetadata, + }) } reportBytes, err := codec.EncodeReport(report) require.NoError(t, err) @@ -425,7 +434,7 @@ func getReportBytes(t *testing.T, codec encoding.ReportCodec, reqs ...functions_ func TestFunctionsReporting_ShouldAcceptFinalizedReport(t *testing.T) { t.Parallel() - plugin, orm, codec := preparePlugin(t, 10, 0) + plugin, orm, codec, _ := preparePlugin(t, 10, 0) req1 := newRequestWithResult([]byte("xxx")) // nonexistent req2 := newRequestWithResult([]byte("abc")) @@ -462,9 +471,24 @@ func TestFunctionsReporting_ShouldAcceptFinalizedReport(t *testing.T) { require.True(t, should) } +func TestFunctionsReporting_ShouldAcceptFinalizedReport_OffchainTransmission(t *testing.T) { + t.Parallel() + plugin, orm, codec, offchainTransmitter := preparePlugin(t, 10, 0) + req1 := newRequestWithResult([]byte("abc")) + req1.OnchainMetadata = []byte(functions_srv.OffchainRequestMarker) + + orm.On("FindById", req1.RequestID, mock.Anything).Return(&req1, nil) + orm.On("SetFinalized", req1.RequestID, mock.Anything, mock.Anything, mock.Anything).Return(nil) + offchainTransmitter.On("TransmitReport", mock.Anything, mock.Anything).Return(nil) + + should, err := plugin.ShouldAcceptFinalizedReport(testutils.Context(t), types.ReportTimestamp{}, getReportBytes(t, codec, req1)) + require.NoError(t, err) + require.False(t, should) +} + func TestFunctionsReporting_ShouldTransmitAcceptedReport(t *testing.T) { t.Parallel() - plugin, orm, codec := preparePlugin(t, 10, 0) + plugin, orm, codec, _ := preparePlugin(t, 10, 0) req1 := newRequestWithResult([]byte("xxx")) // nonexistent req2 := newRequestWithResult([]byte("abc"))