Skip to content

Commit

Permalink
[Functions] Offchain heartbeat support in Listener and OCR2 plugin (#…
Browse files Browse the repository at this point in the history
…11274)

1. Add new API structs
2. Mark offchain requests in Listener
3. Introduce a simple OffchainTransmitter to pass responses from the plugin
  • Loading branch information
bolekk authored Nov 15, 2023
1 parent e7e0d42 commit 3a38e90
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 43 deletions.
42 changes: 42 additions & 0 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const (
DefaultPruneCheckFrequencySec uint32 = 60 * 10
DefaultPruneBatchSize uint32 = 500

// Used in place of OnchainMetadata for all offchain requests.
OffchainRequestMarker string = "OFFCHAIN_REQUEST"

FlagCBORMaxSize uint32 = 1
FlagSecretsMaxSize uint32 = 2
)
Expand Down Expand Up @@ -280,6 +283,45 @@ func (l *FunctionsListener) getMaxSecretsSize(flags RequestFlags) uint32 {
return l.pluginConfig.MaxSecretsSizesList[idx]
}

func (l *FunctionsListener) HandleOffchainRequest(ctx context.Context, request *OffchainRequest) error {
if request == nil {
return errors.New("HandleOffchainRequest: received nil request")
}
if len(request.RequestId) != RequestIDLength {
return fmt.Errorf("HandleOffchainRequest: invalid request ID length %d", len(request.RequestId))
}
if len(request.SubscriptionOwner) != common.AddressLength || len(request.RequestInitiator) != common.AddressLength {
return fmt.Errorf("HandleOffchainRequest: SubscriptionOwner and RequestInitiator must be set to valid addresses")
}

var requestId RequestID
copy(requestId[:], request.RequestId[:32])
subscriptionOwner := common.BytesToAddress(request.SubscriptionOwner)
senderAddr := common.BytesToAddress(request.RequestInitiator)
emptyTxHash := common.Hash{}
zeroCallbackGasLimit := uint32(0)
newReq := &Request{
RequestID: requestId,
RequestTxHash: &emptyTxHash,
ReceivedAt: time.Now(),
Flags: []byte{},
CallbackGasLimit: &zeroCallbackGasLimit,
// use sender address in place of coordinator contract to keep batches uniform
CoordinatorContractAddress: &senderAddr,
OnchainMetadata: []byte(OffchainRequestMarker),
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err)
} else {
l.logger.Errorw("HandleOffchainRequest: failed to create a DB entry for new request", "requestID", formatRequestId(requestId), "err", err)
}
return err
}
l.handleRequest(ctx, requestId, request.SubscriptionId, subscriptionOwner, RequestFlags{}, &request.Data)
return nil
}

func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleRequest) {
defer l.shutdownWaitGroup.Done()
l.logger.Infow("handleOracleRequestV1: oracle request v1 received", "requestID", formatRequestId(request.RequestId))
Expand Down
40 changes: 40 additions & 0 deletions core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,46 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) {
uni.service.Close()
}

func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()

uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient").Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: SubscriptionOwner.Bytes(),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.NoError(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOffchainRequest_Invalid(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: []byte("invalid_address"),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))

request.RequestInitiator = SubscriptionOwner.Bytes()
request.SubscriptionOwner = []byte("invalid_address")
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
Expand Down
59 changes: 59 additions & 0 deletions core/services/functions/mocks/offchain_transmitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/services/functions/offchain_transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package functions

import (
"context"

"github.com/pkg/errors"
)

// Simple wrapper around a channel to transmit offchain reports between
// OCR plugin and Gateway connector
//
//go:generate mockery --quiet --name OffchainTransmitter --output ./mocks/ --case=underscore
type OffchainTransmitter interface {
TransmitReport(ctx context.Context, report *OffchainResponse) error
ReportChannel() chan *OffchainResponse
}

type offchainTransmitter struct {
reportCh chan *OffchainResponse
}

func NewOffchainTransmitter(chanSize uint32) OffchainTransmitter {
return &offchainTransmitter{
reportCh: make(chan *OffchainResponse, chanSize),
}
}

func (t *offchainTransmitter) TransmitReport(ctx context.Context, report *OffchainResponse) error {
select {
case t.reportCh <- report:
return nil
case <-ctx.Done():
return errors.New("context cancelled")
}
}

func (t *offchainTransmitter) ReportChannel() chan *OffchainResponse {
return t.reportCh
}
31 changes: 31 additions & 0 deletions core/services/functions/offchain_transmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package functions_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/functions"
)

