Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Functions] Offchain heartbeat support in Listener and OCR2 plugin #11274

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const (
DefaultPruneCheckFrequencySec uint32 = 60 * 10
DefaultPruneBatchSize uint32 = 500

// Used in place of OnchainMetadata for all offchain requests.
OffchainRequestMarker string = "OFFCHAIN_REQUEST"

FlagCBORMaxSize uint32 = 1
FlagSecretsMaxSize uint32 = 2
)
Expand Down Expand Up @@ -280,6 +283,45 @@ func (l *FunctionsListener) getMaxSecretsSize(flags RequestFlags) uint32 {
return l.pluginConfig.MaxSecretsSizesList[idx]
}

func (l *FunctionsListener) HandleOffchainRequest(ctx context.Context, request *OffchainRequest) error {
if request == nil {
return errors.New("HandleOffchainRequest: received nil request")
}
if len(request.RequestId) != RequestIDLength {
return fmt.Errorf("HandleOffchainRequest: invalid request ID length %d", len(request.RequestId))
}
if len(request.SubscriptionOwner) != common.AddressLength || len(request.RequestInitiator) != common.AddressLength {
return fmt.Errorf("HandleOffchainRequest: SubscriptionOwner and RequestInitiator must be set to valid addresses")
}

justinkaseman marked this conversation as resolved.
Show resolved Hide resolved
var requestId RequestID
copy(requestId[:], request.RequestId[:32])
subscriptionOwner := common.BytesToAddress(request.SubscriptionOwner)
senderAddr := common.BytesToAddress(request.RequestInitiator)
emptyTxHash := common.Hash{}
zeroCallbackGasLimit := uint32(0)
newReq := &Request{
RequestID: requestId,
RequestTxHash: &emptyTxHash,
ReceivedAt: time.Now(),
Flags: []byte{},
bolekk marked this conversation as resolved.
Show resolved Hide resolved
CallbackGasLimit: &zeroCallbackGasLimit,
// use sender address in place of coordinator contract to keep batches uniform
CoordinatorContractAddress: &senderAddr,
OnchainMetadata: []byte(OffchainRequestMarker),
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err)
} else {
l.logger.Errorw("HandleOffchainRequest: failed to create a DB entry for new request", "requestID", formatRequestId(requestId), "err", err)
}
return err
}
l.handleRequest(ctx, requestId, request.SubscriptionId, subscriptionOwner, RequestFlags{}, &request.Data)
return nil
}

func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleRequest) {
defer l.shutdownWaitGroup.Done()
l.logger.Infow("handleOracleRequestV1: oracle request v1 received", "requestID", formatRequestId(request.RequestId))
Expand Down
40 changes: 40 additions & 0 deletions core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,46 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) {
uni.service.Close()
}

func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()

uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient").Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: SubscriptionOwner.Bytes(),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.NoError(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOffchainRequest_Invalid(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: []byte("invalid_address"),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))

request.RequestInitiator = SubscriptionOwner.Bytes()
request.SubscriptionOwner = []byte("invalid_address")
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
Expand Down
59 changes: 59 additions & 0 deletions core/services/functions/mocks/offchain_transmitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/services/functions/offchain_transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package functions

import (
"context"

"github.com/pkg/errors"
)

// Simple wrapper around a channel to transmit offchain reports between
// OCR plugin and Gateway connector
//
//go:generate mockery --quiet --name OffchainTransmitter --output ./mocks/ --case=underscore
type OffchainTransmitter interface {
TransmitReport(ctx context.Context, report *OffchainResponse) error
ReportChannel() chan *OffchainResponse
}

type offchainTransmitter struct {
reportCh chan *OffchainResponse
}

func NewOffchainTransmitter(chanSize uint32) OffchainTransmitter {
return &offchainTransmitter{
reportCh: make(chan *OffchainResponse, chanSize),
}
}

func (t *offchainTransmitter) TransmitReport(ctx context.Context, report *OffchainResponse) error {
select {
case t.reportCh <- report:
return nil
case <-ctx.Done():
return errors.New("context cancelled")
}
}

func (t *offchainTransmitter) ReportChannel() chan *OffchainResponse {
return t.reportCh
}
31 changes: 31 additions & 0 deletions core/services/functions/offchain_transmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package functions_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/functions"
)

