Skip to content

Commit

Permalink
feat(cannon): Add Blockprint Block Classification Deriving (#227)
Browse files Browse the repository at this point in the history
* feat(cannon): Add Blockprint Block Classification Deriving

* refactor: Simplify BlocksPerClient function

* feat(event): add blockprint block classification event

* Merge master

* refactor(block_classification): Remove unnecessary log statement

* refactor: Add sleep to avoid API hammering

* fix: Correct client name for Grandine
  • Loading branch information
samcm authored Oct 5, 2023
1 parent 0d93680 commit 38a15fe
Show file tree
Hide file tree
Showing 30 changed files with 3,179 additions and 1,842 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ proto:
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/eth/v1 --go_out=./pkg/proto/eth/v1/ pkg/proto/eth/v1/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/eth/v2 --go_out=./pkg/proto/eth/v2/ pkg/proto/eth/v2/*.proto
protoc --proto_path=./ --proto_path=./pkg/proto/eth/v1 --proto_path=./pkg/proto/eth/v2 --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/xatu --go-grpc_out=. --go-grpc_opt=paths=source_relative --go_out=./pkg/proto/xatu pkg/proto/xatu/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/blockprint --go_out=./pkg/proto/blockprint pkg/proto/blockprint/*.proto
61 changes: 61 additions & 0 deletions pkg/cannon/blockprint/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package blockprint

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
)

type Client struct {
endpoint string
httpClient *http.Client
headers map[string]string
}

func NewClient(endpoint string, headers map[string]string) *Client {
return &Client{
endpoint: endpoint,
httpClient: http.DefaultClient,
headers: headers,
}
}

func (c *Client) get(ctx context.Context, path string) (json.RawMessage, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.endpoint+path, http.NoBody)
if err != nil {
return nil, err
}

// Set headers from c.headers
for k, v := range c.headers {
req.Header.Set(k, v)
}

rsp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}

defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d", rsp.StatusCode)
}

data, err := io.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

data = bytes.TrimPrefix(data, []byte("\xef\xbb\xbf"))

resp := new(json.RawMessage)
if err := json.Unmarshal(data, &resp); err != nil {
return nil, err
}

return *resp, nil
}
52 changes: 52 additions & 0 deletions pkg/cannon/blockprint/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package blockprint

type ProbabilityMap map[ClientName]float64

type ClientName string

var (
ClientNameUnknown = ClientName("Unknown")
ClientNameUncertain = ClientName("Uncertain")
ClientNamePrysm = ClientName("Prysm")
ClientNameLighthouse = ClientName("Lighthouse")
ClientNameLodestar = ClientName("Lodestar")
ClientNameNimbus = ClientName("Nimbus")
ClientNameTeku = ClientName("Teku")
ClientNameGrandine = ClientName("Grandine")
)

func (p ProbabilityMap) safeGet(name ClientName) float64 {
if p[name] == 0 {
return 0
}

return p[name]
}

func (p ProbabilityMap) Prysm() float64 {
return p.safeGet(ClientNamePrysm)
}

func (p ProbabilityMap) Lighthouse() float64 {
return p.safeGet(ClientNameLighthouse)
}

func (p ProbabilityMap) Lodestar() float64 {
return p.safeGet(ClientNameLodestar)
}

func (p ProbabilityMap) Nimbus() float64 {
return p.safeGet(ClientNameNimbus)
}

func (p ProbabilityMap) Teku() float64 {
return p.safeGet(ClientNameTeku)
}

func (p ProbabilityMap) Grandine() float64 {
return p.safeGet(ClientNameGrandine)
}

func (p ProbabilityMap) Uncertain() float64 {
return p.safeGet(ClientNameUncertain)
}
44 changes: 44 additions & 0 deletions pkg/cannon/blockprint/private.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package blockprint

import (
"context"
"encoding/json"
"fmt"

"github.com/attestantio/go-eth2-client/spec/phase0"
)

type ProposersBlocksResponse struct {
//nolint:tagliatelle // Defined by API.
ProposerIndex uint64 `json:"proposer_index"`
Slot uint64 `json:"slot"`
//nolint:tagliatelle // Defined by API.
BestGuessSingle ClientName `json:"best_guess_single"`
//nolint:tagliatelle // Defined by API.
BestGuessMulti string `json:"best_guess_multi"`
//nolint:tagliatelle // Defined by API.
ProbabilityMap *ProbabilityMap `json:"probability_map"`
}

func (c *Client) BlocksRange(ctx context.Context, startSlot phase0.Slot, endSlot ...phase0.Slot) ([]*ProposersBlocksResponse, error) {
var path string
if len(endSlot) > 0 {
path = fmt.Sprintf("/blocks/%d/%d", startSlot, endSlot[0])
} else {
path = fmt.Sprintf("/blocks/%d", startSlot)
}

data, err := c.get(ctx, path)
if err != nil {
return nil, err
}

var result []*ProposersBlocksResponse

err = json.Unmarshal(data, &result)
if err != nil {
return nil, err
}

return result, nil
}
63 changes: 63 additions & 0 deletions pkg/cannon/blockprint/public.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package blockprint

import (
"context"
"encoding/json"
)

type BlocksPerClientResponse struct {
//nolint:tagliatelle // Defined by API.
Uncertain uint64 `json:"Uncertain"`
//nolint:tagliatelle // Defined by API.
Lighthouse uint64 `json:"Lighthouse"`
//nolint:tagliatelle // Defined by API.
Lodestar uint64 `json:"Lodestar"`
//nolint:tagliatelle // Defined by API.
Nimbus uint64 `json:"Nimbus"`
//nolint:tagliatelle // Defined by API.
Other uint64 `json:"Other"`
//nolint:tagliatelle // Defined by API.
Prysm uint64 `json:"Prysm"`
//nolint:tagliatelle // Defined by API.
Teku uint64 `json:"Teku"`
//nolint:tagliatelle // Defined by API.
Grandine uint64 `json:"Grandine"`
}

type SyncStatusResponse struct {
//nolint:tagliatelle // Defined by API.
GreatestBlockSlot uint64 `json:"greatest_block_slot"`
Synced bool `json:"synced"`
}

func (c *Client) BlocksPerClient(ctx context.Context, startEpoch, endEpoch string) (*BlocksPerClientResponse, error) {
data, err := c.get(ctx, "/blocks_per_client/"+startEpoch+"/"+endEpoch)
if err != nil {
return nil, err
}

var result BlocksPerClientResponse

err = json.Unmarshal(data, &result)
if err != nil {
return nil, err
}

return &result, nil
}

func (c *Client) SyncStatus(ctx context.Context) (*SyncStatusResponse, error) {
data, err := c.get(ctx, "/sync/status")
if err != nil {
return nil, err
}

var result SyncStatusResponse

err = json.Unmarshal(data, &result)
if err != nil {
return nil, err
}

return &result, nil
}
31 changes: 25 additions & 6 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
_ "net/http/pprof"

"github.com/beevik/ntp"
aBlockprint "github.com/ethpandaops/xatu/pkg/cannon/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/coordinator"
"github.com/ethpandaops/xatu/pkg/cannon/deriver"
v2 "github.com/ethpandaops/xatu/pkg/cannon/deriver/beacon/eth/v2"
"github.com/ethpandaops/xatu/pkg/cannon/deriver/blockprint"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum"
"github.com/ethpandaops/xatu/pkg/cannon/iterator"
"github.com/ethpandaops/xatu/pkg/observability"
Expand Down Expand Up @@ -328,8 +330,15 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {

checkpointIteratorMetrics := iterator.NewCheckpointMetrics("xatu_cannon")

blockprintIteratorMetrics := iterator.NewBlockprintMetrics("xatu_cannon")

finalizedCheckpoint := "finalized"

blockprintClient := aBlockprint.NewClient(
c.Config.Derivers.BlockClassificationConfig.Endpoint,
c.Config.Derivers.BlockClassificationConfig.Headers,
)

eventDerivers := []deriver.EventDeriver{
v2.NewAttesterSlashingDeriver(
c.log,
Expand Down Expand Up @@ -467,6 +476,22 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon,
clientMeta,
),
blockprint.NewBlockClassificationDeriver(
c.log,
&c.Config.Derivers.BlockClassificationConfig,
iterator.NewBlockprintIterator(
c.log,
networkName,
networkID,
xatu.CannonType_BLOCKPRINT_BLOCK_CLASSIFICATION,
c.coordinatorClient,
&blockprintIteratorMetrics,
blockprintClient,
),
c.beacon,
clientMeta,
blockprintClient,
),
}

c.eventDerivers = eventDerivers
Expand All @@ -478,12 +503,6 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
return c.handleNewDecoratedEvents(ctx, events)
})

d.OnLocationUpdated(ctx, func(ctx context.Context, location uint64) error {
c.metrics.SetDeriverLocation(location, d.CannonType(), string(c.beacon.Metadata().Network.Name))

return nil
})

c.log.
WithField("deriver", deriver.Name()).
WithField("type", deriver.CannonType()).
Expand Down
23 changes: 6 additions & 17 deletions pkg/cannon/deriver/beacon/eth/v2/attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ type AttesterSlashingDeriverConfig struct {
}

type AttesterSlashingDeriver struct {
log logrus.FieldLogger
cfg *AttesterSlashingDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, loc uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
log logrus.FieldLogger
cfg *AttesterSlashingDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewAttesterSlashingDeriver(log logrus.FieldLogger, config *AttesterSlashingDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *AttesterSlashingDeriver {
Expand All @@ -63,10 +62,6 @@ func (a *AttesterSlashingDeriver) OnEventsDerived(ctx context.Context, fn func(c
a.onEventsCallbacks = append(a.onEventsCallbacks, fn)
}

func (a *AttesterSlashingDeriver) OnLocationUpdated(ctx context.Context, fn func(ctx context.Context, loc uint64) error) {
a.onLocationCallbacks = append(a.onLocationCallbacks, fn)
}

func (a *AttesterSlashingDeriver) Start(ctx context.Context) error {
if !a.cfg.Enabled {
a.log.Info("Attester slashing deriver disabled")
Expand Down Expand Up @@ -118,12 +113,6 @@ func (a *AttesterSlashingDeriver) run(rctx context.Context) {
// Look ahead
a.lookAheadAtLocations(ctx, lookAhead)

for _, fn := range a.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlockAttesterSlashing().GetEpoch()); errr != nil {
a.log.WithError(errr).Error("Failed to send location")
}
}

// Process the epoch
events, err := a.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlockAttesterSlashing().GetEpoch()))
if err != nil {
Expand Down
27 changes: 7 additions & 20 deletions pkg/cannon/deriver/beacon/eth/v2/beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ type BeaconBlockDeriverConfig struct {
}

type BeaconBlockDeriver struct {
log logrus.FieldLogger
cfg *BeaconBlockDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
onLocationCallbacks []func(ctx context.Context, location uint64) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
log logrus.FieldLogger
cfg *BeaconBlockDeriverConfig
iterator *iterator.CheckpointIterator
onEventsCallbacks []func(ctx context.Context, events []*xatu.DecoratedEvent) error
beacon *ethereum.BeaconNode
clientMeta *xatu.ClientMeta
}

func NewBeaconBlockDeriver(log logrus.FieldLogger, config *BeaconBlockDeriverConfig, iter *iterator.CheckpointIterator, beacon *ethereum.BeaconNode, clientMeta *xatu.ClientMeta) *BeaconBlockDeriver {
Expand All @@ -68,10 +67,6 @@ func (b *BeaconBlockDeriver) OnEventsDerived(ctx context.Context, fn func(ctx co
b.onEventsCallbacks = append(b.onEventsCallbacks, fn)
}

func (b *BeaconBlockDeriver) OnLocationUpdated(ctx context.Context, fn func(ctx context.Context, location uint64) error) {
b.onLocationCallbacks = append(b.onLocationCallbacks, fn)
}

func (b *BeaconBlockDeriver) Start(ctx context.Context) error {
if !b.cfg.Enabled {
b.log.Info("Beacon block deriver disabled")
Expand Down Expand Up @@ -134,15 +129,7 @@ func (b *BeaconBlockDeriver) run(rctx context.Context) {
// Look ahead
b.lookAheadAtLocation(ctx, lookAhead)

span.AddEvent("Look ahead complete. Firing location callbacks...")

for _, fn := range b.onLocationCallbacks {
if errr := fn(ctx, location.GetEthV2BeaconBlock().GetEpoch()); errr != nil {
b.log.WithError(errr).Error("Failed to send location")
}
}

span.AddEvent("Location callbacks complete. Processing epoch...")
span.AddEvent("Look ahead complete. Processing epoch...")

// Process the epoch
events, err := b.processEpoch(ctx, phase0.Epoch(location.GetEthV2BeaconBlock().GetEpoch()))
Expand Down
Loading

0 comments on commit 38a15fe

Please sign in to comment.