Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TTL to Operator RPC Client #259

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0b420fb
Initial V2 impl for `rpc_client`
emlautarom1 Jun 19, 2024
27f918b
Add `retry later`
emlautarom1 Jun 19, 2024
ae9904e
Add `submittedAt` to actions
emlautarom1 Jun 19, 2024
9e1e849
Extract retry and retry later strategies
emlautarom1 Jun 19, 2024
890ac87
Add `retryIfRecentEnough`
emlautarom1 Jun 19, 2024
9a46a4f
Add `TestRetryLaterIfRecentEnough`
emlautarom1 Jun 19, 2024
4fef16a
Rename retry mechanism
emlautarom1 Jun 19, 2024
8e558f5
Test `retryAtMost`
emlautarom1 Jun 19, 2024
3950423
Store `retryPredicate` in struct member
emlautarom1 Jun 19, 2024
e0a82a0
Separate impl from tests
emlautarom1 Jun 19, 2024
6a7c004
Rename to `AggregatorRpcClient`
emlautarom1 Jun 19, 2024
6b0107d
Don't use `self`
emlautarom1 Jun 19, 2024
3fc60c6
Abstract `rpc.Client`
emlautarom1 Jun 19, 2024
658f752
Add `NewHTTPAggregatorRpcClient`
emlautarom1 Jun 19, 2024
17c134c
Add `AggregatorRpcClienter` interface
emlautarom1 Jun 19, 2024
ca0820a
Remove interface (single implementation)
emlautarom1 Jun 19, 2024
b64cb17
Rename client method names
emlautarom1 Jun 19, 2024
5eae2a1
Add `TestGetAggregatedCheckpointMessages`
emlautarom1 Jun 19, 2024
a0f833f
Make `GetAggregatedCheckpointMessages` blocking
emlautarom1 Jun 19, 2024
b5f5207
Replace event loop with blocking code
emlautarom1 Jun 20, 2024
24b027e
Add `Metricable` interface to `AggregatorRpcClient`
emlautarom1 Jun 20, 2024
cf09334
Use listener in `SendSignedCheckpointTaskResponseToAggregator`
emlautarom1 Jun 20, 2024
5ec7d28
Use listener in `SendSignedStateRootUpdateToAggregator`
emlautarom1 Jun 20, 2024
e576b5f
Use listener in `SendSignedOperatorSetUpdateToAggregator`
emlautarom1 Jun 20, 2024
22f5264
Add logs to `GetAggregatedCheckpointMessages`
emlautarom1 Jun 20, 2024
fefaa66
Add `RetryWithDelay`
emlautarom1 Jun 20, 2024
f3d197c
Rename parameter
emlautarom1 Jun 20, 2024
fc5665b
Add `RetryAnd` to compose rather than decorators
emlautarom1 Jun 20, 2024
f71d642
Add `DefaultRetryStrategy`
emlautarom1 Jun 20, 2024
6a6db7d
Start aggregator before operator
emlautarom1 Jun 20, 2024
28b0005
Replace original with v2
emlautarom1 Jun 20, 2024
7bcc7ff
Increase coverage by using different messages
emlautarom1 Jun 20, 2024
f4ee227
Ensure retry strategy is local to each method call
emlautarom1 Jun 20, 2024
9f3809e
Typo
emlautarom1 Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions operator/mocks/rpc_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,13 @@ func NewOperatorFromConfig(c optypes.NodeConfig) (*Operator, error) {
return nil, err
}

aggregatorRpcClient, err := NewAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger)
// TODO: We never close `httpRpcClient`
httpRpcClient, err := NewHTTPAggregatorRpcClient(c.AggregatorServerIpPortAddress, operatorId, registryCoordinatorAddress, logger)
Comment on lines +176 to +177
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to discussion: how do we want to handle this dependency, in particular closing it?

if err != nil {
logger.Error("Cannot create AggregatorRpcClient. Is aggregator running?", "err", err)
logger.Error("Cannot create HTTPAggregatorRpcClient. Is aggregator running?", "err", err)
return nil, err
}
aggregatorRpcClient := NewAggregatorRpcClient(httpRpcClient, DefaultAggregatorRpcRetry, logger)

avsManager, err := NewAvsManager(&c, ethHttpClient, ethWsClient, elChainReader, elChainWriter, txMgr, logger)
if err != nil {
Expand Down Expand Up @@ -219,7 +221,7 @@ func NewOperatorFromConfig(c optypes.NodeConfig) (*Operator, error) {
blsKeypair: blsKeyPair,
operatorAddr: common.HexToAddress(c.OperatorAddress),
aggregatorServerIpPortAddr: c.AggregatorServerIpPortAddress,
aggregatorRpcClient: aggregatorRpcClient,
aggregatorRpcClient: &aggregatorRpcClient,
registryCoordinatorAddr: registryCoordinatorAddress,
operatorId: operatorId,
taskResponseWait: time.Duration(c.TaskResponseWaitMs) * time.Millisecond,
Expand Down
Loading
Loading