Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implemented RPC Manager for RPC calls #1260

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
185 changes: 185 additions & 0 deletions RPC/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package RPC

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/ethclient"
"os"
"path/filepath"
"razor/logger"
"sort"
"strings"
"sync"
"time"
)

var log = logger.NewLogger()

func (m *RPCManager) calculateMetrics(endpoint *RPCEndpoint) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

client, err := ethclient.DialContext(ctx, endpoint.URL)
if err != nil {
return fmt.Errorf("failed to connect to RPC: %w", err)
}

start := time.Now()
blockNumber, err := client.BlockNumber(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("RPC call timed out: %w", err)
}
return fmt.Errorf("RPC call failed: %w", err)
}
latency := time.Since(start).Seconds()

endpoint.BlockNumber = blockNumber
endpoint.Latency = latency
endpoint.Client = client // Store the client for future use

return nil
}

// updateAndSortEndpoints calculates metrics and sorts the endpoints
func (m *RPCManager) updateAndSortEndpoints() error {
if len(m.Endpoints) == 0 {
return fmt.Errorf("no endpoints available to update")
}

var wg sync.WaitGroup
log.Debug("Starting concurrent metrics calculation for all endpoints...")

for _, endpoint := range m.Endpoints {
wg.Add(1)
go func(ep *RPCEndpoint) {
defer wg.Done()
if err := m.calculateMetrics(ep); err != nil {
log.Printf("Error calculating metrics for endpoint %s: %v", ep.URL, err)
}
}(endpoint)
}
wg.Wait()

log.Debug("Concurrent metrics calculation complete. Sorting endpoints...")

m.mutex.Lock()
defer m.mutex.Unlock()

sort.Slice(m.Endpoints, func(i, j int) bool {
if m.Endpoints[i].BlockNumber == m.Endpoints[j].BlockNumber {
return m.Endpoints[i].Latency < m.Endpoints[j].Latency
}
return m.Endpoints[i].BlockNumber > m.Endpoints[j].BlockNumber
})

// Update the best RPC client after sorting
m.BestRPCClient = m.Endpoints[0].Client

return nil
}

// RefreshEndpoints will update and sort the endpoints.
func (m *RPCManager) RefreshEndpoints() error {
if err := m.updateAndSortEndpoints(); err != nil {
return fmt.Errorf("failed to refresh endpoints: %w", err)
}
log.Infof("Endpoints refreshed successfully")
return nil
}

func InitializeRPCManager(provider string) (*RPCManager, error) {
// Fetch the absolute path to the PROJECT directory and locate endpoints.json file
projectDir := filepath.Join("..", "..") // Relative path from build output
endpointsFile := filepath.Join(projectDir, "endpoints.json")

fileData, err := os.ReadFile(endpointsFile)
if err != nil {
return nil, fmt.Errorf("failed to read endpoints.json: %w", err)
}

// Unmarshal the JSON file into a list of RPC endpoints
var rpcEndpointsList []string
err = json.Unmarshal(fileData, &rpcEndpointsList)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal endpoints.json: %w", err)
}

// Normalize provider input and check if it's already in the list
provider = strings.TrimSpace(provider) // Trim whitespace from the input
providerFound := false
for _, endpoint := range rpcEndpointsList {
if endpoint == provider {
providerFound = true
break
}
}

// If the provider is not found, add it to the list
if !providerFound && provider != "" {
log.Infof("Adding user-provided endpoint: %s", provider)
rpcEndpointsList = append(rpcEndpointsList, provider)
}

// Initialize the RPC endpoints
rpcEndpoints := make([]*RPCEndpoint, len(rpcEndpointsList))
for i, url := range rpcEndpointsList {
rpcEndpoints[i] = &RPCEndpoint{URL: url}
}

rpcManager := &RPCManager{
Endpoints: rpcEndpoints,
}

// Pre-calculate metrics and set the best client on initialization
if err := rpcManager.updateAndSortEndpoints(); err != nil {
return nil, fmt.Errorf("failed to initialize RPC Manager: %w", err)
}

return rpcManager, nil
}

func (m *RPCManager) GetBestRPCClient() (*ethclient.Client, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()

if m.BestRPCClient == nil {
return nil, fmt.Errorf("no best RPC client available")
}
return m.BestRPCClient, nil
}

// SwitchToNextBestRPCClient switches to the next best available client after the current best client.
func (m *RPCManager) SwitchToNextBestRPCClient() error {
m.mutex.Lock()
defer m.mutex.Unlock()

// If there are fewer than 2 endpoints, there are no alternate clients to switch to.
if len(m.Endpoints) < 2 {
return fmt.Errorf("no other RPC clients to switch to")
}

// Find the index of the current best client
var currentIndex = -1
for i, endpoint := range m.Endpoints {
if endpoint.Client == m.BestRPCClient {
currentIndex = i
break
}
}

// If current client is not found (which is rare), return an error
if currentIndex == -1 {
return fmt.Errorf("current best client not found in the list of endpoints")
}

// Calculate the next index by wrapping around if necessary
nextIndex := (currentIndex + 1) % len(m.Endpoints)

// Switch to the next client in the list
m.BestRPCClient = m.Endpoints[nextIndex].Client
log.Infof("Switched to the next best RPC client: %s", m.Endpoints[nextIndex].URL)
return nil
}
25 changes: 25 additions & 0 deletions RPC/rpc_struct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package RPC