func TestOffchainTransmitter(t *testing.T) {
t.Parallel()

transmitter := functions.NewOffchainTransmitter(1)
ch := transmitter.ReportChannel()
report := &functions.OffchainResponse{RequestId: []byte("testID")}
ctx := testutils.Context(t)

require.NoError(t, transmitter.TransmitReport(ctx, report))
require.Equal(t, report, <-ch)

require.NoError(t, transmitter.TransmitReport(ctx, report))

ctxTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*20)
defer cancel()
// should not freeze
require.Error(t, transmitter.TransmitReport(ctxTimeout, report))
}
15 changes: 15 additions & 0 deletions core/services/functions/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ const (

type RequestFlags [32]byte

type OffchainRequest struct {
RequestId []byte `json:"requestId"`
RequestInitiator []byte `json:"requestInitiator"`
SubscriptionId uint64 `json:"subscriptionId"`
SubscriptionOwner []byte `json:"subscriptionOwner"`
Data RequestData `json:"data"`
}

type RequestData struct {
Source string `json:"source" cbor:"source"`
Language int `json:"language" cbor:"language"`
Expand All @@ -19,6 +27,13 @@ type RequestData struct {
BytesArgs [][]byte `json:"bytesArgs,omitempty" cbor:"bytesArgs"`
}

// NOTE: to be extended with raw report and signatures when needed
type OffchainResponse struct {
RequestId []byte `json:"requestId"`
Result []byte `json:"result"`
Error []byte `json:"error"`
}

type DONHostedSecrets struct {
SlotID uint `json:"slotId" cbor:"slotId"`
Version uint64 `json:"version" cbor:"version"`
Expand Down
17 changes: 10 additions & 7 deletions core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type FunctionsServicesConfig struct {
}

const (
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
DefaultOffchainTransmitterChannelSize uint32 = 1000
)

// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
s4Storage = s4.NewStorage(conf.Logger, *pluginConfig.S4Constraints, s4ORM, utils.NewRealClock())
}

offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize)
listenerLogger := conf.Logger.Named("FunctionsListener")
bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes)
functionsListener := functions.NewFunctionsListener(
Expand All @@ -118,10 +120,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
allServices = append(allServices, functionsListener)

functionsOracleArgs.ReportingPluginFactory = FunctionsReportingPluginFactory{
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
OffchainTransmitter: offchainTransmitter,
}
functionsReportingPluginOracle, err := libocr2.NewOracle(*functionsOracleArgs)
if err != nil {
Expand Down
48 changes: 30 additions & 18 deletions core/services/ocr2/plugins/functions/reporting.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package functions

import (
"bytes"
"context"
"fmt"

Expand All @@ -21,22 +22,24 @@ import (
)

type FunctionsReportingPluginFactory struct {
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
OffchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPluginFactory = (*FunctionsReportingPluginFactory)(nil)

type functionsReporting struct {
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
offchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPlugin = &functionsReporting{}
Expand Down Expand Up @@ -112,13 +115,14 @@ func (f FunctionsReportingPluginFactory) NewReportingPlugin(rpConfig types.Repor
},
}
plugin := functionsReporting{
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
offchainTransmitter: f.OffchainTransmitter,
}
promReportingPlugins.WithLabelValues(f.JobID.String()).Inc()
return &plugin, info, nil
Expand Down Expand Up @@ -437,6 +441,14 @@ func (r *functionsReporting) ShouldAcceptFinalizedReport(ctx context.Context, ts
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: state couldn't be changed to FINALIZED. Not transmitting.", commontypes.LogFields{"requestID": reqIdStr, "err": err})
continue
}
if bytes.Equal(item.OnchainMetadata, []byte(functions.OffchainRequestMarker)) {
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: transmitting offchain", commontypes.LogFields{"requestID": reqIdStr})
result := functions.OffchainResponse{RequestId: item.RequestID, Result: item.Result, Error: item.Error}
if err := r.offchainTransmitter.TransmitReport(ctx, &result); err != nil {
r.logger.Error("FunctionsReporting ShouldAcceptFinalizedReport: unable to transmit offchain", commontypes.LogFields{"requestID": reqIdStr, "err": err})
}
continue // doesn't need onchain transmission
}
needTransmissionIds = append(needTransmissionIds, reqIdStr)
}
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport end", commontypes.LogFields{
Expand Down
Loading

0 comments on commit 3a38e90

Please sign in to comment.