func TestOffchainTransmitter(t *testing.T) {
t.Parallel()

transmitter := functions.NewOffchainTransmitter(1)
ch := transmitter.ReportChannel()
report := &functions.OffchainResponse{RequestId: []byte("testID")}
ctx := testutils.Context(t)

require.NoError(t, transmitter.TransmitReport(ctx, report))
require.Equal(t, report, <-ch)

require.NoError(t, transmitter.TransmitReport(ctx, report))

ctxTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*20)
defer cancel()
// should not freeze
require.Error(t, transmitter.TransmitReport(ctxTimeout, report))
}
15 changes: 15 additions & 0 deletions core/services/functions/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ const (

type RequestFlags [32]byte

type OffchainRequest struct {
RequestId []byte `json:"requestId"`
RequestInitiator []byte `json:"requestInitiator"`
SubscriptionId uint64 `json:"subscriptionId"`
SubscriptionOwner []byte `json:"subscriptionOwner"`
Data RequestData `json:"data"`
}

type RequestData struct {
Source string `json:"source" cbor:"source"`
Language int `json:"language" cbor:"language"`
Expand All @@ -19,6 +27,13 @@ type RequestData struct {
BytesArgs [][]byte `json:"bytesArgs,omitempty" cbor:"bytesArgs"`
}

// NOTE: to be extended with raw report and signatures when needed
type OffchainResponse struct {
RequestId []byte `json:"requestId"`
Result []byte `json:"result"`
Error []byte `json:"error"`
}

type DONHostedSecrets struct {
SlotID uint `json:"slotId" cbor:"slotId"`
Version uint64 `json:"version" cbor:"version"`
Expand Down
17 changes: 10 additions & 7 deletions core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type FunctionsServicesConfig struct {
}

const (
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
DefaultOffchainTransmitterChannelSize uint32 = 1000
)

// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
s4Storage = s4.NewStorage(conf.Logger, *pluginConfig.S4Constraints, s4ORM, utils.NewRealClock())
}

offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize)
listenerLogger := conf.Logger.Named("FunctionsListener")
bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes)
functionsListener := functions.NewFunctionsListener(
Expand All @@ -118,10 +120,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
allServices = append(allServices, functionsListener)

functionsOracleArgs.ReportingPluginFactory = FunctionsReportingPluginFactory{
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
OffchainTransmitter: offchainTransmitter,
}
functionsReportingPluginOracle, err := libocr2.NewOracle(*functionsOracleArgs)
if err != nil {
Expand Down
48 changes: 30 additions & 18 deletions core/services/ocr2/plugins/functions/reporting.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package functions

import (
"bytes"
"context"
"fmt"

Expand All @@ -21,22 +22,24 @@ import (
)

type FunctionsReportingPluginFactory struct {
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
OffchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPluginFactory = (*FunctionsReportingPluginFactory)(nil)

type functionsReporting struct {
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
offchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPlugin = &functionsReporting{}
Expand Down Expand Up @@ -112,13 +115,14 @@ func (f FunctionsReportingPluginFactory) NewReportingPlugin(rpConfig types.Repor
},
}
plugin := functionsReporting{
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
offchainTransmitter: f.OffchainTransmitter,
}
promReportingPlugins.WithLabelValues(f.JobID.String()).Inc()
return &plugin, info, nil
Expand Down Expand Up @@ -437,6 +441,14 @@ func (r *functionsReporting) ShouldAcceptFinalizedReport(ctx context.Context, ts
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: state couldn't be changed to FINALIZED. Not transmitting.", commontypes.LogFields{"requestID": reqIdStr, "err": err})
continue
}
if bytes.Equal(item.OnchainMetadata, []byte(functions.OffchainRequestMarker)) {
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: transmitting offchain", commontypes.LogFields{"requestID": reqIdStr})
result := functions.OffchainResponse{RequestId: item.RequestID, Result: item.Result, Error: item.Error}
if err := r.offchainTransmitter.TransmitReport(ctx, &result); err != nil {
r.logger.Error("FunctionsReporting ShouldAcceptFinalizedReport: unable to transmit offchain", commontypes.LogFields{"requestID": reqIdStr, "err": err})
}
continue // doesn't need onchain transmission
}
needTransmissionIds = append(needTransmissionIds, reqIdStr)
}
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport end", commontypes.LogFields{
Expand Down
Loading
Loading