diff --git a/operator/mocks/rpc_client.go b/operator/mocks/rpc_client.go index 95a56ddb..27e6ddf0 100644 --- a/operator/mocks/rpc_client.go +++ b/operator/mocks/rpc_client.go @@ -5,6 +5,7 @@ // // mockgen -destination=./mocks/rpc_client.go -package=mocks github.com/NethermindEth/near-sffl/operator AggregatorRpcClienter // + // Package mocks is a generated GoMock package. package mocks @@ -69,9 +70,11 @@ func (mr *MockAggregatorRpcClienterMockRecorder) GetAggregatedCheckpointMessages } // SendSignedCheckpointTaskResponseToAggregator mocks base method. -func (m *MockAggregatorRpcClienter) SendSignedCheckpointTaskResponseToAggregator(arg0 *messages.SignedCheckpointTaskResponse) { +func (m *MockAggregatorRpcClienter) SendSignedCheckpointTaskResponseToAggregator(arg0 *messages.SignedCheckpointTaskResponse) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "SendSignedCheckpointTaskResponseToAggregator", arg0) + ret := m.ctrl.Call(m, "SendSignedCheckpointTaskResponseToAggregator", arg0) + ret0, _ := ret[0].(error) + return ret0 } // SendSignedCheckpointTaskResponseToAggregator indicates an expected call of SendSignedCheckpointTaskResponseToAggregator. @@ -81,9 +84,11 @@ func (mr *MockAggregatorRpcClienterMockRecorder) SendSignedCheckpointTaskRespons } // SendSignedOperatorSetUpdateToAggregator mocks base method. -func (m *MockAggregatorRpcClienter) SendSignedOperatorSetUpdateToAggregator(arg0 *messages.SignedOperatorSetUpdateMessage) { +func (m *MockAggregatorRpcClienter) SendSignedOperatorSetUpdateToAggregator(arg0 *messages.SignedOperatorSetUpdateMessage) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "SendSignedOperatorSetUpdateToAggregator", arg0) + ret := m.ctrl.Call(m, "SendSignedOperatorSetUpdateToAggregator", arg0) + ret0, _ := ret[0].(error) + return ret0 } // SendSignedOperatorSetUpdateToAggregator indicates an expected call of SendSignedOperatorSetUpdateToAggregator. @@ -93,9 +98,11 @@ func (mr *MockAggregatorRpcClienterMockRecorder) SendSignedOperatorSetUpdateToAg } // SendSignedStateRootUpdateToAggregator mocks base method. -func (m *MockAggregatorRpcClienter) SendSignedStateRootUpdateToAggregator(arg0 *messages.SignedStateRootUpdateMessage) { +func (m *MockAggregatorRpcClienter) SendSignedStateRootUpdateToAggregator(arg0 *messages.SignedStateRootUpdateMessage) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "SendSignedStateRootUpdateToAggregator", arg0) + ret := m.ctrl.Call(m, "SendSignedStateRootUpdateToAggregator", arg0) + ret0, _ := ret[0].(error) + return ret0 } // SendSignedStateRootUpdateToAggregator indicates an expected call of SendSignedStateRootUpdateToAggregator. diff --git a/operator/operator.go b/operator/operator.go index a3649ab1..2b02ee34 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -173,11 +173,13 @@ func NewOperatorFromConfig(c optypes.NodeConfig) (*Operator, error) { return nil, err } - aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger) + // TODO: We never close `httpRpcClient` + httpRpcClient, err := NewHTTPAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger) if err != nil { - logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err) + logger.Error("Cannot create HTTPAggregatorRpcClient. Is aggregator running?", "err", err) return nil, err } + aggregatorRpcClient := NewAggregatorRpcClient(httpRpcClient, DefaultAggregatorRpcRetry, logger) avsManager, err := NewAvsManager(&c, ethHttpClient, ethWsClient, elChainReader, elChainWriter, txMgr, logger) if err != nil { @@ -219,7 +221,7 @@ func NewOperatorFromConfig(c optypes.NodeConfig) (*Operator, error) { blsKeypair: blsKeyPair, operatorAddr: common.HexToAddress(c.OperatorAddress), aggregatorServerIpPortAddr: c.AggregatorServerIpPortAddress, - aggregatorRpcClient: aggregatorRpcClient, + aggregatorRpcClient: &aggregatorRpcClient, registryCoordinatorAddr: registryCoordinatorAddress, operatorId: operatorId, taskResponseWait: time.Duration(c.TaskResponseWaitMs) * time.Millisecond, diff --git a/operator/rpc_client.go b/operator/rpc_client.go index 88f18158..da171533 100644 --- a/operator/rpc_client.go +++ b/operator/rpc_client.go @@ -2,421 +2,267 @@ package operator import ( "errors" - "fmt" - "net" "net/rpc" - "sync" "time" "github.com/Layr-Labs/eigensdk-go/logging" eigentypes "github.com/Layr-Labs/eigensdk-go/types" - "github.com/ethereum/go-ethereum/common" - "github.com/prometheus/client_golang/prometheus" - "github.com/NethermindEth/near-sffl/core" "github.com/NethermindEth/near-sffl/core/types/messages" + "github.com/ethereum/go-ethereum/common" + "github.com/prometheus/client_golang/prometheus" ) -const ( - ResendInterval = 2 * time.Second - MaxRetries = 10 -) - -type AggregatorRpcClienter interface { - core.Metricable - - SendSignedCheckpointTaskResponseToAggregator(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) - SendSignedStateRootUpdateToAggregator(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) - SendSignedOperatorSetUpdateToAggregator(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) - GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error) -} - -type unsentRpcMessage struct { - Message interface{} - Retries int -} - -type AggregatorRpcClient struct { - rpcClientLock sync.RWMutex - rpcClient *rpc.Client - aggregatorIpPortAddr string - registryCoordinatorAddress common.Address - - operatorId eigentypes.OperatorId - notifiedInitialized bool - - unsentMessagesLock sync.Mutex - unsentMessages []unsentRpcMessage - resendTicker *time.Ticker - - logger logging.Logger - listener RpcClientEventListener +type RpcClient interface { + Call(serviceMethod string, args any, reply any) error + // TODO: Do we also want a `Close` method? } -var _ core.Metricable = (*AggregatorRpcClient)(nil) - -func NewAggregatorRpcClient(aggregatorIpPortAddr string, operatorId eigentypes.OperatorId, registryCoordinatorAddress common.Address, logger logging.Logger) (*AggregatorRpcClient, error) { - resendTicker := time.NewTicker(ResendInterval) - - client := &AggregatorRpcClient{ - // set to nil so that we can create an rpc client even if the aggregator is not running - rpcClient: nil, - logger: logger, - aggregatorIpPortAddr: aggregatorIpPortAddr, - registryCoordinatorAddress: registryCoordinatorAddress, - unsentMessages: make([]unsentRpcMessage, 0), - resendTicker: resendTicker, - listener: &SelectiveRpcClientListener{}, - operatorId: operatorId, - } +var _ RpcClient = (*rpc.Client)(nil) - go client.onTick() - return client, nil -} - -func (c *AggregatorRpcClient) EnableMetrics(registry *prometheus.Registry) error { - listener, err := MakeRpcClientMetrics(registry) +func NewHTTPAggregatorRpcClient(aggregatorIpPortAddr string, operatorId eigentypes.OperatorId, expectedRegistryCoordinatorAddress common.Address, logger logging.Logger) (*rpc.Client, error) { + client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr) if err != nil { - return err - } - - c.listener = listener - return nil -} - -func (c *AggregatorRpcClient) dialAggregatorRpcClient() error { - c.rpcClientLock.Lock() - defer c.rpcClientLock.Unlock() - - if c.rpcClient != nil { - return nil - } - - c.logger.Info("rpc client is nil. Dialing aggregator rpc client") - - client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr) - if err != nil { - c.logger.Error("Error dialing aggregator rpc client", "err", err) - return err + logger.Error("Error dialing aggregator rpc client", "err", err) + return nil, err } var aggregatorRegistryCoordinatorAddress string err = client.Call("Aggregator.GetRegistryCoordinatorAddress", struct{}{}, &aggregatorRegistryCoordinatorAddress) if err != nil { - c.logger.Info("Received error when getting registry coordinator address", "err", err) - return err + logger.Error("Received error when getting registry coordinator address", "err", err) + return nil, err } - if !c.notifiedInitialized { - c.logger.Info("Notifying aggregator of initialization") + logger.Debug("Notifying aggregator of initialization") - var reply bool - err := client.Call("Aggregator.NotifyOperatorInitialization", c.operatorId, &reply) - if err != nil { - c.logger.Error("Error notifying aggregator of initialization", "err", err) - return err - } - - c.notifiedInitialized = true + var reply bool + err = client.Call("Aggregator.NotifyOperatorInitialization", operatorId, &reply) + if err != nil { + logger.Error("Error notifying aggregator of initialization", "err", err) + return nil, err } - if common.HexToAddress(aggregatorRegistryCoordinatorAddress).Cmp(c.registryCoordinatorAddress) != 0 { - c.logger.Fatal("Registry coordinator address from aggregator does not match the one in the config", "aggregator", aggregatorRegistryCoordinatorAddress, "config", c.registryCoordinatorAddress.String()) - return errors.New("mismatching registry coordinator address from aggregator") + if common.HexToAddress(aggregatorRegistryCoordinatorAddress).Cmp(expectedRegistryCoordinatorAddress) != 0 { + logger.Fatal("Registry coordinator address from aggregator does not match the one in the config", "aggregator", aggregatorRegistryCoordinatorAddress, "config", expectedRegistryCoordinatorAddress.String()) + return nil, errors.New("mismatching registry coordinator address from aggregator") } - c.rpcClient = client - - return nil + return client, nil } -func (c *AggregatorRpcClient) InitializeClientIfNotExist() error { - c.rpcClientLock.RLock() - if c.rpcClient != nil { - c.rpcClientLock.RUnlock() - return nil - } - c.rpcClientLock.RUnlock() +type RetryStrategy func(submittedAt time.Time, err error) bool - return c.dialAggregatorRpcClient() +func NeverRetry(_ time.Time, _ error) bool { + return false } -func isShutdownOrNetworkError(err error) bool { - if err == rpc.ErrShutdown { - return true - } +func AlwaysRetry(_ time.Time, _ error) bool { + return true +} - if _, ok := err.(*net.OpError); ok { +func RetryWithDelay(delay time.Duration) RetryStrategy { + return func(submittedAt time.Time, err error) bool { + time.Sleep(delay) return true } - - return false } -func (c *AggregatorRpcClient) handleRpcError(err error) { - if isShutdownOrNetworkError(err) { - go c.handleRpcShutdown() +func RetryIfRecentEnough(ttl time.Duration) RetryStrategy { + return func(submittedAt time.Time, err error) bool { + return time.Since(submittedAt) < ttl } } -func (c *AggregatorRpcClient) handleRpcShutdown() { - c.rpcClientLock.Lock() - defer c.rpcClientLock.Unlock() - - if c.rpcClient != nil { - c.logger.Info("Closing RPC client due to shutdown") - - err := c.rpcClient.Close() - if err != nil { - c.logger.Error("Error closing RPC client", "err", err) - } - - c.rpcClient = nil +func RetryAtMost(retries int) RetryStrategy { + retryCount := 0 + return func(_ time.Time, err error) bool { + result := retryCount < retries + retryCount++ + return result } } -func (c *AggregatorRpcClient) onTick() { - for { - <-c.resendTicker.C - - err := c.InitializeClientIfNotExist() - if err != nil { - c.logger.Error("Error initializing client", "err", err) - continue - } - - c.unsentMessagesLock.Lock() - if len(c.unsentMessages) == 0 { - c.unsentMessagesLock.Unlock() - continue - } - c.unsentMessagesLock.Unlock() - - c.tryResendFromDeque() +func RetryAnd(s1 RetryStrategy, s2 RetryStrategy) RetryStrategy { + return func(submittedAt time.Time, err error) bool { + return s1(submittedAt, err) && s2(submittedAt, err) } } -// Expected to be called with initialized client. -func (c *AggregatorRpcClient) tryResendFromDeque() { - c.rpcClientLock.RLock() - defer c.rpcClientLock.RUnlock() +type RetryFactory = func() RetryStrategy - c.unsentMessagesLock.Lock() - defer c.unsentMessagesLock.Unlock() - - if len(c.unsentMessages) != 0 { - c.logger.Info("Resending messages from queue") - } - - errorPos := 0 - for i := 0; i < len(c.unsentMessages); i++ { - entry := c.unsentMessages[i] - message := entry.Message - - // Assumes client exists - var err error - var reply bool - - switch message := message.(type) { - case *messages.SignedCheckpointTaskResponse: - err = c.rpcClient.Call("Aggregator.ProcessSignedCheckpointTaskResponse", message, &reply) - if err != nil { - c.listener.IncErroredCheckpointSubmissions(true) - } else { - c.listener.IncCheckpointTaskResponseSubmissions(true) - c.listener.ObserveLastCheckpointIdResponded(message.TaskResponse.ReferenceTaskIndex) - } - - case *messages.SignedStateRootUpdateMessage: - err = c.rpcClient.Call("Aggregator.ProcessSignedStateRootUpdateMessage", message, &reply) - if err != nil { - c.listener.IncErroredStateRootUpdateSubmissions(message.Message.RollupId, true) - } else { - c.listener.IncStateRootUpdateSubmissions(message.Message.RollupId, true) - } - - case *messages.SignedOperatorSetUpdateMessage: - err = c.rpcClient.Call("Aggregator.ProcessSignedOperatorSetUpdateMessage", message, &reply) - if err != nil { - c.listener.IncErroredOperatorSetUpdateSubmissions(true) - } else { - c.listener.IncOperatorSetUpdateUpdateSubmissions(true) - c.listener.ObserveLastOperatorSetUpdateIdResponded(message.Message.Id) - } - - default: - panic("unreachable") - } - - if err != nil { - c.logger.Error("Couldn't resend message", "err", err) - - if isShutdownOrNetworkError(err) { - c.logger.Error("Couldn't resend message due to shutdown or network error") - - if errorPos == 0 { - c.unsentMessages = c.unsentMessages[i:] - return - } - - for j := i; j < len(c.unsentMessages); j++ { - rpcMessage := c.unsentMessages[j] - c.unsentMessages[errorPos] = rpcMessage - errorPos++ - } - - break - } - - entry.Retries++ - if entry.Retries >= MaxRetries { - c.logger.Error("Max retries reached, dropping message", "message", fmt.Sprintf("%#v", message)) - continue - } - - c.unsentMessages[errorPos] = entry - errorPos++ - } - } - - c.unsentMessages = c.unsentMessages[:errorPos] - c.listener.ObserveResendQueueSize(len(c.unsentMessages)) +// By default, retry with a delay of 2 seconds between calls, +// at most 10 times, and only if the error is recent enough (24 hours) +// TODO: Discuss the "recent enough" part +func DefaultAggregatorRpcRetry() RetryStrategy { + return RetryAnd( + RetryWithDelay(2*time.Second), RetryAnd( + RetryAtMost(10), + RetryIfRecentEnough(24*time.Hour))) } -func (c *AggregatorRpcClient) sendOperatorMessage(sendCb func() error, message interface{}) { - c.rpcClientLock.RLock() - defer c.rpcClientLock.RUnlock() +type AggregatorRpcClienter interface { + core.Metricable - appendProtected := func() { - c.unsentMessagesLock.Lock() - defer c.unsentMessagesLock.Unlock() + SendSignedCheckpointTaskResponseToAggregator(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) error + SendSignedStateRootUpdateToAggregator(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) error + SendSignedOperatorSetUpdateToAggregator(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error + GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error) +} - c.unsentMessages = append(c.unsentMessages, unsentRpcMessage{Message: message}) - c.listener.ObserveResendQueueSize(len(c.unsentMessages)) - } +type AggregatorRpcClient struct { + rpcClient RpcClient + newRetryStrategy RetryFactory + listener RpcClientEventListener + logger logging.Logger +} - if c.rpcClient == nil { - appendProtected() - return - } +var _ AggregatorRpcClienter = (*AggregatorRpcClient)(nil) - c.logger.Info("Sending request to aggregator") - err := sendCb() - if err != nil { - c.handleRpcError(err) - appendProtected() - return +func NewAggregatorRpcClient(rpcClient RpcClient, retryFactory RetryFactory, logger logging.Logger) AggregatorRpcClient { + return AggregatorRpcClient{ + rpcClient: rpcClient, + newRetryStrategy: retryFactory, + listener: &SelectiveRpcClientListener{}, + logger: logger, } } -func (c *AggregatorRpcClient) sendRequest(sendCb func() error) error { - c.rpcClientLock.RLock() - defer c.rpcClientLock.RUnlock() +func (a *AggregatorRpcClient) SendSignedCheckpointTaskResponseToAggregator(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) error { + a.logger.Info("Sending signed task response header to aggregator", "signedCheckpointTaskResponse", signedCheckpointTaskResponse) - if c.rpcClient == nil { - return errors.New("rpc client is nil") + submittedAt := time.Now() + var reply bool + action := func() error { + err := a.rpcClient.Call("Aggregator.ProcessSignedCheckpointTaskResponse", signedCheckpointTaskResponse, &reply) + if err != nil { + a.logger.Error("Received error from aggregator", "err", err) + } + return err } - c.logger.Info("Sending request to aggregator") + retried := false + shouldRetry := a.newRetryStrategy() + err := action() + for err != nil && shouldRetry(submittedAt, err) { + a.listener.IncErroredCheckpointSubmissions(retried) + err = action() + retried = true + } - err := sendCb() if err != nil { - c.handleRpcError(err) + a.logger.Error("Dropping message after error", "err", err) return err } - c.logger.Info("Request successfully sent to aggregator") + a.logger.Info("Signed task response header accepted by aggregator", "reply", reply) + a.listener.IncCheckpointTaskResponseSubmissions(retried) + a.listener.ObserveLastCheckpointIdResponded(signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex) + a.listener.OnMessagesReceived() return nil } -func (c *AggregatorRpcClient) SendSignedCheckpointTaskResponseToAggregator(signedCheckpointTaskResponse *messages.SignedCheckpointTaskResponse) { - c.logger.Info("Sending signed task response header to aggregator", "signedCheckpointTaskResponse", fmt.Sprintf("%#v", signedCheckpointTaskResponse)) +func (a *AggregatorRpcClient) SendSignedStateRootUpdateToAggregator(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) error { + a.logger.Info("Sending signed state root update message to aggregator", "signedStateRootUpdateMessage", signedStateRootUpdateMessage) - c.sendOperatorMessage(func() error { - var reply bool - err := c.rpcClient.Call("Aggregator.ProcessSignedCheckpointTaskResponse", signedCheckpointTaskResponse, &reply) + submittedAt := time.Now() + var reply bool + action := func() error { + err := a.rpcClient.Call("Aggregator.ProcessSignedStateRootUpdateMessage", signedStateRootUpdateMessage, &reply) if err != nil { - c.listener.IncErroredCheckpointSubmissions(false) - - c.logger.Info("Received error from aggregator", "err", err) - return err + a.logger.Error("Received error from aggregator", "err", err) } + return err + } - c.listener.IncCheckpointTaskResponseSubmissions(false) - c.listener.ObserveLastCheckpointIdResponded(signedCheckpointTaskResponse.TaskResponse.ReferenceTaskIndex) - - c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply) - c.listener.OnMessagesReceived() - return nil - }, signedCheckpointTaskResponse) -} - -func (c *AggregatorRpcClient) SendSignedStateRootUpdateToAggregator(signedStateRootUpdateMessage *messages.SignedStateRootUpdateMessage) { - c.logger.Info("Sending signed state root update message to aggregator", "signedStateRootUpdateMessage", fmt.Sprintf("%#v", signedStateRootUpdateMessage)) - - c.sendOperatorMessage(func() error { - var reply bool - err := c.rpcClient.Call("Aggregator.ProcessSignedStateRootUpdateMessage", signedStateRootUpdateMessage, &reply) - if err != nil { - c.listener.IncErroredStateRootUpdateSubmissions(signedStateRootUpdateMessage.Message.RollupId, false) + retried := false + shouldRetry := a.newRetryStrategy() + err := action() + for err != nil && shouldRetry(submittedAt, err) { + a.listener.IncErroredStateRootUpdateSubmissions(signedStateRootUpdateMessage.Message.RollupId, retried) + err = action() + retried = true + } - c.logger.Info("Received error from aggregator", "err", err) - return err - } + if err != nil { + a.logger.Error("Dropping message after error", "err", err) + return err + } - c.listener.IncStateRootUpdateSubmissions(signedStateRootUpdateMessage.Message.RollupId, false) + a.logger.Info("Signed state root update message accepted by aggregator", "reply", reply) + a.listener.IncStateRootUpdateSubmissions(signedStateRootUpdateMessage.Message.RollupId, retried) + a.listener.OnMessagesReceived() - c.logger.Info("Signed state root update message accepted by aggregator.", "reply", reply) - c.listener.OnMessagesReceived() - return nil - }, signedStateRootUpdateMessage) + return nil } -func (c *AggregatorRpcClient) SendSignedOperatorSetUpdateToAggregator(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) { - c.logger.Info("Sending operator set update message to aggregator", "signedOperatorSetUpdateMessage", fmt.Sprintf("%#v", signedOperatorSetUpdateMessage)) +func (a *AggregatorRpcClient) SendSignedOperatorSetUpdateToAggregator(signedOperatorSetUpdateMessage *messages.SignedOperatorSetUpdateMessage) error { + a.logger.Info("Sending operator set update message to aggregator", "signedOperatorSetUpdateMessage", signedOperatorSetUpdateMessage) - c.sendOperatorMessage(func() error { - var reply bool - err := c.rpcClient.Call("Aggregator.ProcessSignedOperatorSetUpdateMessage", signedOperatorSetUpdateMessage, &reply) + submittedAt := time.Now() + var reply bool + action := func() error { + err := a.rpcClient.Call("Aggregator.ProcessSignedOperatorSetUpdateMessage", signedOperatorSetUpdateMessage, &reply) if err != nil { - c.listener.IncErroredOperatorSetUpdateSubmissions(false) - - c.logger.Info("Received error from aggregator", "err", err) - return err + a.logger.Error("Received error from aggregator", "err", err) } + return err + } - c.listener.IncOperatorSetUpdateUpdateSubmissions(false) - c.listener.ObserveLastOperatorSetUpdateIdResponded(signedOperatorSetUpdateMessage.Message.Id) + retried := false + shouldRetry := a.newRetryStrategy() + err := action() + for err != nil && shouldRetry(submittedAt, err) { + a.listener.IncErroredOperatorSetUpdateSubmissions(retried) + err = action() + retried = true + } - c.logger.Info("Signed operator set update message accepted by aggregator.", "reply", reply) - c.listener.OnMessagesReceived() - return nil - }, signedOperatorSetUpdateMessage) -} + if err != nil { + a.logger.Error("Dropping message after error", "err", err) + return err + } -func (c *AggregatorRpcClient) GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error) { - c.logger.Info("Getting checkpoint messages from aggregator") + a.logger.Info("Signed operator set update message accepted by aggregator", "reply", reply) + a.listener.IncOperatorSetUpdateUpdateSubmissions(retried) + a.listener.ObserveLastOperatorSetUpdateIdResponded(signedOperatorSetUpdateMessage.Message.Id) + a.listener.OnMessagesReceived() + return nil +} - var checkpointMessages messages.CheckpointMessages +func (a *AggregatorRpcClient) GetAggregatedCheckpointMessages(fromTimestamp, toTimestamp uint64) (*messages.CheckpointMessages, error) { + a.logger.Info("Getting checkpoint messages from aggregator", "fromTimestamp", fromTimestamp, "toTimestamp", toTimestamp) type Args struct { FromTimestamp, ToTimestamp uint64 } - err := c.sendRequest(func() error { - err := c.rpcClient.Call("Aggregator.GetAggregatedCheckpointMessages", &Args{fromTimestamp, toTimestamp}, &checkpointMessages) + submittedAt := time.Now() + var checkpointMessages messages.CheckpointMessages + action := func() error { + err := a.rpcClient.Call("Aggregator.GetAggregatedCheckpointMessages", &Args{fromTimestamp, toTimestamp}, &checkpointMessages) if err != nil { - c.logger.Info("Received error from aggregator", "err", err) - return err + a.logger.Error("Received error from aggregator", "err", err) } + return err + } + + shouldRetry := a.newRetryStrategy() + err := action() + for err != nil && shouldRetry(submittedAt, err) { + err = action() + } - c.logger.Info("Checkpoint messages fetched from aggregator") - return nil - }) + return &checkpointMessages, err +} + +func (c *AggregatorRpcClient) EnableMetrics(registry *prometheus.Registry) error { + listener, err := MakeRpcClientMetrics(registry) if err != nil { - return nil, err + return err } - return &checkpointMessages, nil + c.listener = listener + return nil } diff --git a/operator/rpc_client_test.go b/operator/rpc_client_test.go new file mode 100644 index 00000000..f193e7a2 --- /dev/null +++ b/operator/rpc_client_test.go @@ -0,0 +1,248 @@ +package operator_test + +import ( + "sync" + "testing" + "time" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/NethermindEth/near-sffl/core/types/messages" + "github.com/NethermindEth/near-sffl/operator" + "github.com/stretchr/testify/assert" +) + +var _ = operator.RpcClient(&MockRpcClient{}) + +type MockRpcClient struct { + lock sync.Mutex + call func(serviceMethod string, args any, reply any) error +} + +func (self *MockRpcClient) Call(serviceMethod string, args any, reply any) error { + self.lock.Lock() + defer self.lock.Unlock() + + return self.call(serviceMethod, args, reply) +} + +func NoopRpcClient() *MockRpcClient { + return &MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { return nil }, + } +} + +func TestSendSuccessfulMessages(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcClientCallCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + logger.Debug("MockRpcClient.Call", "method", serviceMethod, "args", args) + rpcClientCallCount++ + + return nil + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.NeverRetry }, logger) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + client.SendSignedStateRootUpdateToAggregator(&messages.SignedStateRootUpdateMessage{}) + }() + + go func() { + defer wg.Done() + client.SendSignedOperatorSetUpdateToAggregator(&messages.SignedOperatorSetUpdateMessage{}) + }() + + wg.Wait() + + assert.Equal(t, 2, rpcClientCallCount) +} + +func TestUnboundedRetry(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcSuccess := false + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + if rpcFailCount < 2 { + rpcFailCount++ + return assert.AnError + } + + rpcSuccess = true + return nil + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.AlwaysRetry }, logger) + + client.SendSignedStateRootUpdateToAggregator(&messages.SignedStateRootUpdateMessage{}) + + assert.Equal(t, 2, rpcFailCount) + assert.True(t, rpcSuccess) +} + +func TestUnboundedRetry_Delayed(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcSuccess := false + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + if rpcFailCount < 2 { + rpcFailCount++ + return assert.AnError + } + + rpcSuccess = true + return nil + }, + } + + retryFactory := func() operator.RetryStrategy { + return operator.RetryAnd(operator.RetryWithDelay(100*time.Millisecond), operator.AlwaysRetry) + } + client := operator.NewAggregatorRpcClient(&rpcClient, retryFactory, logger) + + startedAt := time.Now() + client.SendSignedCheckpointTaskResponseToAggregator(&messages.SignedCheckpointTaskResponse{}) + execTime := time.Since(startedAt) + + assert.True(t, execTime > 180*time.Millisecond) + assert.True(t, execTime < 220*time.Millisecond) + assert.Equal(t, 2, rpcFailCount) + assert.True(t, rpcSuccess) +} + +func TestRetryAtMost(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + rpcFailCount++ + return assert.AnError + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.RetryAtMost(4) }, logger) + + client.SendSignedOperatorSetUpdateToAggregator(&messages.SignedOperatorSetUpdateMessage{}) + + assert.Equal(t, 5, rpcFailCount) // 1 run, 4 retries +} + +func TestRetryAtMost_Concurrent(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + rpcFailCount++ + return assert.AnError + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.RetryAtMost(4) }, logger) + + wg := sync.WaitGroup{} + wg.Add(3) + go func() { + defer wg.Done() + client.SendSignedCheckpointTaskResponseToAggregator(&messages.SignedCheckpointTaskResponse{}) + }() + go func() { + defer wg.Done() + client.SendSignedStateRootUpdateToAggregator(&messages.SignedStateRootUpdateMessage{}) + }() + go func() { + defer wg.Done() + client.SendSignedOperatorSetUpdateToAggregator(&messages.SignedOperatorSetUpdateMessage{}) + }() + wg.Wait() + + assert.Equal(t, 15, rpcFailCount) // 1 run, 4 retries for each of the 3 calls = 15 calls in total +} + +func TestRetryLaterIfRecentEnough(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + time.Sleep(100 * time.Millisecond) + + rpcFailCount++ + return assert.AnError + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.RetryIfRecentEnough(500 * time.Millisecond) }, logger) + + client.SendSignedStateRootUpdateToAggregator(&messages.SignedStateRootUpdateMessage{}) + + assert.Equal(t, 5, rpcFailCount) +} + +func TestGetAggregatedCheckpointMessages(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + expected := messages.CheckpointMessages{ + StateRootUpdateMessages: []messages.StateRootUpdateMessage{{BlockHeight: 100}}, + } + + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + switch v := reply.(type) { + case *messages.CheckpointMessages: + *v = expected + } + return nil + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.NeverRetry }, logger) + + response, err := client.GetAggregatedCheckpointMessages(0, 0) + + assert.NoError(t, err) + assert.Equal(t, expected, *response) +} + +func TestGetAggregatedCheckpointMessagesRetry(t *testing.T) { + logger, _ := logging.NewZapLogger(logging.Development) + + expected := messages.CheckpointMessages{ + StateRootUpdateMessages: []messages.StateRootUpdateMessage{{BlockHeight: 100}}, + } + + rpcFailCount := 0 + rpcClient := MockRpcClient{ + call: func(serviceMethod string, args any, reply any) error { + if rpcFailCount < 2 { + rpcFailCount++ + return assert.AnError + } + + switch v := reply.(type) { + case *messages.CheckpointMessages: + *v = expected + } + return nil + }, + } + + client := operator.NewAggregatorRpcClient(&rpcClient, func() operator.RetryStrategy { return operator.AlwaysRetry }, logger) + + response, err := client.GetAggregatedCheckpointMessages(0, 0) + + assert.NoError(t, err) + assert.Equal(t, expected, *response) + assert.Equal(t, 2, rpcFailCount) +} diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 03ff51c1..4826c4bf 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -212,14 +212,14 @@ func setupTestEnv(t *testing.T, ctx context.Context) *testEnv { t.Fatalf("Failed to create logger: %s", err.Error()) } - nodeConfig, _, _ := genOperatorConfig(t, ctx, "3", mainnetAnvil, rollupAnvils, rabbitMq) - addresses, registryRollups, registryRollupAuths, _ := deployRegistryRollups(t, rollupAnvils) - operator := startOperator(t, ctx, nodeConfig) config := buildConfig(t, sfflDeploymentRaw, addresses, rollupAnvils, configRaw) aggregator := startAggregator(t, ctx, config, logger) + nodeConfig, _, _ := genOperatorConfig(t, ctx, "3", mainnetAnvil, rollupAnvils, rabbitMq) + operator := startOperator(t, ctx, nodeConfig) + avsReader, err := chainio.BuildAvsReader(common.HexToAddress(sfflDeploymentRaw.Addresses.RegistryCoordinatorAddr), common.HexToAddress(sfflDeploymentRaw.Addresses.OperatorStateRetrieverAddr), mainnetAnvil.HttpClient, logger) if err != nil { t.Fatalf("Cannot create AVS Reader: %s", err.Error())