Skip to content

Commit

Permalink
[Functions] Offchain heartbeat support in Listener and OCR2 plugin
Browse files Browse the repository at this point in the history
1. Add new API structs
2. Mark offchain requests in Listener
3. Introduce a simple OffchainTransmitter to pass responses from the plugin
  • Loading branch information
bolekk committed Nov 14, 2023
1 parent e7e0d42 commit 0dd74ed
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 43 deletions.
42 changes: 42 additions & 0 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const (
DefaultPruneCheckFrequencySec uint32 = 60 * 10
DefaultPruneBatchSize uint32 = 500

// Used in place of OnchainMetadata for all offchain requests.
OffchainRequestMarker string = "OFFCHAIN_REQUEST"

FlagCBORMaxSize uint32 = 1
FlagSecretsMaxSize uint32 = 2
)
Expand Down Expand Up @@ -280,6 +283,45 @@ func (l *FunctionsListener) getMaxSecretsSize(flags RequestFlags) uint32 {
return l.pluginConfig.MaxSecretsSizesList[idx]
}

func (l *FunctionsListener) HandleOffchainRequest(ctx context.Context, request *OffchainRequest) error {
if request == nil {
return errors.New("HandleOffchainRequest: received nil request")
}
if len(request.RequestId) != RequestIDLength {
return fmt.Errorf("HandleOffchainRequest: invalid request ID length %d", len(request.RequestId))
}
if len(request.SubscriptionOwner) != common.AddressLength || len(request.RequestInitiator) != common.AddressLength {
return fmt.Errorf("HandleOffchainRequest: SubscriptionOwner and RequestInitiator must be set to valid addresses")
}

var requestId RequestID
copy(requestId[:], request.RequestId[:32])
subscriptionOwner := common.BytesToAddress(request.SubscriptionOwner)
senderAddr := common.BytesToAddress(request.RequestInitiator)
emptyTxHash := common.Hash{}
zeroCallbackGasLimit := uint32(0)
newReq := &Request{
RequestID: requestId,
RequestTxHash: &emptyTxHash,
ReceivedAt: time.Now(),
Flags: []byte{},
CallbackGasLimit: &zeroCallbackGasLimit,
// use sender address in place of coordinator contract to keep batches uniform
CoordinatorContractAddress: &senderAddr,
OnchainMetadata: []byte(OffchainRequestMarker),
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err)
} else {
l.logger.Errorw("HandleOffchainRequest: failed to create a DB entry for new request", "requestID", formatRequestId(requestId), "err", err)
}
return err
}
l.handleRequest(ctx, requestId, request.SubscriptionId, subscriptionOwner, RequestFlags{}, &request.Data)
return nil
}

func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleRequest) {
defer l.shutdownWaitGroup.Done()
l.logger.Infow("handleOracleRequestV1: oracle request v1 received", "requestID", formatRequestId(request.RequestId))
Expand Down
40 changes: 40 additions & 0 deletions core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,46 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) {
uni.service.Close()
}

func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()

uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient").Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: SubscriptionOwner.Bytes(),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.NoError(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOffchainRequest_Invalid(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
uni := NewFunctionsListenerUniverse(t, 0, 1_000_000)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
RequestInitiator: []byte("invalid_address"),
SubscriptionId: uint64(SubscriptionID),
SubscriptionOwner: SubscriptionOwner.Bytes(),
Data: functions_service.RequestData{},
}
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))

request.RequestInitiator = SubscriptionOwner.Bytes()
request.SubscriptionOwner = []byte("invalid_address")
require.Error(t, uni.service.HandleOffchainRequest(testutils.Context(t), request))
}

func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
Expand Down
59 changes: 59 additions & 0 deletions core/services/functions/mocks/offchain_transmitter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/services/functions/offchain_transmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package functions

import (
"context"

"github.com/pkg/errors"
)

// Simple wrapper around a channel to transmit offchain reports between
// OCR plugin and Gateway connector
//
//go:generate mockery --quiet --name OffchainTransmitter --output ./mocks/ --case=underscore
type OffchainTransmitter interface {
TransmitReport(ctx context.Context, report *OffchainResponse) error
ReportChannel() chan *OffchainResponse
}

type offchainTransmitter struct {
reportCh chan *OffchainResponse
}

func NewOffchainTransmitter(chanSize uint32) OffchainTransmitter {
return &offchainTransmitter{
reportCh: make(chan *OffchainResponse, chanSize),
}
}

func (t *offchainTransmitter) TransmitReport(ctx context.Context, report *OffchainResponse) error {
select {
case t.reportCh <- report:
return nil
case <-ctx.Done():
return errors.New("context cancelled")
}
}

func (t *offchainTransmitter) ReportChannel() chan *OffchainResponse {
return t.reportCh
}
31 changes: 31 additions & 0 deletions core/services/functions/offchain_transmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package functions_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/functions"
)

