diff --git a/go.mod b/go.mod index c5670c9..43dc60e 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,12 @@ go 1.21 require ( github.com/hashicorp/go-plugin v1.6.0 - github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.5 github.com/smartcontractkit/chainlink-common v0.1.7-0.20240327133125-eed636b9a6df - github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 + github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa google.golang.org/protobuf v1.32.0 ) @@ -41,6 +41,7 @@ require ( github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9 // indirect github.com/oklog/run v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect @@ -59,7 +60,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.18.0 // indirect - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect diff --git a/go.sum b/go.sum index 446b343..6500c42 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16 h1:TFe+ github.com/smartcontractkit/go-plugin v0.0.0-20240208201424-b3b91517de16/go.mod h1:lBS5MtSSBZk0SHc66KACcjjlU6WzEVP/8pwz68aMkCI= github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJif132UCdjo8u43i7iPN1/MFnu49hv7lFGFftCHKU= github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= 
-github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 h1:1WFjrrVrWoQ9UpVMh7Mx4jDpzhmo1h8hFUKd9awIhIU= -github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052/go.mod h1:SJEZCHgMCAzzBvo9vMV2DQ9onfEcIJCYSViyP4JI6c4= +github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c h1:lIyMbTaF2H0Q71vkwZHX/Ew4KF2BxiKhqEXwF8rn+KI= +github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/llo/plugin.go b/llo/plugin.go index 4d16cf3..c509ce1 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -11,6 +11,8 @@ import ( "sort" "time" + "golang.org/x/exp/maps" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" @@ -22,34 +24,63 @@ import ( // TODO: Split out this file and write unit tests: https://smartcontract-it.atlassian.net/browse/MERC-3524 -// Notes: -// -// This is a sketch, there are many improvements to be made for this to be -// production-grade, secure code. -// -// We use JSON for serialization/deserialization. We rely on the fact that -// golang's json package serializes maps deterministically. Protobufs would -// likely be a more performant & efficient choice. - // Additional limits so we can more effectively bound the size of observations +// NOTE: These are hardcoded because these exact values are relied upon as a +// property of coming to consensus, it's too dangerous to make these +// configurable on a per-node basis. It may be possible to add them to the +// OffchainConfig if they need to be changed dynamically and in a +// backwards-compatible way. 
const ( - MaxObservationRemoveChannelIDsLength = 5 - MaxObservationAddChannelDefinitionsLength = 5 - MaxObservationStreamValuesLength = 1_000 + // Maximum amount of channels that can be removed per round (if more than + // this needs to be removed, it will be removed in batches until everything + // is up-to-date) + MaxObservationRemoveChannelIDsLength = 5 + // Maximum amount of channels that can be added or updated per round (if + // more than this needs to be added, it will be added in batches until + // everything is up-to-date) + MaxObservationUpdateChannelDefinitionsLength = 5 + // Maximum number of streams that can be observed per round + // TODO: This needs to be implemented on the Observation side so we don't + // even generate an observation that fails this + MaxObservationStreamValuesLength = 10_000 + // MaxOutcomeChannelDefinitionsLength is the maximum number of channels that + // can be supported + // TODO: This needs to be implemented on the Observation side so we don't + // even generate an observation that fails this + MaxOutcomeChannelDefinitionsLength = 10_000 ) -const MaxOutcomeChannelDefinitionsLength = 500 - -// Values for a set of streams, e.g. "eth-usd", "link-usd", and "eur-chf" +// Values for a set of streams, e.g. "eth-usd", "link-usd", "eur-chf" etc +// StreamIDs are uint32 // TODO: generalize from *big.Int to anything // https://smartcontract-it.atlassian.net/browse/MERC-3525 -// TODO: Consider renaming to StreamDataPoints? -type StreamValues map[llotypes.StreamID]ObsResult[*big.Int] +// TODO: Consider renaming to StreamDataPoints? Or StreamObservations?
+type StreamValues map[llotypes.StreamID]*big.Int + +type DSOpts interface { + VerboseLogging() bool + SeqNr() uint64 +} + +type dsOpts struct { + verboseLogging bool + seqNr uint64 +} + +func (o dsOpts) VerboseLogging() bool { + return o.verboseLogging +} + +func (o dsOpts) SeqNr() uint64 { + return o.seqNr +} type DataSource interface { - // For each known streamID, Observe should return a non-nil entry in - // StreamValues. Observe should ignore unknown streamIDs. - Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (StreamValues, error) + // For each known streamID, Observe should set the observed value in the + // passed streamValues. + // If an observation fails, or the stream is unknown, no value should be + // set. + Observe(ctx context.Context, streamValues StreamValues, opts DSOpts) error } // Protocol instances start in either the staging or production stage. They @@ -88,6 +119,29 @@ type PredecessorRetirementReportCache interface { CheckAttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) } +type ChannelDefinitionCache interface { + Definitions() llotypes.ChannelDefinitions +} + +// TODO: Test this +func ChannelEquals(a, b llotypes.ChannelDefinition) bool { + if a.ChainSelector != b.ChainSelector { + return false + } + if a.ReportFormat != b.ReportFormat { + return false + } + if len(a.StreamIDs) != len(b.StreamIDs) { + return false + } + for i, streamID := range a.StreamIDs { + if streamID != b.StreamIDs[i] { + return false + } + } + return true +} + // MakeChannelHash is used for mapping ChannelDefinitionWithIDs func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { h := sha256.New() @@ -164,16 +218,23 @@ func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { // A ReportingPlugin instance will only ever serve a single protocol instance. 
var _ ocr3types.ReportingPluginFactory[llotypes.ReportInfo] = &PluginFactory{} -func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc llotypes.ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory { +func NewPluginFactory(cfg Config, prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory { return &PluginFactory{ - prrc, src, cdc, ds, lggr, codecs, + cfg, prrc, src, cdc, ds, lggr, codecs, } } +type Config struct { + // Enables additional logging that might be expensive, e.g. logging entire + // channel definitions on every round or other very large structs + VerboseLogging bool +} + type PluginFactory struct { + Config Config PredecessorRetirementReportCache PredecessorRetirementReportCache ShouldRetireCache ShouldRetireCache - ChannelDefinitionCache llotypes.ChannelDefinitionCache + ChannelDefinitionCache ChannelDefinitionCache DataSource DataSource Logger logger.Logger Codecs map[llotypes.ReportFormat]ReportCodec @@ -186,6 +247,7 @@ func (f *PluginFactory) NewReportingPlugin(cfg ocr3types.ReportingPluginConfig) } return &Plugin{ + f.Config, offchainCfg.PredecessorConfigDigest, cfg.ConfigDigest, f.PredecessorRetirementReportCache, @@ -194,6 +256,8 @@ func (f *PluginFactory) NewReportingPlugin(cfg ocr3types.ReportingPluginConfig) f.DataSource, f.Logger, cfg.F, + protoObservationCodec{}, + protoOutcomeCodec{}, f.Codecs, }, ocr3types.ReportingPluginInfo{ Name: "LLO", @@ -216,14 +280,17 @@ type ReportCodec interface { } type Plugin struct { + Config Config PredecessorConfigDigest *types.ConfigDigest ConfigDigest types.ConfigDigest PredecessorRetirementReportCache PredecessorRetirementReportCache ShouldRetireCache ShouldRetireCache - ChannelDefinitionCache llotypes.ChannelDefinitionCache + ChannelDefinitionCache ChannelDefinitionCache 
DataSource DataSource Logger logger.Logger F int + ObservationCodec ObservationCodec + OutcomeCodec OutcomeCodec Codecs map[llotypes.ReportFormat]ReportCodec } @@ -253,8 +320,9 @@ type Observation struct { // Timestamp from when observation is made UnixTimestampNanoseconds int64 // Votes to remove/add channels. Subject to MAX_OBSERVATION_*_LENGTH limits - RemoveChannelIDs map[llotypes.ChannelID]struct{} - AddChannelDefinitions llotypes.ChannelDefinitions + RemoveChannelIDs map[llotypes.ChannelID]struct{} + // Votes to add or replace channel definitions + UpdateChannelDefinitions llotypes.ChannelDefinitions // Observed (numeric) stream values. Subject to // MaxObservationStreamValuesLength limit StreamValues StreamValues @@ -270,21 +338,25 @@ type Observation struct { // // Should return a serialized Observation struct. func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { - // send empty observation in initial round - // NOTE: First sequence number is always 1 + // NOTE: First sequence number is always 1 (0 is invalid) if outctx.SeqNr < 1 { - // send empty observation in initial round return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr) } else if outctx.SeqNr == 1 { - return types.Observation{}, nil // FIXME: but it needs to be properly serialized + // First round always has empty PreviousOutcome + // Don't bother observing on the first ever round, because the result + // will never be used anyway. + // See case at the top of Outcome() + return types.Observation{}, nil } + // Second round will have no channel definitions yet, but may vote to add + // them // QUESTION: is there a way to have this captured in EAs so we get something // closer to the source? 
nowNanoseconds := time.Now().UnixNano() - var previousOutcome Outcome - if err := json.Unmarshal(outctx.PreviousOutcome, &previousOutcome); err != nil { + previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) + if err != nil { return nil, fmt.Errorf("error unmarshalling previous outcome: %w", err) } @@ -292,10 +364,10 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex // Only try to fetch this from the cache if this instance if configured // with a predecessor and we're still in the staging stage. if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { - var err error - attestedRetirementReport, err = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) - if err != nil { - return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err) + var err2 error + attestedRetirementReport, err2 = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) + if err2 != nil { + return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err2) } } @@ -309,8 +381,14 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex removeChannelIDs := map[llotypes.ChannelID]struct{}{} // vote to add channel definitions that aren't present in the previous // outcome ChannelDefinitions - var addChannelDefinitions llotypes.ChannelDefinitions + // FIXME: Why care about ValidAfterSeconds here? + var updateChannelDefinitions llotypes.ChannelDefinitions { + // NOTE: Be careful using maps, since key ordering is randomized! All + // addition/removal lists must be built deterministically so that nodes + // can agree on the same set of changes. + // + // ChannelIDs should always be sorted the same way (channel ID ascending). 
expectedChannelDefs := p.ChannelDefinitionCache.Definitions() removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MaxObservationRemoveChannelIDsLength) @@ -318,7 +396,11 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex removeChannelIDs[channelID] = struct{}{} } - for channelID := range previousOutcome.ValidAfterSeconds { + // TODO: needs testing + validAfterSecondsChannelIDs := maps.Keys(previousOutcome.ValidAfterSeconds) + // Sort so we cut off deterministically + sortChannelIDs(validAfterSecondsChannelIDs) + for _, channelID := range validAfterSecondsChannelIDs { if len(removeChannelIDs) >= MaxObservationRemoveChannelIDsLength { break } @@ -327,24 +409,53 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex } } - addChannelDefinitions = subtractChannelDefinitions(expectedChannelDefs, previousOutcome.ChannelDefinitions, MaxObservationAddChannelDefinitionsLength) + // NOTE: This is slow because it deeply compares every value in the map. + // To improve performance, consider changing channel voting to happen + // every N rounds instead of every round. Or, alternatively perhaps the + // first e.g. 100 rounds could check every round to allow for fast feed + // spinup, then after that every 10 or 100 rounds. 
+ updateChannelDefinitions = make(llotypes.ChannelDefinitions) + expectedChannelIDs := maps.Keys(expectedChannelDefs) + // Sort so we cut off deterministically + sortChannelIDs(expectedChannelIDs) + for _, channelID := range expectedChannelIDs { + prev, exists := previousOutcome.ChannelDefinitions[channelID] + channelDefinition := expectedChannelDefs[channelID] + if exists && ChannelEquals(prev, channelDefinition) { + continue + } + // Add or replace channel + updateChannelDefinitions[channelID] = channelDefinition + if len(updateChannelDefinitions) >= MaxObservationUpdateChannelDefinitionsLength { + // Never add more than MaxObservationUpdateChannelDefinitionsLength + break + } + } + + if len(updateChannelDefinitions) > 0 { + p.Logger.Debugw("Voting to update channel definitions", + "updateChannelDefinitions", updateChannelDefinitions, + "seqNr", outctx.SeqNr) + } + if len(removeChannelIDs) > 0 { + p.Logger.Debugw("Voting to remove channel definitions", + "removeChannelIDs", removeChannelIDs, + "seqNr", outctx.SeqNr) + } } var streamValues StreamValues if len(previousOutcome.ChannelDefinitions) == 0 { - p.Logger.Warn("ChannelDefinitions is empty, will not generate any observations") + p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "seqNr", outctx.SeqNr) } else { - streams := map[llotypes.StreamID]struct{}{} + streamValues = make(map[llotypes.StreamID]*big.Int) for _, channelDefinition := range previousOutcome.ChannelDefinitions { for _, streamID := range channelDefinition.StreamIDs { - streams[streamID] = struct{}{} + streamValues[streamID] = nil } } - var err error - // TODO: Should probably be a slice, not map? 
- streamValues, err = p.DataSource.Observe(ctx, streams) - if err != nil { + if err := p.DataSource.Observe(ctx, streamValues, dsOpts{p.Config.VerboseLogging, outctx.SeqNr}); err != nil { return nil, fmt.Errorf("DataSource.Observe error: %w", err) } } @@ -352,16 +463,16 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex var rawObservation []byte { var err error - rawObservation, err = json.Marshal(Observation{ + rawObservation, err = p.ObservationCodec.Encode(Observation{ attestedRetirementReport, shouldRetire, nowNanoseconds, removeChannelIDs, - addChannelDefinitions, + updateChannelDefinitions, streamValues, }) if err != nil { - return nil, fmt.Errorf("json.Marshal error: %w", err) + return nil, fmt.Errorf("Observation encode error: %w", err) } } @@ -377,27 +488,28 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex // outctx.previousOutcome contains the consensus outcome with sequence // number (outctx.SeqNr-1). func (p *Plugin) ValidateObservation(outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error { - if outctx.SeqNr <= 1 { + if outctx.SeqNr < 1 { + return fmt.Errorf("Invalid SeqNr: %d", outctx.SeqNr) + } else if outctx.SeqNr == 1 { if len(ao.Observation) != 0 { - return fmt.Errorf("Observation is not empty") + return fmt.Errorf("Expected empty observation for first round, got: 0x%x", ao.Observation) } } - var observation Observation - // FIXME: do we really want to allow empty observations? 
happens because "" is not valid JSON - if len(ao.Observation) > 0 { - err := json.Unmarshal(ao.Observation, &observation) - if err != nil { - return fmt.Errorf("Observation is invalid json (got: %q): %w", ao.Observation, err) - } + observation, err := p.ObservationCodec.Decode(ao.Observation) + if err != nil { + // If the observation cannot be decoded it is invalid; reject it. + // Note: this discards only this single oracle's observation — an + // undecodable previous *outcome*, by contrast, would permanently + // stall the protocol. + return fmt.Errorf("Observation decode error (got: 0x%x): %w", ao.Observation, err) } if p.PredecessorConfigDigest == nil && len(observation.AttestedPredecessorRetirement) != 0 { return fmt.Errorf("AttestedPredecessorRetirement is not empty even though this instance has no predecessor") } - if len(observation.AddChannelDefinitions) > MaxObservationAddChannelDefinitionsLength { - return fmt.Errorf("AddChannelDefinitions is too long: %v vs %v", len(observation.AddChannelDefinitions), MaxObservationAddChannelDefinitionsLength) + if len(observation.UpdateChannelDefinitions) > MaxObservationUpdateChannelDefinitionsLength { + return fmt.Errorf("UpdateChannelDefinitions is too long: %v vs %v", len(observation.UpdateChannelDefinitions), MaxObservationUpdateChannelDefinitionsLength) } if len(observation.RemoveChannelIDs) > MaxObservationRemoveChannelIDsLength { @@ -408,12 +520,6 @@ func (p *Plugin) ValidateObservation(outctx ocr3types.OutcomeContext, query type return fmt.Errorf("StreamValues is too long: %v vs %v", len(observation.StreamValues), MaxObservationStreamValuesLength) } - for streamID, obsResult := range observation.StreamValues { - if obsResult.Valid && obsResult.Val == nil { - return fmt.Errorf("stream with id %q was marked valid but carries nil value", streamID) - } - } - return nil } @@ -446,28 +552,36 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { // Indicates whether a report can be generated for the given channel.
// Returns nil if channel is reportable -func (out *Outcome) IsReportable(channelID llotypes.ChannelID) error { +// TODO: Test this function +func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableChannel { if out.LifeCycleStage == LifeCycleStageRetired { - return fmt.Errorf("IsReportable=false; retired channel with ID: %d", channelID) + return &ErrUnreportableChannel{nil, "IsReportable=false; retired channel", channelID} } observationsTimestampSeconds, err := out.ObservationsTimestampSeconds() if err != nil { - return fmt.Errorf("IsReportable=false; invalid observations timestamp; %w", err) + return &ErrUnreportableChannel{err, "IsReportable=false; invalid observations timestamp", channelID} } channelDefinition, exists := out.ChannelDefinitions[channelID] if !exists { - return fmt.Errorf("IsReportable=false; no channel definition with ID: %d", channelID) + return &ErrUnreportableChannel{nil, "IsReportable=false; no channel definition with this ID", channelID} } if _, err := chainselectors.ChainIdFromSelector(channelDefinition.ChainSelector); err != nil { - return fmt.Errorf("IsReportable=false; invalid chain selector; %w", err) + return &ErrUnreportableChannel{err, "IsReportable=false; invalid chain selector", channelID} } for _, streamID := range channelDefinition.StreamIDs { if out.StreamMedians[streamID] == nil { - return errors.New("IsReportable=false; median was nil") + // FIXME: Is this comment actually correct? + // This can happen in normal operation, because in Report() we use + // the ChannelDefinitions in the generated Outcome. But that was + // compiled with Observations made using the ChannelDefinitions + // from the PREVIOUS outcome. So if channel definitions have been + // added in this round, we would not expect there to be + // observations present for new streams in those channels. 
+ return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; median was nil for stream %d", streamID), channelID} } } @@ -475,33 +589,56 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID) error { // No validAfterSeconds entry yet, this must be a new channel. // validAfterSeconds will be populated in Outcome() so the channel // becomes reportable in later protocol rounds. - return errors.New("IsReportable=false; no validAfterSeconds entry yet, this must be a new channel") + // TODO: Test this case, haven't seen it in prod logs even though it would be expected + return &ErrUnreportableChannel{nil, "IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", channelID} } if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds { - return fmt.Errorf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds) + return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds), channelID} } return nil } +type ErrUnreportableChannel struct { + Inner error + Reason string + ChannelID llotypes.ChannelID +} + +func (e *ErrUnreportableChannel) Error() string { + s := fmt.Sprintf("ChannelID: %d; Reason: %s", e.ChannelID, e.Reason) + if e.Inner != nil { + s += fmt.Sprintf("; Err: %v", e.Inner) + } + return s +} + +func (e *ErrUnreportableChannel) String() string { + return e.Error() +} + +func (e *ErrUnreportableChannel) Unwrap() error { + return e.Inner +} + // List of reportable channels (according to IsReportable), sorted according // to a canonical ordering -func (out *Outcome) ReportableChannels() []llotypes.ChannelID { - result := []llotypes.ChannelID{} - +// TODO: test this +func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unreportable 
[]*ErrUnreportableChannel) { for channelID := range out.ChannelDefinitions { if err := out.IsReportable(channelID); err != nil { - continue + unreportable = append(unreportable, err) + } else { + reportable = append(reportable, channelID) } - result = append(result, channelID) } - sort.Slice(result, func(i, j int) bool { - return result[i] < result[j] + sort.Slice(reportable, func(i, j int) bool { + return reportable[i] < reportable[j] }) - return result + return } // Generates an outcome for a seqNr, typically based on the previous @@ -522,6 +659,7 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos return nil, fmt.Errorf("invariant violation: expected at least 2f+1 attributed observations, got %d (f: %d)", len(aos), p.F) } + // Initial outcome is kind of a "keystone" with minimum extra information if outctx.SeqNr <= 1 { // Initial Outcome var lifeCycleStage llotypes.LifeCycleStage @@ -538,15 +676,15 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos nil, nil, } - return json.Marshal(outcome) + return p.OutcomeCodec.Encode(outcome) } ///////////////////////////////// // Decode previousOutcome ///////////////////////////////// - var previousOutcome Outcome - if err := json.Unmarshal(outctx.PreviousOutcome, &previousOutcome); err != nil { - return nil, fmt.Errorf("error unmarshalling previous outcome: %v", err) + previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("error decoding previous outcome: %v", err) } ///////////////////////////////// @@ -563,24 +701,24 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos removeChannelVotesByID := map[llotypes.ChannelID]int{} // for each channelId count number of votes that mention it and count number of votes that include it. 
- addChannelVotesByHash := map[ChannelHash]int{} - addChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} + updateChannelVotesByHash := map[ChannelHash]int{} + updateChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} streamObservations := map[llotypes.StreamID][]*big.Int{} for _, ao := range aos { - observation := Observation{} - // TODO: Use protobufs - if err := json.Unmarshal(ao.Observation, &observation); err != nil { - p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err) + // TODO: Put in a function + observation, err2 := p.ObservationCodec.Decode(ao.Observation) + if err2 != nil { + p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err2) continue } if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { pcd := *p.PredecessorConfigDigest - retirementReport, err := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) - if err != nil { - p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err, "predecessorConfigDigest", pcd) + retirementReport, err3 := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) + if err3 != nil { + p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err3, "predecessorConfigDigest", pcd) continue } validPredecessorRetirementReport = &retirementReport @@ -596,20 +734,28 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos removeChannelVotesByID[channelID]++ } - for channelID, channelDefinition := range observation.AddChannelDefinitions { + for channelID, channelDefinition := range observation.UpdateChannelDefinitions { defWithID := ChannelDefinitionWithID{channelDefinition, channelID} channelHash := 
MakeChannelHash(defWithID) - addChannelVotesByHash[channelHash]++ - addChannelDefinitionsByHash[channelHash] = defWithID + updateChannelVotesByHash[channelHash]++ + updateChannelDefinitionsByHash[channelHash] = defWithID } + var missingObservations []llotypes.StreamID for id, obsResult := range observation.StreamValues { - if obsResult.Valid { - streamObservations[id] = append(streamObservations[id], obsResult.Val) + if obsResult != nil { + streamObservations[id] = append(streamObservations[id], obsResult) } else { - p.Logger.Debugw("Ignoring invalid observation", "streamID", id, "oracleID", ao.Observer) + missingObservations = append(missingObservations, id) } } + if p.Config.VerboseLogging { + if len(missingObservations) > 0 { + sort.Slice(missingObservations, func(i, j int) bool { return missingObservations[i] < missingObservations[j] }) + p.Logger.Debugw("Missing observations", "streamIDs", missingObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) + } + p.Logger.Debugw("Using observations", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) + } } if len(timestampsNanoseconds) == 0 { @@ -651,7 +797,7 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos // if retired, stop updating channel definitions if outcome.LifeCycleStage == LifeCycleStageRetired { - removeChannelVotesByID, addChannelDefinitionsByHash = nil, nil + removeChannelVotesByID, updateChannelDefinitionsByHash = nil, nil } var removedChannelIDs []llotypes.ChannelID @@ -663,25 +809,44 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos delete(outcome.ChannelDefinitions, channelID) } - for channelHash, defWithID := range addChannelDefinitionsByHash { - voteCount := addChannelVotesByHash[channelHash] + type hashWithID struct { + ChannelHash + ChannelDefinitionWithID + } + orderedHashes := make([]hashWithID, 0, len(updateChannelDefinitionsByHash)) + for channelHash, dfnWithID := range 
updateChannelDefinitionsByHash { + orderedHashes = append(orderedHashes, hashWithID{channelHash, dfnWithID}) + } + // Use predictable order for adding channels (id asc) so that extras that + // exceed the max are consistent across all nodes + sort.Slice(orderedHashes, func(i, j int) bool { return orderedHashes[i].ChannelID < orderedHashes[j].ChannelID }) + for _, hwid := range orderedHashes { + voteCount := updateChannelVotesByHash[hwid.ChannelHash] if voteCount <= p.F { continue } - if conflictDef, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists { - p.Logger.Warn("More than f nodes vote to add a channel, but a channel with the same id already exists", - "existingChannelDefinition", conflictDef, - "addChannelDefinition", defWithID, + defWithID := hwid.ChannelDefinitionWithID + if original, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists { + p.Logger.Debugw("Adding channel (replacement)", + "channelID", defWithID.ChannelID, + "originalChannelDefinition", original, + "replaceChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, ) - continue - } - if len(outcome.ChannelDefinitions) > MaxOutcomeChannelDefinitionsLength { - p.Logger.Warn("Cannot add channel, outcome already contains maximum number of channels", + outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition + } else if len(outcome.ChannelDefinitions) >= MaxOutcomeChannelDefinitionsLength { + p.Logger.Warnw("Adding channel FAILED. 
Cannot add channel, outcome already contains maximum number of channels", "maxOutcomeChannelDefinitionsLength", MaxOutcomeChannelDefinitionsLength, "addChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, ) continue } + p.Logger.Debugw("Adding channel (new)", + "channelID", defWithID.ChannelID, + "addChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, + ) outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition } @@ -693,20 +858,20 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos // populated ValidAfterSeconds during promotion to production. In this // case, nothing to do. if outcome.ValidAfterSeconds == nil { - previousObservationsTimestampSeconds, err := previousOutcome.ObservationsTimestampSeconds() - if err != nil { - return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err) + previousObservationsTimestampSeconds, err2 := previousOutcome.ObservationsTimestampSeconds() + if err2 != nil { + return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err2) } outcome.ValidAfterSeconds = map[llotypes.ChannelID]uint32{} for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds { - if err := previousOutcome.IsReportable(channelID); err != nil { - p.Logger.Warnw("Channel is not reportable", "channelID", channelID, "err", err) + if err3 := previousOutcome.IsReportable(channelID); err3 != nil { + if p.Config.VerboseLogging { + p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "seqNr", outctx.SeqNr) + } // was reported based on previous outcome outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds } else { - p.Logger.Debugw("Channel is reportable", "channelID", channelID) - // TODO: change log level based on what type of error we got // was skipped based on previous outcome outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds } @@ -747,7 +912,9 @@ func (p 
*Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos // are allowed to be unparseable/missing. If we have less than f+1 // usable observations, we cannot securely generate a median at // all. - p.Logger.Debugw("Not enough observations to calculate median, expected at least f+1", "f", p.F, "streamID", streamID, "observations", observations) + if p.Config.VerboseLogging { + p.Logger.Warnw("Not enough observations to calculate median, expected at least f+1", "f", p.F, "streamID", streamID, "observations", observations, "seqNr", outctx.SeqNr) + } continue } // We use a "rank-k" median here, instead one could average in case of @@ -755,7 +922,10 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos outcome.StreamMedians[streamID] = observations[len(observations)/2] } - return json.Marshal(outcome) + if p.Config.VerboseLogging { + p.Logger.Debugw("Generated outcome", "outcome", outcome, "seqNr", outctx.SeqNr) + } + return p.OutcomeCodec.Encode(outcome) } type Report struct { @@ -805,8 +975,8 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type return nil, nil } - var outcome Outcome - if err := json.Unmarshal(rawOutcome, &outcome); err != nil { + outcome, err := p.OutcomeCodec.Decode(rawOutcome) + if err != nil { return nil, fmt.Errorf("error unmarshalling outcome: %w", err) } @@ -826,6 +996,7 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type } rwis = append(rwis, ocr3types.ReportWithInfo[llotypes.ReportInfo]{ + // TODO: Needs retirement report codec Report: must(json.Marshal(retirementReport)), Info: llotypes.ReportInfo{ LifeCycleStage: outcome.LifeCycleStage, @@ -834,7 +1005,12 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type }) } - for _, channelID := range outcome.ReportableChannels() { + reportableChannels, unreportableChannels := outcome.ReportableChannels() + if p.Config.VerboseLogging { + p.Logger.Debugw("Reportable 
channels", "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "seqNr", seqNr) + } + + for _, channelID := range reportableChannels { channelDefinition := outcome.ChannelDefinitions[channelID] values := []*big.Int{} for _, streamID := range channelDefinition.StreamIDs { @@ -865,8 +1041,8 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type }) } - if len(rwis) == 0 { - p.Logger.Debugw("No reports", "reportableChannels", outcome.ReportableChannels()) + if p.Config.VerboseLogging && len(rwis) == 0 { + p.Logger.Debugw("No reports, will not transmit anything", "reportableChannels", reportableChannels, "seqNr", seqNr) } return rwis, nil @@ -922,3 +1098,10 @@ func subtractChannelDefinitions(minuend llotypes.ChannelDefinitions, subtrahend return difference } + +// deterministic sort of channel IDs +func sortChannelIDs(cids []llotypes.ChannelID) { + sort.Slice(cids, func(i, j int) bool { + return cids[i] < cids[j] + }) +} diff --git a/llo/plugin_codecs.go b/llo/plugin_codecs.go new file mode 100644 index 0000000..0c0034d --- /dev/null +++ b/llo/plugin_codecs.go @@ -0,0 +1,263 @@ +package llo + +import ( + "fmt" + "math/big" + "sort" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "golang.org/x/exp/maps" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + + "google.golang.org/protobuf/proto" +) + +// NOTE: These codecs make a lot of allocations which will be hard on the +// garbage collector, this can probably be made more efficient + +// OBSERVATION CODEC + +var ( + _ ObservationCodec = (*protoObservationCodec)(nil) +) + +type ObservationCodec interface { + Encode(obs Observation) (types.Observation, error) + Decode(encoded types.Observation) (obs Observation, err error) +} + +type protoObservationCodec struct{} + +func (c protoObservationCodec) Encode(obs Observation) (types.Observation, 
error) { + dfns := channelDefinitionsToProtoObservation(obs.UpdateChannelDefinitions) + + streamValues := make(map[uint32][]byte, len(obs.StreamValues)) + for id, sv := range obs.StreamValues { + if sv != nil { + streamValues[id] = sv.Bytes() + } + } + + pbuf := &LLOObservationProto{ + AttestedPredecessorRetirement: obs.AttestedPredecessorRetirement, + ShouldRetire: obs.ShouldRetire, + UnixTimestampNanoseconds: obs.UnixTimestampNanoseconds, + RemoveChannelIDs: maps.Keys(obs.RemoveChannelIDs), + UpdateChannelDefinitions: dfns, + StreamValues: streamValues, + } + + return proto.Marshal(pbuf) +} + +func channelDefinitionsToProtoObservation(in llotypes.ChannelDefinitions) (out map[uint32]*LLOChannelDefinitionProto) { + if len(in) > 0 { + out = make(map[uint32]*LLOChannelDefinitionProto, len(in)) + for id, d := range in { + out[id] = &LLOChannelDefinitionProto{ + ReportFormat: uint32(d.ReportFormat), + ChainSelector: d.ChainSelector, + StreamIDs: d.StreamIDs, + } + } + } + return +} + +// TODO: Guard against untrusted inputs! 
+func (c protoObservationCodec) Decode(b types.Observation) (Observation, error) { + pbuf := &LLOObservationProto{} + err := proto.Unmarshal(b, pbuf) + if err != nil { + return Observation{}, fmt.Errorf("failed to decode observation: expected protobuf (got: 0x%x); %w", b, err) + } + var removeChannelIDs map[llotypes.ChannelID]struct{} + if len(pbuf.RemoveChannelIDs) > 0 { + removeChannelIDs = make(map[llotypes.ChannelID]struct{}, len(pbuf.RemoveChannelIDs)) + for _, id := range pbuf.RemoveChannelIDs { + removeChannelIDs[id] = struct{}{} + } + } + dfns := channelDefinitionsFromProtoObservation(pbuf.UpdateChannelDefinitions) + var streamValues StreamValues + if len(pbuf.StreamValues) > 0 { + streamValues = make(StreamValues, len(pbuf.StreamValues)) + for id, sv := range pbuf.StreamValues { + // StreamValues shouldn't have explicit nils, but for safety we + // ought to handle it anyway + if sv != nil { + streamValues[id] = new(big.Int).SetBytes(sv) + } + } + } + obs := Observation{ + AttestedPredecessorRetirement: pbuf.AttestedPredecessorRetirement, + ShouldRetire: pbuf.ShouldRetire, + UnixTimestampNanoseconds: pbuf.UnixTimestampNanoseconds, + RemoveChannelIDs: removeChannelIDs, + UpdateChannelDefinitions: dfns, + StreamValues: streamValues, + } + return obs, nil +} + +func channelDefinitionsFromProtoObservation(channelDefinitions map[uint32]*LLOChannelDefinitionProto) llotypes.ChannelDefinitions { + if len(channelDefinitions) == 0 { + return nil + } + dfns := make(map[llotypes.ChannelID]llotypes.ChannelDefinition, len(channelDefinitions)) + for id, d := range channelDefinitions { + dfns[id] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(d.ReportFormat), + ChainSelector: d.ChainSelector, + StreamIDs: d.StreamIDs, + } + } + return dfns +} + +// OUTCOME CODEC + +var _ OutcomeCodec = (*protoOutcomeCodec)(nil) + +type OutcomeCodec interface { + Encode(outcome Outcome) (ocr3types.Outcome, error) + Decode(encoded ocr3types.Outcome) (outcome Outcome, err 
error) +} + +type protoOutcomeCodec struct{} + +func (protoOutcomeCodec) Encode(outcome Outcome) (ocr3types.Outcome, error) { + dfns := channelDefinitionsToProtoOutcome(outcome.ChannelDefinitions) + + streamMedians := streamMediansToProtoOutcome(outcome.StreamMedians) + validAfterSeconds := validAfterSecondsToProtoOutcome(outcome.ValidAfterSeconds) + + pbuf := &LLOOutcomeProto{ + LifeCycleStage: string(outcome.LifeCycleStage), + ObservationsTimestampNanoseconds: outcome.ObservationsTimestampNanoseconds, + ChannelDefinitions: dfns, + ValidAfterSeconds: validAfterSeconds, + StreamMedians: streamMedians, + } + + // It's very important that Outcome serialization be deterministic across all nodes! + // Should be reliable since we don't use maps + return proto.MarshalOptions{Deterministic: true}.Marshal(pbuf) +} + +func channelDefinitionsToProtoOutcome(in llotypes.ChannelDefinitions) (out []*LLOChannelIDAndDefinitionProto) { + if len(in) > 0 { + out = make([]*LLOChannelIDAndDefinitionProto, 0, len(in)) + for id, d := range in { + out = append(out, &LLOChannelIDAndDefinitionProto{ + ChannelID: id, + ChannelDefinition: &LLOChannelDefinitionProto{ + ReportFormat: uint32(d.ReportFormat), + ChainSelector: d.ChainSelector, + StreamIDs: d.StreamIDs, + }, + }) + } + sort.Slice(out, func(i, j int) bool { + return out[i].ChannelID < out[j].ChannelID + }) + } + return +} + +func streamMediansToProtoOutcome(in StreamValues) (out []*LLOStreamIDAndValue) { + if len(in) > 0 { + out = make([]*LLOStreamIDAndValue, 0, len(in)) + for id, v := range in { + // StreamMedians shouldn't have explicit nil values, but for + // safety we ought to handle it anyway + if v != nil { + out = append(out, &LLOStreamIDAndValue{ + StreamID: id, + Value: v.Bytes(), + }) + } + } + sort.Slice(out, func(i, j int) bool { + return out[i].StreamID < out[j].StreamID + }) + } + return +} + +func validAfterSecondsToProtoOutcome(in map[llotypes.ChannelID]uint32) (out []*LLOChannelIDAndValidAfterSecondsProto) { + if 
len(in) > 0 { + out = make([]*LLOChannelIDAndValidAfterSecondsProto, 0, len(in)) + for id, v := range in { + out = append(out, &LLOChannelIDAndValidAfterSecondsProto{ + ChannelID: id, + ValidAfterSeconds: v, + }) + } + sort.Slice(out, func(i, j int) bool { + return out[i].ChannelID < out[j].ChannelID + }) + } + return +} + +// TODO: Guard against untrusted inputs! +func (protoOutcomeCodec) Decode(b ocr3types.Outcome) (outcome Outcome, err error) { + pbuf := &LLOOutcomeProto{} + err = proto.Unmarshal(b, pbuf) + if err != nil { + return Outcome{}, fmt.Errorf("failed to decode outcome: expected protobuf (got: 0x%x); %w", b, err) + } + dfns := channelDefinitionsFromProtoOutcome(pbuf.ChannelDefinitions) + streamMedians := streamMediansFromProtoOutcome(pbuf.StreamMedians) + validAfterSeconds := validAfterSecondsFromProtoOutcome(pbuf.ValidAfterSeconds) + outcome = Outcome{ + LifeCycleStage: llotypes.LifeCycleStage(pbuf.LifeCycleStage), + ObservationsTimestampNanoseconds: pbuf.ObservationsTimestampNanoseconds, + ChannelDefinitions: dfns, + ValidAfterSeconds: validAfterSeconds, + StreamMedians: streamMedians, + } + return outcome, nil +} + +func channelDefinitionsFromProtoOutcome(in []*LLOChannelIDAndDefinitionProto) (out llotypes.ChannelDefinitions) { + if len(in) > 0 { + out = make(map[llotypes.ChannelID]llotypes.ChannelDefinition, len(in)) + for _, d := range in { + out[d.ChannelID] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(d.ChannelDefinition.ReportFormat), + ChainSelector: d.ChannelDefinition.ChainSelector, + StreamIDs: d.ChannelDefinition.StreamIDs, + } + } + } + return +} + +func streamMediansFromProtoOutcome(in []*LLOStreamIDAndValue) (out StreamValues) { + if len(in) > 0 { + out = make(map[llotypes.StreamID]*big.Int, len(in)) + for _, sv := range in { + if sv.Value != nil { + // StreamMedians shouldn't have explicit nil values, but for + // safety we ought to handle it anyway + out[sv.StreamID] = new(big.Int).SetBytes(sv.Value) + } + } + } 
+ return +} + +func validAfterSecondsFromProtoOutcome(in []*LLOChannelIDAndValidAfterSecondsProto) (out map[llotypes.ChannelID]uint32) { + if len(in) > 0 { + out = make(map[llotypes.ChannelID]uint32, len(in)) + for _, v := range in { + out[v.ChannelID] = v.ValidAfterSeconds + } + } + return +} diff --git a/llo/plugin_codecs.pb.go b/llo/plugin_codecs.pb.go new file mode 100644 index 0000000..afe9511 --- /dev/null +++ b/llo/plugin_codecs.pb.go @@ -0,0 +1,730 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.23.2 +// source: plugin_codecs.proto + +package llo + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// WARNING +// Use extreme caution making changes in this file +// All changes MUST be backwards compatible +// If the format changes in a backwards incompatible way, active DONs can +// become stuck permanently +type LLOObservationProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AttestedPredecessorRetirement []byte `protobuf:"bytes,1,opt,name=attestedPredecessorRetirement,proto3" json:"attestedPredecessorRetirement,omitempty"` + ShouldRetire bool `protobuf:"varint,2,opt,name=shouldRetire,proto3" json:"shouldRetire,omitempty"` + UnixTimestampNanoseconds int64 `protobuf:"varint,3,opt,name=unixTimestampNanoseconds,proto3" json:"unixTimestampNanoseconds,omitempty"` + RemoveChannelIDs []uint32 `protobuf:"varint,4,rep,packed,name=removeChannelIDs,proto3" json:"removeChannelIDs,omitempty"` + UpdateChannelDefinitions map[uint32]*LLOChannelDefinitionProto 
`protobuf:"bytes,5,rep,name=updateChannelDefinitions,proto3" json:"updateChannelDefinitions,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + StreamValues map[uint32][]byte `protobuf:"bytes,6,rep,name=streamValues,proto3" json:"streamValues,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *LLOObservationProto) Reset() { + *x = LLOObservationProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOObservationProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOObservationProto) ProtoMessage() {} + +func (x *LLOObservationProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOObservationProto.ProtoReflect.Descriptor instead. 
+func (*LLOObservationProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{0} +} + +func (x *LLOObservationProto) GetAttestedPredecessorRetirement() []byte { + if x != nil { + return x.AttestedPredecessorRetirement + } + return nil +} + +func (x *LLOObservationProto) GetShouldRetire() bool { + if x != nil { + return x.ShouldRetire + } + return false +} + +func (x *LLOObservationProto) GetUnixTimestampNanoseconds() int64 { + if x != nil { + return x.UnixTimestampNanoseconds + } + return 0 +} + +func (x *LLOObservationProto) GetRemoveChannelIDs() []uint32 { + if x != nil { + return x.RemoveChannelIDs + } + return nil +} + +func (x *LLOObservationProto) GetUpdateChannelDefinitions() map[uint32]*LLOChannelDefinitionProto { + if x != nil { + return x.UpdateChannelDefinitions + } + return nil +} + +func (x *LLOObservationProto) GetStreamValues() map[uint32][]byte { + if x != nil { + return x.StreamValues + } + return nil +} + +type LLOChannelDefinitionProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ReportFormat uint32 `protobuf:"varint,1,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` + ChainSelector uint64 `protobuf:"varint,2,opt,name=chainSelector,proto3" json:"chainSelector,omitempty"` + StreamIDs []uint32 `protobuf:"varint,3,rep,packed,name=streamIDs,proto3" json:"streamIDs,omitempty"` +} + +func (x *LLOChannelDefinitionProto) Reset() { + *x = LLOChannelDefinitionProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOChannelDefinitionProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOChannelDefinitionProto) ProtoMessage() {} + +func (x *LLOChannelDefinitionProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x 
!= nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOChannelDefinitionProto.ProtoReflect.Descriptor instead. +func (*LLOChannelDefinitionProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{1} +} + +func (x *LLOChannelDefinitionProto) GetReportFormat() uint32 { + if x != nil { + return x.ReportFormat + } + return 0 +} + +func (x *LLOChannelDefinitionProto) GetChainSelector() uint64 { + if x != nil { + return x.ChainSelector + } + return 0 +} + +func (x *LLOChannelDefinitionProto) GetStreamIDs() []uint32 { + if x != nil { + return x.StreamIDs + } + return nil +} + +type LLOStreamObservationProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Valid bool `protobuf:"varint,1,opt,name=valid,proto3" json:"valid,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *LLOStreamObservationProto) Reset() { + *x = LLOStreamObservationProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOStreamObservationProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOStreamObservationProto) ProtoMessage() {} + +func (x *LLOStreamObservationProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOStreamObservationProto.ProtoReflect.Descriptor instead. 
+func (*LLOStreamObservationProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{2} +} + +func (x *LLOStreamObservationProto) GetValid() bool { + if x != nil { + return x.Valid + } + return false +} + +func (x *LLOStreamObservationProto) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +// NOTE: Use of repeated tuple instead of maps for more reliable determinstic +// serialization +type LLOOutcomeProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + LifeCycleStage string `protobuf:"bytes,1,opt,name=lifeCycleStage,proto3" json:"lifeCycleStage,omitempty"` + ObservationsTimestampNanoseconds int64 `protobuf:"varint,2,opt,name=ObservationsTimestampNanoseconds,proto3" json:"ObservationsTimestampNanoseconds,omitempty"` + ChannelDefinitions []*LLOChannelIDAndDefinitionProto `protobuf:"bytes,3,rep,name=channelDefinitions,proto3" json:"channelDefinitions,omitempty"` + ValidAfterSeconds []*LLOChannelIDAndValidAfterSecondsProto `protobuf:"bytes,4,rep,name=validAfterSeconds,proto3" json:"validAfterSeconds,omitempty"` + StreamMedians []*LLOStreamIDAndValue `protobuf:"bytes,5,rep,name=streamMedians,proto3" json:"streamMedians,omitempty"` +} + +func (x *LLOOutcomeProto) Reset() { + *x = LLOOutcomeProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOOutcomeProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOOutcomeProto) ProtoMessage() {} + +func (x *LLOOutcomeProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use 
LLOOutcomeProto.ProtoReflect.Descriptor instead. +func (*LLOOutcomeProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{3} +} + +func (x *LLOOutcomeProto) GetLifeCycleStage() string { + if x != nil { + return x.LifeCycleStage + } + return "" +} + +func (x *LLOOutcomeProto) GetObservationsTimestampNanoseconds() int64 { + if x != nil { + return x.ObservationsTimestampNanoseconds + } + return 0 +} + +func (x *LLOOutcomeProto) GetChannelDefinitions() []*LLOChannelIDAndDefinitionProto { + if x != nil { + return x.ChannelDefinitions + } + return nil +} + +func (x *LLOOutcomeProto) GetValidAfterSeconds() []*LLOChannelIDAndValidAfterSecondsProto { + if x != nil { + return x.ValidAfterSeconds + } + return nil +} + +func (x *LLOOutcomeProto) GetStreamMedians() []*LLOStreamIDAndValue { + if x != nil { + return x.StreamMedians + } + return nil +} + +type LLOChannelIDAndDefinitionProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChannelID uint32 `protobuf:"varint,1,opt,name=channelID,proto3" json:"channelID,omitempty"` + ChannelDefinition *LLOChannelDefinitionProto `protobuf:"bytes,2,opt,name=channelDefinition,proto3" json:"channelDefinition,omitempty"` +} + +func (x *LLOChannelIDAndDefinitionProto) Reset() { + *x = LLOChannelIDAndDefinitionProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOChannelIDAndDefinitionProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOChannelIDAndDefinitionProto) ProtoMessage() {} + +func (x *LLOChannelIDAndDefinitionProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + 
return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOChannelIDAndDefinitionProto.ProtoReflect.Descriptor instead. +func (*LLOChannelIDAndDefinitionProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{4} +} + +func (x *LLOChannelIDAndDefinitionProto) GetChannelID() uint32 { + if x != nil { + return x.ChannelID + } + return 0 +} + +func (x *LLOChannelIDAndDefinitionProto) GetChannelDefinition() *LLOChannelDefinitionProto { + if x != nil { + return x.ChannelDefinition + } + return nil +} + +type LLOChannelIDAndValidAfterSecondsProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ChannelID uint32 `protobuf:"varint,1,opt,name=channelID,proto3" json:"channelID,omitempty"` + ValidAfterSeconds uint32 `protobuf:"varint,2,opt,name=validAfterSeconds,proto3" json:"validAfterSeconds,omitempty"` +} + +func (x *LLOChannelIDAndValidAfterSecondsProto) Reset() { + *x = LLOChannelIDAndValidAfterSecondsProto{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOChannelIDAndValidAfterSecondsProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOChannelIDAndValidAfterSecondsProto) ProtoMessage() {} + +func (x *LLOChannelIDAndValidAfterSecondsProto) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOChannelIDAndValidAfterSecondsProto.ProtoReflect.Descriptor instead. 
+func (*LLOChannelIDAndValidAfterSecondsProto) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{5} +} + +func (x *LLOChannelIDAndValidAfterSecondsProto) GetChannelID() uint32 { + if x != nil { + return x.ChannelID + } + return 0 +} + +func (x *LLOChannelIDAndValidAfterSecondsProto) GetValidAfterSeconds() uint32 { + if x != nil { + return x.ValidAfterSeconds + } + return 0 +} + +type LLOStreamIDAndValue struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StreamID uint32 `protobuf:"varint,1,opt,name=streamID,proto3" json:"streamID,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *LLOStreamIDAndValue) Reset() { + *x = LLOStreamIDAndValue{} + if protoimpl.UnsafeEnabled { + mi := &file_plugin_codecs_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOStreamIDAndValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOStreamIDAndValue) ProtoMessage() {} + +func (x *LLOStreamIDAndValue) ProtoReflect() protoreflect.Message { + mi := &file_plugin_codecs_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOStreamIDAndValue.ProtoReflect.Descriptor instead. 
+func (*LLOStreamIDAndValue) Descriptor() ([]byte, []int) { + return file_plugin_codecs_proto_rawDescGZIP(), []int{6} +} + +func (x *LLOStreamIDAndValue) GetStreamID() uint32 { + if x != nil { + return x.StreamID + } + return 0 +} + +func (x *LLOStreamIDAndValue) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +var File_plugin_codecs_proto protoreflect.FileDescriptor + +var file_plugin_codecs_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x76, 0x31, 0x22, 0xd6, 0x04, 0x0a, 0x13, 0x4c, 0x4c, + 0x4f, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x44, 0x0a, 0x1d, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, 0x65, 0x64, 0x50, 0x72, 0x65, + 0x64, 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x6d, 0x65, + 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x1d, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, + 0x65, 0x64, 0x50, 0x72, 0x65, 0x64, 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x74, + 0x69, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x68, 0x6f, 0x75, 0x6c, + 0x64, 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, + 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x12, 0x3a, 0x0a, 0x18, 0x75, + 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x61, 0x6e, 0x6f, + 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x18, 0x75, + 0x6e, 0x69, 0x78, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x61, 0x6e, 0x6f, + 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x72, 0x65, 0x6d, 0x6f, 0x76, + 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, + 0x0d, 0x52, 0x10, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x43, 0x68, 0x61, 0x6e, 
0x6e, 0x65, 0x6c, + 0x49, 0x44, 0x73, 0x12, 0x71, 0x0a, 0x18, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x35, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x4c, 0x4f, 0x4f, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, + 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x18, 0x75, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4d, 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x76, + 0x31, 0x2e, 0x4c, 0x4c, 0x4f, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x73, 0x1a, 0x6a, 0x0a, 0x1d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, + 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x33, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x4c, 0x4f, + 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x1a, 0x3f, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, + 0x73, 0x45, 0x6e, 
0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x22, 0x83, 0x01, 0x0a, 0x19, 0x4c, 0x4c, 0x4f, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x22, 0x0a, 0x0c, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, + 0x72, 0x6d, 0x61, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x53, 0x65, 0x6c, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x63, 0x68, 0x61, + 0x69, 0x6e, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x09, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x73, 0x22, 0x47, 0x0a, 0x19, 0x4c, 0x4c, 0x4f, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x22, 0xf1, 0x02, 0x0a, 0x0f, 0x4c, 0x4c, 0x4f, 0x4f, 0x75, 0x74, 0x63, 0x6f, 0x6d, 0x65, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x26, 0x0a, 0x0e, 0x6c, 0x69, 0x66, 0x65, 0x43, 0x79, 0x63, + 0x6c, 0x65, 0x53, 0x74, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6c, + 0x69, 0x66, 0x65, 0x43, 0x79, 0x63, 0x6c, 0x65, 0x53, 0x74, 0x61, 0x67, 0x65, 0x12, 0x4a, 0x0a, + 0x20, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 
0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x61, 0x6e, 0x6f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, + 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x20, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x61, + 0x6e, 0x6f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x52, 0x0a, 0x12, 0x63, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, + 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x4c, 0x4f, 0x43, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x41, 0x6e, 0x64, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x52, 0x12, 0x63, 0x68, 0x61, 0x6e, 0x6e, + 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x57, 0x0a, + 0x11, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, + 0x64, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x4c, + 0x4f, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x6c, + 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x52, 0x11, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x53, + 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x3d, 0x0a, 0x0d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x4d, 0x65, 0x64, 0x69, 0x61, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, + 0x76, 0x31, 0x2e, 0x4c, 0x4c, 0x4f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x41, 0x6e, + 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, + 0x64, 0x69, 0x61, 0x6e, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x1e, 0x4c, 0x4c, 0x4f, 0x43, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x41, 0x6e, 0x64, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 
0x74, + 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x63, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x12, 0x4b, 0x0a, 0x11, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1d, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x4c, 0x4f, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x52, 0x11, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0x73, 0x0a, 0x25, 0x4c, 0x4c, 0x4f, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, + 0x6c, 0x49, 0x44, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, + 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x0a, 0x09, + 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x09, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x49, 0x44, 0x12, 0x2c, 0x0a, 0x11, 0x76, 0x61, + 0x6c, 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x41, 0x66, 0x74, 0x65, + 0x72, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x22, 0x47, 0x0a, 0x13, 0x4c, 0x4c, 0x4f, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, + 0x1a, 0x0a, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0d, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x6c, 0x6c, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + 
file_plugin_codecs_proto_rawDescOnce sync.Once + file_plugin_codecs_proto_rawDescData = file_plugin_codecs_proto_rawDesc +) + +func file_plugin_codecs_proto_rawDescGZIP() []byte { + file_plugin_codecs_proto_rawDescOnce.Do(func() { + file_plugin_codecs_proto_rawDescData = protoimpl.X.CompressGZIP(file_plugin_codecs_proto_rawDescData) + }) + return file_plugin_codecs_proto_rawDescData +} + +var file_plugin_codecs_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_plugin_codecs_proto_goTypes = []interface{}{ + (*LLOObservationProto)(nil), // 0: v1.LLOObservationProto + (*LLOChannelDefinitionProto)(nil), // 1: v1.LLOChannelDefinitionProto + (*LLOStreamObservationProto)(nil), // 2: v1.LLOStreamObservationProto + (*LLOOutcomeProto)(nil), // 3: v1.LLOOutcomeProto + (*LLOChannelIDAndDefinitionProto)(nil), // 4: v1.LLOChannelIDAndDefinitionProto + (*LLOChannelIDAndValidAfterSecondsProto)(nil), // 5: v1.LLOChannelIDAndValidAfterSecondsProto + (*LLOStreamIDAndValue)(nil), // 6: v1.LLOStreamIDAndValue + nil, // 7: v1.LLOObservationProto.UpdateChannelDefinitionsEntry + nil, // 8: v1.LLOObservationProto.StreamValuesEntry +} +var file_plugin_codecs_proto_depIdxs = []int32{ + 7, // 0: v1.LLOObservationProto.updateChannelDefinitions:type_name -> v1.LLOObservationProto.UpdateChannelDefinitionsEntry + 8, // 1: v1.LLOObservationProto.streamValues:type_name -> v1.LLOObservationProto.StreamValuesEntry + 4, // 2: v1.LLOOutcomeProto.channelDefinitions:type_name -> v1.LLOChannelIDAndDefinitionProto + 5, // 3: v1.LLOOutcomeProto.validAfterSeconds:type_name -> v1.LLOChannelIDAndValidAfterSecondsProto + 6, // 4: v1.LLOOutcomeProto.streamMedians:type_name -> v1.LLOStreamIDAndValue + 1, // 5: v1.LLOChannelIDAndDefinitionProto.channelDefinition:type_name -> v1.LLOChannelDefinitionProto + 1, // 6: v1.LLOObservationProto.UpdateChannelDefinitionsEntry.value:type_name -> v1.LLOChannelDefinitionProto + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for 
method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_plugin_codecs_proto_init() } +func file_plugin_codecs_proto_init() { + if File_plugin_codecs_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_plugin_codecs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOObservationProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_plugin_codecs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOChannelDefinitionProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_plugin_codecs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOStreamObservationProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_plugin_codecs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOOutcomeProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_plugin_codecs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOChannelIDAndDefinitionProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_plugin_codecs_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOChannelIDAndValidAfterSecondsProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + 
file_plugin_codecs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOStreamIDAndValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_plugin_codecs_proto_rawDesc, + NumEnums: 0, + NumMessages: 9, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_plugin_codecs_proto_goTypes, + DependencyIndexes: file_plugin_codecs_proto_depIdxs, + MessageInfos: file_plugin_codecs_proto_msgTypes, + }.Build() + File_plugin_codecs_proto = out.File + file_plugin_codecs_proto_rawDesc = nil + file_plugin_codecs_proto_goTypes = nil + file_plugin_codecs_proto_depIdxs = nil +} diff --git a/llo/plugin_codecs.proto b/llo/plugin_codecs.proto new file mode 100644 index 0000000..f0ba22a --- /dev/null +++ b/llo/plugin_codecs.proto @@ -0,0 +1,58 @@ +syntax="proto3"; + +package v1; +option go_package = ".;llo"; + +// WARNING +// Use extreme caution making changes in this file +// All changes MUST be backwards compatible +// If the format changes in a backwards incompatible way, active DONs can +// become stuck permanently +message LLOObservationProto { + bytes attestedPredecessorRetirement = 1; + bool shouldRetire = 2; + int64 unixTimestampNanoseconds = 3; + repeated uint32 removeChannelIDs = 4; + // Maps are safe to use here because Observation serialization does not + // need to be deterministic. Non-deterministic map serialization is + // marginally more efficient than converting to tuples and guarantees + // uniqueness. 
+	map<uint32, LLOChannelDefinitionProto> updateChannelDefinitions = 5;
+	map<uint32, bytes> streamValues = 6;
+}
+
+message LLOChannelDefinitionProto {
+	uint32 reportFormat = 1;
+	uint64 chainSelector = 2;
+	repeated uint32 streamIDs = 3;
+}
+
+message LLOStreamObservationProto {
+	bool valid = 1;
+	bytes value = 2;
+}
+
+// NOTE: Outcome must serialize deterministically, hence use of repeated tuple instead of maps
+message LLOOutcomeProto {
+	string lifeCycleStage = 1;
+	int64 ObservationsTimestampNanoseconds = 2;
+	repeated LLOChannelIDAndDefinitionProto channelDefinitions = 3;
+	repeated LLOChannelIDAndValidAfterSecondsProto validAfterSeconds = 4;
+	repeated LLOStreamIDAndValue streamMedians = 5;
+}
+
+message LLOChannelIDAndDefinitionProto {
+	uint32 channelID = 1;
+	LLOChannelDefinitionProto channelDefinition = 2;
+}
+
+message LLOChannelIDAndValidAfterSecondsProto {
+	uint32 channelID = 1;
+	uint32 validAfterSeconds = 2;
+}
+
+message LLOStreamIDAndValue {
+	uint32 streamID = 1;
+	bytes value = 2;
+}
+
diff --git a/llo/plugin_codecs_test.go b/llo/plugin_codecs_test.go
new file mode 100644
index 0000000..00a7f87
--- /dev/null
+++ b/llo/plugin_codecs_test.go
@@ -0,0 +1,108 @@
+package llo
+
+import (
+	"math/big"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
+)
+
+// TODO: probably ought to have fuzz testing to detect crashes
+// TODO: what about resource starvation attacks? maximum length? Does OCR
+// protect us from this?
+ +func Test_protoObservationCodec(t *testing.T) { + t.Run("encode and decode empty struct", func(t *testing.T) { + obs := Observation{} + obsBytes, err := (protoObservationCodec{}).Encode(obs) + require.NoError(t, err) + + obs2, err := (protoObservationCodec{}).Decode(obsBytes) + require.NoError(t, err) + + assert.Equal(t, obs, obs2) + }) + t.Run("encode and decode with values", func(t *testing.T) { + obs := Observation{ + AttestedPredecessorRetirement: []byte{1, 2, 3}, + ShouldRetire: true, + UnixTimestampNanoseconds: 1234567890, + RemoveChannelIDs: map[llotypes.ChannelID]struct{}{ + 1: {}, + 2: {}, + }, + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 3: { + ReportFormat: llotypes.ReportFormatJSON, + ChainSelector: 12345, + StreamIDs: []llotypes.StreamID{3, 4}, + }, + }, + StreamValues: map[llotypes.StreamID]*big.Int{ + 4: big.NewInt(123), + 5: big.NewInt(456), + 6: (*big.Int)(nil), + }, + } + + obsBytes, err := (protoObservationCodec{}).Encode(obs) + require.NoError(t, err) + + obs2, err := (protoObservationCodec{}).Decode(obsBytes) + require.NoError(t, err) + + expectedObs := obs + delete(expectedObs.StreamValues, 6) // nils will be dropped + + assert.Equal(t, expectedObs, obs2) + }) +} + +func Test_protoOutcomeCodec(t *testing.T) { + t.Run("encode and decode empty struct", func(t *testing.T) { + outcome := Outcome{} + outcomeBytes, err := (protoOutcomeCodec{}).Encode(outcome) + require.NoError(t, err) + + outcome2, err := (protoOutcomeCodec{}).Decode(outcomeBytes) + require.NoError(t, err) + + assert.Equal(t, outcome, outcome2) + }) + t.Run("encode and decode with values", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("staging"), + ObservationsTimestampNanoseconds: 1234567890, + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 3: { + ReportFormat: llotypes.ReportFormatJSON, + ChainSelector: 12345, + StreamIDs: []llotypes.StreamID{1, 2}, + }, + }, + + 
ValidAfterSeconds: map[llotypes.ChannelID]uint32{
+				3: 123,
+			},
+			StreamMedians: map[llotypes.StreamID]*big.Int{
+				1: big.NewInt(123),
+				2: big.NewInt(456),
+				3: nil,
+			},
+		}
+
+		outcomeBytes, err := (protoOutcomeCodec{}).Encode(outcome)
+		require.NoError(t, err)
+
+		outcome2, err := (protoOutcomeCodec{}).Decode(outcomeBytes)
+		require.NoError(t, err)
+
+		expectedOutcome := outcome
+		delete(expectedOutcome.StreamMedians, 3) // nils will be dropped
+
+		assert.Equal(t, expectedOutcome, outcome2)
+	})
+}
diff --git a/llo/plugin_test.go b/llo/plugin_test.go
new file mode 100644
index 0000000..49ffbda
--- /dev/null
+++ b/llo/plugin_test.go
@@ -0,0 +1,557 @@
+package llo
+
+import (
+	"context"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/smartcontractkit/libocr/commontypes"
+	"github.com/smartcontractkit/libocr/offchainreporting2/types"
+	"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
+	"golang.org/x/exp/maps"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+type mockShouldRetireCache struct {
+	shouldRetire bool
+	err          error
+}
+
+func (m *mockShouldRetireCache) ShouldRetire() (bool, error) {
+	return m.shouldRetire, m.err
+}
+
+type mockChannelDefinitionCache struct {
+	definitions llotypes.ChannelDefinitions
+}
+
+func (m *mockChannelDefinitionCache) Definitions() llotypes.ChannelDefinitions {
+	return m.definitions
+}
+
+type mockDataSource struct {
+	s   StreamValues
+	err error
+}
+
+func (m *mockDataSource) Observe(ctx context.Context, streamValues StreamValues, opts DSOpts) error {
+	for k, v := range m.s {
+		streamValues[k] = v
+	}
+	return m.err
+}
+
+func Test_Observation(t *testing.T) {
+	smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{
+		1: llotypes.ChannelDefinition{
+			ReportFormat:  llotypes.ReportFormatJSON,
+			ChainSelector: 123,
+			StreamIDs:     
[]llotypes.StreamID{1, 2, 3}, + }, + 2: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: 456, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + } + cdc := &mockChannelDefinitionCache{smallDefinitions} + + ds := &mockDataSource{ + s: map[llotypes.StreamID]*big.Int{ + 1: big.NewInt(1000), + 3: big.NewInt(3000), + 4: big.NewInt(4000), + }, + } + + p := &Plugin{ + Config: Config{true}, + OutcomeCodec: protoOutcomeCodec{}, + ShouldRetireCache: &mockShouldRetireCache{}, + ChannelDefinitionCache: cdc, + Logger: logger.Test(t), + ObservationCodec: protoObservationCodec{}, + DataSource: ds, + } + var query types.Query // query is always empty for LLO + + t.Run("seqNr=0 always errors", func(t *testing.T) { + outctx := ocr3types.OutcomeContext{} + _, err := p.Observation(context.Background(), outctx, query) + assert.EqualError(t, err, "got invalid seqnr=0, must be >=1") + }) + + t.Run("seqNr=1 always returns empty observation", func(t *testing.T) { + outctx := ocr3types.OutcomeContext{SeqNr: 1} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + require.Len(t, obs, 0) + }) + + t.Run("observes timestamp and channel definitions on seqNr=2", func(t *testing.T) { + testStartTS := time.Now() + + outctx := ocr3types.OutcomeContext{SeqNr: 2} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.Len(t, decoded.StreamValues, 0) + assert.Equal(t, cdc.definitions, decoded.UpdateChannelDefinitions) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + }) + + t.Run("observes streams on seqNr=2", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: 
llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + assert.Len(t, decoded.UpdateChannelDefinitions, 0) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + mediumDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatJSON, + ChainSelector: 123, + StreamIDs: []llotypes.StreamID{1, 2, 3}, + }, + 3: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: 456, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + 4: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: 457, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + 5: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: 458, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + 6: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: 459, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + } + + cdc.definitions = mediumDefinitions + + t.Run("votes to increase channel amount by a small amount, and remove one", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + 
ChannelDefinitions: smallDefinitions, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + assert.Len(t, decoded.UpdateChannelDefinitions, 4) + assert.ElementsMatch(t, []uint32{3, 4, 5, 6}, maps.Keys(decoded.UpdateChannelDefinitions)) + expected := make(llotypes.ChannelDefinitions) + for k, v := range mediumDefinitions { + if k > 2 { // 2 was removed and 1 already present + expected[k] = v + } + } + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + assert.Len(t, decoded.RemoveChannelIDs, 1) + assert.Equal(t, map[uint32]struct{}{2: struct{}{}}, decoded.RemoveChannelIDs) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + largeSize := 100 + require.Greater(t, largeSize, MaxObservationUpdateChannelDefinitionsLength) + largeDefinitions := make(map[llotypes.ChannelID]llotypes.ChannelDefinition, largeSize) + for i := 0; i < largeSize; i++ { + largeDefinitions[llotypes.ChannelID(i)] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVM, + ChainSelector: uint64(i), + StreamIDs: []llotypes.StreamID{uint32(i)}, + } + } + cdc.definitions = largeDefinitions + + t.Run("votes to add channels when channel definitions increases by a large amount, and replace some existing channels with different definitions", func(t *testing.T) { + t.Run("first round of additions", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + 
ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: smallDefinitions, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + // Even though we have a large amount of channel definitions, we should + // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time + assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) + expected := make(llotypes.ChannelDefinitions) + for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { + expected[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] + } + + // 1 and 2 are actually replaced since definition is different from the one in smallDefinitions + assert.ElementsMatch(t, []uint32{0, 1, 2, 3, 4}, maps.Keys(decoded.UpdateChannelDefinitions)) + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + // Nothing removed + assert.Len(t, decoded.RemoveChannelIDs, 0) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + t.Run("second round of additions", func(t *testing.T) { + testStartTS := time.Now() + offset := MaxObservationUpdateChannelDefinitionsLength * 2 + + subsetDfns := make(llotypes.ChannelDefinitions) + for i := 0; i < offset; i++ { + subsetDfns[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] + } + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: 
subsetDfns, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + // Even though we have a large amount of channel definitions, we should + // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time + assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) + expected := make(llotypes.ChannelDefinitions) + expectedChannelIDs := []uint32{} + for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { + expectedChannelIDs = append(expectedChannelIDs, uint32(i+offset)) + expected[llotypes.ChannelID(i+offset)] = largeDefinitions[llotypes.ChannelID(i+offset)] + } + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + assert.ElementsMatch(t, expectedChannelIDs, maps.Keys(decoded.UpdateChannelDefinitions)) + + // Nothing removed + assert.Len(t, decoded.RemoveChannelIDs, 0) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + }) + + cdc.definitions = smallDefinitions + + // TODO: huge (greater than max allowed) + + t.Run("votes to remove channel IDs", func(t *testing.T) { + t.Run("first round of removals", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: largeDefinitions, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + 
require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + // will have two items here to account for the change of 1 and 2 in smallDefinitions + assert.Len(t, decoded.UpdateChannelDefinitions, 2) + + // Even though we have a large amount of channel definitions, we should + // only remove MaxObservationRemoveChannelIDsLength at a time + assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) + assert.ElementsMatch(t, []uint32{0, 3, 4, 5, 6}, maps.Keys(decoded.RemoveChannelIDs)) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + t.Run("second round of removals", func(t *testing.T) { + testStartTS := time.Now() + offset := MaxObservationUpdateChannelDefinitionsLength * 2 + + subsetDfns := maps.Clone(largeDefinitions) + for i := 0; i < offset; i++ { + delete(subsetDfns, llotypes.ChannelID(i)) + } + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: subsetDfns, + ValidAfterSeconds: nil, + StreamMedians: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + // will have two items here to account for the change of 1 and 2 in 
smallDefinitions + assert.Len(t, decoded.UpdateChannelDefinitions, 2) + + // Even though we have a large amount of channel definitions, we should + // only remove MaxObservationRemoveChannelIDsLength at a time + assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) + assert.ElementsMatch(t, []uint32{10, 11, 12, 13, 14}, maps.Keys(decoded.RemoveChannelIDs)) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + }) +} + +func Test_ValidateObservation(t *testing.T) { + p := &Plugin{ + Config: Config{true}, + } + + t.Run("SeqNr < 1 is not valid", func(t *testing.T) { + err := p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{}) + assert.EqualError(t, err, "Invalid SeqNr: 0") + }) + t.Run("SeqNr == 1 enforces empty observation", func(t *testing.T) { + err := p.ValidateObservation(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, types.AttributedObservation{Observation: []byte{1}}) + assert.EqualError(t, err, "Expected empty observation for first round, got: 0x01") + }) +} + +func Test_Outcome(t *testing.T) { + // cdc := &mockChannelDefinitionCache{} + p := &Plugin{ + Config: Config{true}, + OutcomeCodec: protoOutcomeCodec{}, + // ShouldRetireCache: &mockShouldRetireCache{}, + Logger: logger.Test(t), + ObservationCodec: protoObservationCodec{}, + } + + t.Run("if number of observers < 2f+1, errors", func(t *testing.T) { + _, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{}) + assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)") + p.F = 1 + _, err = p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{{}, {}}) + assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 2 (f: 1)") + }) + + t.Run("if seqnr == 1, and has enough observers, 
emits initial outcome with 'production' LifeCycleStage", func(t *testing.T) { + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{ + { + Observation: []byte{}, + Observer: commontypes.OracleID(0), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(1), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(2), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(3), + }, + }) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, Outcome{ + LifeCycleStage: "production", + }, decoded) + }) + + t.Run("adds a new channel definition if there are enough votes", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + ChainSelector: 3, + StreamIDs: []llotypes.StreamID{1, 2, 3}, + } + obs, err := p.ObservationCodec.Encode(Observation{ + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: newCd, + }, + }) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: obs, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) + }) + + t.Run("replaces an existing channel definition if there are enough votes", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + ChainSelector: 3, + StreamIDs: []llotypes.StreamID{1, 2, 3}, + } + obs, err := p.ObservationCodec.Encode(Observation{ + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: newCd, + }, + }) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; 
i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: obs, + Observer: commontypes.OracleID(i), + }) + } + + previousOutcome, err := p.OutcomeCodec.Encode(Outcome{ + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: { + ReportFormat: llotypes.ReportFormat(1), + ChainSelector: 2, + StreamIDs: []llotypes.StreamID{2, 3, 4}, + }, + }, + }) + require.NoError(t, err) + + outcome, err := p.Outcome(ocr3types.OutcomeContext{PreviousOutcome: previousOutcome, SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) + }) + + t.Run("does not add channels beyond MaxOutcomeChannelDefinitionsLength", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + ChainSelector: 3, + StreamIDs: []llotypes.StreamID{1, 2, 3}, + } + obs := Observation{UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{}} + for i := 0; i < MaxOutcomeChannelDefinitionsLength+10; i++ { + obs.UpdateChannelDefinitions[llotypes.ChannelID(i)] = newCd + } + encoded, err := p.ObservationCodec.Encode(obs) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Len(t, decoded.ChannelDefinitions, MaxOutcomeChannelDefinitionsLength) + + // should contain channels 0 thru 999 + assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(0)) + assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength-1)) + assert.NotContains(t, decoded.ChannelDefinitions, 
llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength)) + assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength+1)) + }) +} diff --git a/llo/types.go b/llo/types.go index 6c9ff9f..a1eb271 100644 --- a/llo/types.go +++ b/llo/types.go @@ -25,8 +25,3 @@ type Transmitter interface { // - FromAccount() should return CSA public key ocr3types.ContractTransmitter[llotypes.ReportInfo] } - -type ObsResult[T any] struct { - Val T - Valid bool -}