Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom pubkeys inmemory service #194

Merged
merged 14 commits into from
Jul 24, 2024
7 changes: 4 additions & 3 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/Layr-Labs/eigensdk-go/metrics"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys"
"github.com/Layr-Labs/eigensdk-go/signerv2"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -91,6 +90,7 @@ type Aggregator struct {
restListener RestEventListener
aggregatorListener AggregatorEventListener

operatorRegistrationsService OperatorRegistrationsService
taskBlsAggregationService blsagg.BlsAggregationService
stateRootUpdateBlsAggregationService MessageBlsAggregationService
operatorSetUpdateBlsAggregationService MessageBlsAggregationService
Expand Down Expand Up @@ -200,8 +200,8 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
return nil, err
}

operatorPubkeysService := oppubkeysserv.NewOperatorPubkeysServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, logger)
operatorRegistrationsService := NewOperatorRegistrationsServiceInMemory(ctx, clients.AvsRegistryChainSubscriber, clients.AvsRegistryChainReader, logger)
avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorRegistrationsService, logger)
taskBlsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, logger)
stateRootUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger)
operatorSetUpdateBlsAggregationService := NewMessageBlsAggregatorService(avsRegistryService, clients.EthHttpClient, logger)
Expand All @@ -217,6 +217,7 @@ func NewAggregator(ctx context.Context, config *config.Config, logger logging.Lo
rollupBroadcaster: rollupBroadcaster,
httpClient: ethHttpClient,
wsClient: ethWsClient,
operatorRegistrationsService: operatorRegistrationsService,
taskBlsAggregationService: taskBlsAggregationService,
stateRootUpdateBlsAggregationService: stateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: operatorSetUpdateBlsAggregationService,
Expand Down
23 changes: 13 additions & 10 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ var MOCK_OPERATOR_BLS_PRIVATE_KEY, _ = bls.NewPrivateKey(MOCK_OPERATOR_BLS_PRIVA
var MOCK_OPERATOR_KEYPAIR = bls.NewKeyPair(MOCK_OPERATOR_BLS_PRIVATE_KEY)
var MOCK_OPERATOR_G1PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG1()
var MOCK_OPERATOR_G2PUBKEY = MOCK_OPERATOR_KEYPAIR.GetPubKeyG2()
var MOCK_OPERATOR_PUBKEYS = eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
}
var MOCK_OPERATOR_PUBKEY_DICT = map[eigentypes.OperatorId]types.OperatorInfo{
MOCK_OPERATOR_ID: {
OperatorPubkeys: eigentypes.OperatorPubkeys{
G1Pubkey: MOCK_OPERATOR_G1PUBKEY,
G2Pubkey: MOCK_OPERATOR_G2PUBKEY,
},
OperatorAddr: common.Address{},
OperatorPubkeys: MOCK_OPERATOR_PUBKEYS,
OperatorAddr: common.Address{},
},
}

Expand All @@ -54,7 +55,7 @@ func TestSendNewTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, mockAvsReaderer, mockAvsWriterer, mockTaskBlsAggService, _, _, _, _, _, mockClient, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, _, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.StateRootUpdateMessage{}
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

aggregator, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
aggregator, _, _, _, _, _, _, mockMsgDb, mockRollupBroadcaster, _, err := createMockAggregator(mockCtrl, MOCK_OPERATOR_PUBKEY_DICT)
assert.Nil(t, err)

msg := messages.OperatorSetUpdateMessage{}
Expand Down Expand Up @@ -151,7 +152,7 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
Expand All @@ -161,6 +162,7 @@ func createMockAggregator(
mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl)
mockRollupBroadcaster := aggmocks.NewMockRollupBroadcasterer(mockCtrl)
mockClient := safeclientmocks.NewMockSafeClient(mockCtrl)
mockOperatorRegistrationsService := aggmocks.NewMockOperatorRegistrationsService(mockCtrl)

aggregator := &Aggregator{
logger: logger,
Expand All @@ -169,6 +171,7 @@ func createMockAggregator(
taskBlsAggregationService: mockTaskBlsAggregationService,
stateRootUpdateBlsAggregationService: mockStateRootUpdateBlsAggregationService,
operatorSetUpdateBlsAggregationService: mockOperatorSetUpdateBlsAggregationService,
operatorRegistrationsService: mockOperatorRegistrationsService,
msgDb: mockMsgDb,
tasks: make(map[coretypes.TaskIndex]taskmanager.CheckpointTask),
taskResponses: make(map[coretypes.TaskIndex]map[eigentypes.TaskResponseDigest]messages.CheckpointTaskResponse),
Expand All @@ -181,5 +184,5 @@ func createMockAggregator(
restListener: &SelectiveRestListener{},
aggregatorListener: &SelectiveAggregatorListener{},
}
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
return aggregator, mockAvsReader, mockAvsWriter, mockTaskBlsAggregationService, mockStateRootUpdateBlsAggregationService, mockOperatorSetUpdateBlsAggregationService, mockOperatorRegistrationsService, mockMsgDb, mockRollupBroadcaster, mockClient, nil
}
1 change: 1 addition & 0 deletions aggregator/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package aggregator

//go:generate mockgen -destination=./mocks/message_blsagg.go -package=mocks github.com/NethermindEth/near-sffl/aggregator MessageBlsAggregationService
//go:generate mockgen -destination=./mocks/rollup_broadcaster.go -package=mocks github.com/NethermindEth/near-sffl/aggregator RollupBroadcasterer
//go:generate mockgen -destination=./mocks/operator_registrations_inmemory.go -package=mocks github.com/NethermindEth/near-sffl/aggregator OperatorRegistrationsService
//go:generate mockgen -destination=./mocks/eth_client.go -package=mocks github.com/Layr-Labs/eigensdk-go/chainio/clients/eth Client
71 changes: 71 additions & 0 deletions aggregator/mocks/operator_registrations_inmemory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

191 changes: 191 additions & 0 deletions aggregator/operator_registrations_inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package aggregator

import (
"context"
"sync"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys"
"github.com/Layr-Labs/eigensdk-go/types"
"github.com/ethereum/go-ethereum/common"
)

type OperatorRegistrationsService interface {
operatorpubkeys.OperatorPubkeysService

GetOperatorPubkeysById(ctx context.Context, operatorId types.OperatorId) (operatorPubkeys types.OperatorPubkeys, operatorFound bool)
}

type OperatorRegistrationsServiceInMemory struct {
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber
avsRegistryReader avsregistry.AvsRegistryReader
logger logging.Logger
queryByAddrC chan<- queryByAddr
queryByIdC chan<- queryById
}

type queryByAddr struct {
operatorAddr common.Address
// channel through which to receive the response (operator pubkeys)
respC chan<- resp
}
type queryById struct {
operatorId types.OperatorId
// channel through which to receive the response (operator pubkeys)
respC chan<- resp
}

type resp struct {
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
operatorPubkeys types.OperatorPubkeys
// false if operators were not present in the pubkey dict
operatorExists bool
}

var _ operatorpubkeys.OperatorPubkeysService = (*OperatorRegistrationsServiceInMemory)(nil)

// NewOperatorRegistrationsServiceInMemory constructs a OperatorRegistrationsServiceInMemory and starts it in a goroutine.
// It takes a context as argument because the "backfilling" of the database is done inside this constructor,
// so we wait for all past NewPubkeyRegistration events to be queried and the db to be filled before returning the service.
// The constructor is thus following a RAII-like pattern, of initializing the serving during construction.
// Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving properly.
func NewOperatorRegistrationsServiceInMemory(
ctx context.Context,
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber,
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
avsRegistryReader avsregistry.AvsRegistryReader,
logger logging.Logger,
) *OperatorRegistrationsServiceInMemory {
queryByAddrC := make(chan queryByAddr)
queryByIdC := make(chan queryById)

pkcs := &OperatorRegistrationsServiceInMemory{
avsRegistrySubscriber: avsRegistrySubscriber,
avsRegistryReader: avsRegistryReader,
logger: logger,
queryByAddrC: queryByAddrC,
queryByIdC: queryByIdC,
}

// We use this waitgroup to wait on the initialization of the inmemory pubkey dict,
// which requires querying the past events of the pubkey registration contract
wg := sync.WaitGroup{}
wg.Add(1)
pkcs.startServiceInGoroutine(ctx, queryByAddrC, queryByIdC, &wg)
wg.Wait()

return pkcs
}

func (ors *OperatorRegistrationsServiceInMemory) startServiceInGoroutine(ctx context.Context, queryByAddrC <-chan queryByAddr, queryByIdC <-chan queryById, wg *sync.WaitGroup) {
go func() {
ors.logger.Debug("Subscribing to new pubkey registration events on blsApkRegistry contract", "service", "OperatorRegistrationsServiceInMemory")
newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations()
if err != nil {
ors.logger.Error("Fatal error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory")
// see the warning above the struct definition to understand why we panic here
panic(err)
}

pubkeyByAddrDict, pubkeyByIdDict := ors.queryPastRegisteredOperators(ctx)

// The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry
// before the block at which we started the ws subscription above
wg.Done()
Hyodar marked this conversation as resolved.
Show resolved Hide resolved

for {
select {
case <-ctx.Done():
ors.logger.Infof("OperatorRegistrationsServiceInMemory: Context cancelled, exiting")
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
return

case err := <-newPubkeyRegistrationSub.Err():
ors.logger.Error("Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorRegistrationsServiceInMemory")
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
newPubkeyRegistrationSub.Unsubscribe()
newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ors.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations()
if err != nil {
ors.logger.Error("Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorRegistrationsServiceInMemory")
// see the warning above the struct definition to understand why we panic here
panic(err)
}
emlautarom1 marked this conversation as resolved.
Show resolved Hide resolved

case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC:
pubkeys := types.OperatorPubkeys{
G1Pubkey: bls.NewG1Point(newPubkeyRegistrationEvent.PubkeyG1.X, newPubkeyRegistrationEvent.PubkeyG1.Y),
G2Pubkey: bls.NewG2Point(newPubkeyRegistrationEvent.PubkeyG2.X, newPubkeyRegistrationEvent.PubkeyG2.Y),
}
operatorId := types.OperatorIdFromPubkey(pubkeys.G1Pubkey)
operatorAddr := newPubkeyRegistrationEvent.Operator

pubkeyByAddrDict[operatorAddr] = pubkeys
pubkeyByIdDict[operatorId] = pubkeys

ors.logger.Debug("Added operator pubkeys to pubkey dict",
"service", "OperatorRegistrationsServiceInMemory",
"block", newPubkeyRegistrationEvent.Raw.BlockNumber,
"operatorAddr", operatorAddr,
"operatorId", operatorId,
"G1pubkey", pubkeyByAddrDict[operatorAddr].G1Pubkey,
"G2pubkey", pubkeyByAddrDict[operatorAddr].G2Pubkey,
)

// Receive a queryByAddr from GetOperatorPubkeys
case operatorPubkeyQuery := <-queryByAddrC:
pubkeys, ok := pubkeyByAddrDict[operatorPubkeyQuery.operatorAddr]
operatorPubkeyQuery.respC <- resp{pubkeys, ok}

// Receive a queryById from GetOperatorPubkeysById
case operatorPubkeyQuery := <-queryByIdC:
pubkeys, ok := pubkeyByIdDict[operatorPubkeyQuery.operatorId]
operatorPubkeyQuery.respC <- resp{pubkeys, ok}
}
}
}()
}

func (ors *OperatorRegistrationsServiceInMemory) queryPastRegisteredOperators(ctx context.Context) (map[common.Address]types.OperatorPubkeys, map[types.OperatorId]types.OperatorPubkeys) {
// Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we queryByAddr some events that we will receive again in the websocket,
// since we will just overwrite the pubkey dict with the same values.
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ors.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil)
Hyodar marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
ors.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorRegistrationsServiceInMemory")
panic(err)
}
ors.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorRegistrationsServiceInMemory")

pubkeyByAddrDict := make(map[common.Address]types.OperatorPubkeys)
pubkeyByIdDict := make(map[types.OperatorId]types.OperatorPubkeys)
for i, operatorAddr := range alreadyRegisteredOperatorAddrs {
operatorPubkeys := alreadyRegisteredOperatorPubkeys[i]
pubkeyByAddrDict[operatorAddr] = operatorPubkeys

operatorId := types.OperatorIdFromPubkey(operatorPubkeys.G1Pubkey)
pubkeyByIdDict[operatorId] = operatorPubkeys
}

return pubkeyByAddrDict, pubkeyByIdDict
}

func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeys(ctx context.Context, operator common.Address) (types.OperatorPubkeys, bool) {
respC := make(chan resp)
ors.queryByAddrC <- queryByAddr{operator, respC}

select {
case <-ctx.Done():
return types.OperatorPubkeys{}, false
case resp := <-respC:
return resp.operatorPubkeys, resp.operatorExists
}
}

func (ors *OperatorRegistrationsServiceInMemory) GetOperatorPubkeysById(ctx context.Context, operatorId types.OperatorId) (types.OperatorPubkeys, bool) {
respC := make(chan resp)
ors.queryByIdC <- queryById{operatorId, respC}

select {
case <-ctx.Done():
return types.OperatorPubkeys{}, false
case resp := <-respC:
return resp.operatorPubkeys, resp.operatorExists
}
}
Loading
Loading