import (
"context"
"github.com/ethereum/go-ethereum/ethclient"
"sync"
)

type RPCEndpoint struct {
URL string
BlockNumber uint64
Latency float64
Client *ethclient.Client
}

type RPCManager struct {
Endpoints []*RPCEndpoint
mutex sync.RWMutex
BestRPCClient *ethclient.Client // Holds the current best RPC client
}

type RPCParameters struct {
Ctx context.Context // Context with timeout for handling unresponsive RPC calls
RPCManager *RPCManager // RPC manager for client selection and contract calls
}
65 changes: 23 additions & 42 deletions cmd/addStake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
package cmd

import (
"context"
"razor/accounts"
"razor/RPC"
"razor/core"
"razor/core/types"
"razor/logger"
"razor/pkg/bindings"
"razor/utils"

Expand Down Expand Up @@ -34,33 +32,11 @@ func initialiseStake(cmd *cobra.Command, args []string) {

//This function sets the flags appropriately and executes the StakeCoins function
func (*UtilsStruct) ExecuteStake(flagSet *pflag.FlagSet) {
config, err := cmdUtils.GetConfigData()
utils.CheckError("Error in getting config: ", err)
log.Debugf("ExecuteStake: config: %+v", config)
config, rpcParameters, account, err := InitializeCommandDependencies(flagSet)
utils.CheckError("Error in initialising command dependencies: ", err)

client := razorUtils.ConnectToClient(config.Provider)

address, err := flagSetUtils.GetStringAddress(flagSet)
utils.CheckError("Error in getting address: ", err)
log.Debug("ExecuteStake: Address: ", address)

logger.SetLoggerParameters(client, address)
log.Debug("Checking to assign log file...")
fileUtils.AssignLogFile(flagSet, config)

log.Debug("Getting password...")
password := razorUtils.AssignPassword(flagSet)

accountManager, err := razorUtils.AccountManagerForKeystore()
utils.CheckError("Error in getting accounts manager for keystore: ", err)

account := accounts.InitAccountStruct(address, password, accountManager)

err = razorUtils.CheckPassword(account)
utils.CheckError("Error in fetching private key from given password: ", err)

balance, err := razorUtils.FetchBalance(client, address)
utils.CheckError("Error in fetching razor balance for account: "+address, err)
balance, err := razorUtils.FetchBalance(rpcParameters, account.Address)
utils.CheckError("Error in fetching razor balance for account: "+account.Address, err)
log.Debug("Getting amount in wei...")
valueInWei, err := cmdUtils.AssignAmountInWei(flagSet)
utils.CheckError("Error in getting amount: ", err)
Expand All @@ -70,13 +46,13 @@ func (*UtilsStruct) ExecuteStake(flagSet *pflag.FlagSet) {
razorUtils.CheckAmountAndBalance(valueInWei, balance)

log.Debug("Checking whether sFUEL balance is not 0...")
razorUtils.CheckEthBalanceIsZero(context.Background(), client, address)
razorUtils.CheckEthBalanceIsZero(rpcParameters, account.Address)

minSafeRazor, err := razorUtils.GetMinSafeRazor(context.Background(), client)
minSafeRazor, err := razorUtils.GetMinSafeRazor(rpcParameters)
utils.CheckError("Error in getting minimum safe razor amount: ", err)
log.Debug("ExecuteStake: Minimum razor that you can stake for first time: ", minSafeRazor)

stakerId, err := razorUtils.GetStakerId(context.Background(), client, address)
stakerId, err := razorUtils.GetStakerId(rpcParameters, account.Address)
utils.CheckError("Error in getting stakerId: ", err)
log.Debug("ExecuteStake: Staker Id: ", stakerId)

Expand All @@ -85,7 +61,7 @@ func (*UtilsStruct) ExecuteStake(flagSet *pflag.FlagSet) {
}

if stakerId != 0 {
staker, err := razorUtils.GetStaker(context.Background(), client, stakerId)
staker, err := razorUtils.GetStaker(rpcParameters, stakerId)
utils.CheckError("Error in getting staker: ", err)

if staker.IsSlashed {
Expand All @@ -94,33 +70,32 @@ func (*UtilsStruct) ExecuteStake(flagSet *pflag.FlagSet) {
}

txnArgs := types.TransactionOptions{
Client: client,
Amount: valueInWei,
ChainId: core.ChainId,
Config: config,
Account: account,
}

log.Debug("ExecuteStake: Calling Approve() for amount: ", txnArgs.Amount)
approveTxnHash, err := cmdUtils.Approve(txnArgs)
approveTxnHash, err := cmdUtils.Approve(rpcParameters, txnArgs)
utils.CheckError("Approve error: ", err)

if approveTxnHash != core.NilHash {
err = razorUtils.WaitForBlockCompletion(txnArgs.Client, approveTxnHash.Hex())
err = razorUtils.WaitForBlockCompletion(rpcParameters, approveTxnHash.Hex())
utils.CheckError("Error in WaitForBlockCompletion for approve: ", err)
}

log.Debug("ExecuteStake: Calling StakeCoins() for amount: ", txnArgs.Amount)
stakeTxnHash, err := cmdUtils.StakeCoins(txnArgs)
stakeTxnHash, err := cmdUtils.StakeCoins(rpcParameters, txnArgs)
utils.CheckError("Stake error: ", err)

err = razorUtils.WaitForBlockCompletion(txnArgs.Client, stakeTxnHash.Hex())
err = razorUtils.WaitForBlockCompletion(rpcParameters, stakeTxnHash.Hex())
utils.CheckError("Error in WaitForBlockCompletion for stake: ", err)
}

//This function allows the user to stake razors in the razor network and returns the hash
func (*UtilsStruct) StakeCoins(txnArgs types.TransactionOptions) (common.Hash, error) {
epoch, err := razorUtils.GetEpoch(context.Background(), txnArgs.Client)
func (*UtilsStruct) StakeCoins(rpcParameters RPC.RPCParameters, txnArgs types.TransactionOptions) (common.Hash, error) {
epoch, err := razorUtils.GetEpoch(rpcParameters)
if err != nil {
return core.NilHash, err
}
Expand All @@ -130,9 +105,15 @@ func (*UtilsStruct) StakeCoins(txnArgs types.TransactionOptions) (common.Hash, e
txnArgs.MethodName = "stake"
txnArgs.Parameters = []interface{}{epoch, txnArgs.Amount}
txnArgs.ABI = bindings.StakeManagerMetaData.ABI
txnOpts := razorUtils.GetTxnOpts(context.Background(), txnArgs)
txnOpts := razorUtils.GetTxnOpts(rpcParameters, txnArgs)

client, err := rpcParameters.RPCManager.GetBestRPCClient()
if err != nil {
return core.NilHash, err
}

log.Debugf("Executing Stake transaction with epoch = %d, amount = %d", epoch, txnArgs.Amount)
txn, err := stakeManagerUtils.Stake(txnArgs.Client, txnOpts, epoch, txnArgs.Amount)
txn, err := stakeManagerUtils.Stake(client, txnOpts, epoch, txnArgs.Amount)
if err != nil {
return core.NilHash, err
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/approve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
package cmd

import (
"context"
"github.com/ethereum/go-ethereum/common"
"razor/RPC"
"razor/core"
"razor/core/types"
"razor/pkg/bindings"
)

//This function approves the transaction if the user has sufficient balance otherwise it fails the transaction
func (*UtilsStruct) Approve(txnArgs types.TransactionOptions) (common.Hash, error) {
opts := razorUtils.GetOptions()
allowance, err := tokenManagerUtils.Allowance(txnArgs.Client, &opts, common.HexToAddress(txnArgs.Account.Address), common.HexToAddress(core.StakeManagerAddress))
func (*UtilsStruct) Approve(rpcParameters RPC.RPCParameters, txnArgs types.TransactionOptions) (common.Hash, error) {
allowance, err := razorUtils.Allowance(rpcParameters, common.HexToAddress(txnArgs.Account.Address), common.HexToAddress(core.StakeManagerAddress))
if err != nil {
return core.NilHash, err
}
Expand All @@ -26,9 +25,15 @@ func (*UtilsStruct) Approve(txnArgs types.TransactionOptions) (common.Hash, erro
txnArgs.MethodName = "approve"
txnArgs.ABI = bindings.RAZORMetaData.ABI
txnArgs.Parameters = []interface{}{common.HexToAddress(core.StakeManagerAddress), txnArgs.Amount}
txnOpts := razorUtils.GetTxnOpts(context.Background(), txnArgs)
txnOpts := razorUtils.GetTxnOpts(rpcParameters, txnArgs)

client, err := rpcParameters.RPCManager.GetBestRPCClient()
if err != nil {
return core.NilHash, err
}

log.Debug("Executing Approve transaction with amount: ", txnArgs.Amount)
txn, err := tokenManagerUtils.Approve(txnArgs.Client, txnOpts, common.HexToAddress(core.StakeManagerAddress), txnArgs.Amount)
txn, err := tokenManagerUtils.Approve(client, txnOpts, common.HexToAddress(core.StakeManagerAddress), txnArgs.Amount)
if err != nil {
return core.NilHash, err
}
Expand Down
Loading
Loading