func TestOffchainTransmitter(t *testing.T) {
t.Parallel()

transmitter := functions.NewOffchainTransmitter(1)
ch := transmitter.ReportChannel()
report := &functions.OffchainResponse{RequestId: []byte("testID")}
ctx := testutils.Context(t)

require.NoError(t, transmitter.TransmitReport(ctx, report))
require.Equal(t, report, <-ch)

require.NoError(t, transmitter.TransmitReport(ctx, report))

ctxTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*20)
defer cancel()
// should not freeze
require.Error(t, transmitter.TransmitReport(ctxTimeout, report))
}
15 changes: 15 additions & 0 deletions core/services/functions/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ const (

type RequestFlags [32]byte

type OffchainRequest struct {
RequestId []byte `json:"requestId"`
RequestInitiator []byte `json:"requestInitiator"`
SubscriptionId uint64 `json:"subscriptionId"`
SubscriptionOwner []byte `json:"subscriptionOwner"`
Data RequestData `json:"data"`
}

type RequestData struct {
Source string `json:"source" cbor:"source"`
Language int `json:"language" cbor:"language"`
Expand All @@ -19,6 +27,13 @@ type RequestData struct {
BytesArgs [][]byte `json:"bytesArgs,omitempty" cbor:"bytesArgs"`
}

// NOTE: to be extended with raw report and signatures when needed
type OffchainResponse struct {
RequestId []byte `json:"requestId"`
Result []byte `json:"result"`
Error []byte `json:"error"`
}

type DONHostedSecrets struct {
SlotID uint `json:"slotId" cbor:"slotId"`
Version uint64 `json:"version" cbor:"version"`
Expand Down
17 changes: 10 additions & 7 deletions core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type FunctionsServicesConfig struct {
}

const (
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
FunctionsBridgeName string = "ea_bridge"
FunctionsS4Namespace string = "functions"
MaxAdapterResponseBytes int64 = 1_000_000
DefaultOffchainTransmitterChannelSize uint32 = 1000
)

// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
s4Storage = s4.NewStorage(conf.Logger, *pluginConfig.S4Constraints, s4ORM, utils.NewRealClock())
}

offchainTransmitter := functions.NewOffchainTransmitter(DefaultOffchainTransmitterChannelSize)
listenerLogger := conf.Logger.Named("FunctionsListener")
bridgeAccessor := functions.NewBridgeAccessor(conf.BridgeORM, FunctionsBridgeName, MaxAdapterResponseBytes)
functionsListener := functions.NewFunctionsListener(
Expand All @@ -118,10 +120,11 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs
allServices = append(allServices, functionsListener)

functionsOracleArgs.ReportingPluginFactory = FunctionsReportingPluginFactory{
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
Logger: functionsOracleArgs.Logger,
PluginORM: pluginORM,
JobID: conf.Job.ExternalJobID,
ContractVersion: pluginConfig.ContractVersion,
OffchainTransmitter: offchainTransmitter,
}
functionsReportingPluginOracle, err := libocr2.NewOracle(*functionsOracleArgs)
if err != nil {
Expand Down
48 changes: 30 additions & 18 deletions core/services/ocr2/plugins/functions/reporting.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package functions

import (
"bytes"
"context"
"fmt"

Expand All @@ -21,22 +22,24 @@ import (
)

type FunctionsReportingPluginFactory struct {
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
Logger commontypes.Logger
PluginORM functions.ORM
JobID uuid.UUID
ContractVersion uint32
OffchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPluginFactory = (*FunctionsReportingPluginFactory)(nil)

type functionsReporting struct {
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
logger commontypes.Logger
pluginORM functions.ORM
jobID uuid.UUID
reportCodec encoding.ReportCodec
genericConfig *types.ReportingPluginConfig
specificConfig *config.ReportingPluginConfigWrapper
contractVersion uint32
offchainTransmitter functions.OffchainTransmitter
}

var _ types.ReportingPlugin = &functionsReporting{}
Expand Down Expand Up @@ -112,13 +115,14 @@ func (f FunctionsReportingPluginFactory) NewReportingPlugin(rpConfig types.Repor
},
}
plugin := functionsReporting{
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
logger: f.Logger,
pluginORM: f.PluginORM,
jobID: f.JobID,
reportCodec: codec,
genericConfig: &rpConfig,
specificConfig: pluginConfig,
contractVersion: f.ContractVersion,
offchainTransmitter: f.OffchainTransmitter,
}
promReportingPlugins.WithLabelValues(f.JobID.String()).Inc()
return &plugin, info, nil
Expand Down Expand Up @@ -437,6 +441,14 @@ func (r *functionsReporting) ShouldAcceptFinalizedReport(ctx context.Context, ts
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: state couldn't be changed to FINALIZED. Not transmitting.", commontypes.LogFields{"requestID": reqIdStr, "err": err})
continue
}
if bytes.Equal(item.OnchainMetadata, []byte(functions.OffchainRequestMarker)) {
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport: transmitting offchain", commontypes.LogFields{"requestID": reqIdStr})
result := functions.OffchainResponse{RequestId: item.RequestID, Result: item.Result, Error: item.Error}
if err := r.offchainTransmitter.TransmitReport(ctx, &result); err != nil {
r.logger.Error("FunctionsReporting ShouldAcceptFinalizedReport: unable to transmit offchain", commontypes.LogFields{"requestID": reqIdStr, "err": err})
}
continue // doesn't need onchain transmission
}
needTransmissionIds = append(needTransmissionIds, reqIdStr)
}
r.logger.Debug("FunctionsReporting ShouldAcceptFinalizedReport end", commontypes.LogFields{
Expand Down
Loading

0 comments on commit 0dd74ed

Please sign in to comment.