Skip to content

Commit

Permalink
CCIP-4447 Promwrapper for OCR3 plugins and factories (#15521)
Browse files Browse the repository at this point in the history
* PoC

* Basic implementation

* Basic implementation

* Basic implementation

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes

* fixes
  • Loading branch information
mateusz-sekara authored Dec 6, 2024
1 parent 699e172 commit f6f2457
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/fuzzy-hairs-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Prometheus observability layer added to OCR3 Reporting Plugins #internal
9 changes: 9 additions & 0 deletions core/capabilities/ccip/oraclecreator/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3/promwrapper"

"github.com/smartcontractkit/libocr/commontypes"
libocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus"
Expand Down Expand Up @@ -229,6 +230,12 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
) (ocr3types.ReportingPluginFactory[[]byte], ocr3types.ContractTransmitter[[]byte], error) {
var factory ocr3types.ReportingPluginFactory[[]byte]
var transmitter ocr3types.ContractTransmitter[[]byte]

chainID, err := chainsel.GetChainIDFromSelector(uint64(config.Config.ChainSelector))
if err != nil {
return nil, nil, fmt.Errorf("unsupported chain selector %d %w", config.Config.ChainSelector, err)
}

if config.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit) {
if !i.peerWrapper.IsStarted() {
return nil, nil, fmt.Errorf("peer wrapper is not started")
Expand Down Expand Up @@ -263,6 +270,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
rmnPeerClient,
rmnCrypto,
)
factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPCommit")
transmitter = ocrimpls.NewCommitContractTransmitter[[]byte](destChainWriter,
ocrtypes.Account(destFromAccounts[0]),
hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm?
Expand All @@ -283,6 +291,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
contractReaders,
chainWriters,
)
factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPExec")
transmitter = ocrimpls.NewExecContractTransmitter[[]byte](destChainWriter,
ocrtypes.Account(destFromAccounts[0]),
hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm?
Expand Down
42 changes: 42 additions & 0 deletions core/services/ocr3/promwrapper/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package promwrapper

import (
"context"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
)

var _ ocr3types.ReportingPluginFactory[any] = &ReportingPluginFactory[any]{}

type ReportingPluginFactory[RI any] struct {
origin ocr3types.ReportingPluginFactory[RI]
chainID string
plugin string
}

func NewReportingPluginFactory[RI any](
origin ocr3types.ReportingPluginFactory[RI],
chainID string,
plugin string,
) *ReportingPluginFactory[RI] {
return &ReportingPluginFactory[RI]{
origin: origin,
chainID: chainID,
plugin: plugin,
}
}

func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
plugin, info, err := r.origin.NewReportingPlugin(ctx, config)
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, err
}
wrapped := newReportingPlugin(
plugin,
r.chainID,
r.plugin,
promOCR3ReportsGenerated,
promOCR3Durations,
)
return wrapped, info, err
}
41 changes: 41 additions & 0 deletions core/services/ocr3/promwrapper/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package promwrapper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

func Test_WrapperFactory(t *testing.T) {
validFactory := NewReportingPluginFactory(fakeFactory[uint]{}, "solana", "plugin")
failingFactory := NewReportingPluginFactory(fakeFactory[uint]{err: errors.New("error")}, "123", "plugin")

plugin, _, err := validFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{})
require.NoError(t, err)

_, err = plugin.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil)
require.NoError(t, err)

require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "true"))
require.Equal(t, 0, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "false"))

_, _, err = failingFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{})
require.Error(t, err)
}

type fakeFactory[RI any] struct {
err error
}

func (f fakeFactory[RI]) NewReportingPlugin(context.Context, ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
if f.err != nil {
return nil, ocr3types.ReportingPluginInfo{}, f.err
}
return fakePlugin[RI]{}, ocr3types.ReportingPluginInfo{}, nil
}
122 changes: 122 additions & 0 deletions core/services/ocr3/promwrapper/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package promwrapper

import (
"context"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

var _ ocr3types.ReportingPlugin[any] = &reportingPlugin[any]{}

type reportingPlugin[RI any] struct {
ocr3types.ReportingPlugin[RI]
chainID string
plugin string

// Prometheus components for tracking metrics
reportsGenerated *prometheus.CounterVec
durations *prometheus.HistogramVec
}

func newReportingPlugin[RI any](
origin ocr3types.ReportingPlugin[RI],
chainID string,
plugin string,
reportsGenerated *prometheus.CounterVec,
durations *prometheus.HistogramVec,
) *reportingPlugin[RI] {
return &reportingPlugin[RI]{
ReportingPlugin: origin,
chainID: chainID,
plugin: plugin,
reportsGenerated: reportsGenerated,
durations: durations,
}
}

func (p *reportingPlugin[RI]) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ocrtypes.Query, error) {
return withObservedExecution(p, query, func() (ocrtypes.Query, error) {
return p.ReportingPlugin.Query(ctx, outctx)
})
}

func (p *reportingPlugin[RI]) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query) (ocrtypes.Observation, error) {
return withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
return p.ReportingPlugin.Observation(ctx, outctx, query)
})
}

func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, ao ocrtypes.AttributedObservation) error {
_, err := withObservedExecution(p, validateObservation, func() (any, error) {
err := p.ReportingPlugin.ValidateObservation(ctx, outctx, query, ao)
return nil, err
})
return err
}

func (p *reportingPlugin[RI]) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, aos []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) {
return withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
return p.ReportingPlugin.Outcome(ctx, outctx, query, aos)
})
}

func (p *reportingPlugin[RI]) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
result, err := withObservedExecution(p, reports, func() ([]ocr3types.ReportPlus[RI], error) {
return p.ReportingPlugin.Reports(ctx, seqNr, outcome)
})
p.trackReports(reports, len(result))
return result, err
}

func (p *reportingPlugin[RI]) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) {
result, err := withObservedExecution(p, shouldAccept, func() (bool, error) {
return p.ReportingPlugin.ShouldAcceptAttestedReport(ctx, seqNr, reportWithInfo)
})
p.trackReports(shouldAccept, boolToInt(result))
return result, err
}

func (p *reportingPlugin[RI]) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) {
result, err := withObservedExecution(p, shouldTransmit, func() (bool, error) {
return p.ReportingPlugin.ShouldTransmitAcceptedReport(ctx, seqNr, reportWithInfo)
})
p.trackReports(shouldTransmit, boolToInt(result))
return result, err
}

func (p *reportingPlugin[RI]) trackReports(
function functionType,
count int,
) {
p.reportsGenerated.
WithLabelValues(p.chainID, p.plugin, string(function)).
Add(float64(count))
}

func boolToInt(arg bool) int {
if arg {
return 1
}
return 0
}

func withObservedExecution[RI, R any](
p *reportingPlugin[RI],
function functionType,
exec func() (R, error),
) (R, error) {
start := time.Now()
result, err := exec()

success := err == nil

p.durations.
WithLabelValues(p.chainID, p.plugin, string(function), strconv.FormatBool(success)).
Observe(float64(time.Since(start)))

return result, err
}
Loading

0 comments on commit f6f2457

Please sign in to comment.