From 1da9db5dedf6da312c897d44067677d79166baf7 Mon Sep 17 00:00:00 2001
From: T
Date: Sat, 15 Jun 2024 10:35:14 -0400
Subject: [PATCH] [ORA-1625] Check Actor Balance Before Registration (#127)

Check that the user has a sufficient balance to register before wasting
their transaction fees on a failed registration.
---
 cmd/node/appchain.go           | 93 ++++++++++++++++++++++++----------
 cmd/node/execute.go            |  1 -
 cmd/node/types.go              | 20 ++++----
 docker/Dockerfile_worker       |  3 +-
 docker/Dockerfile_worker_py3.9 |  3 +-
 5 files changed, 78 insertions(+), 42 deletions(-)

diff --git a/cmd/node/appchain.go b/cmd/node/appchain.go
index 952d0ee..e33def0 100644
--- a/cmd/node/appchain.go
+++ b/cmd/node/appchain.go
@@ -13,10 +13,12 @@ import (
 	"time"
 
 	cosmossdk_io_math "cosmossdk.io/math"
-	"github.com/allora-network/allora-chain/x/emissions/types"
+	chainParams "github.com/allora-network/allora-chain/app/params"
+	emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
 	"github.com/allora-network/b7s/models/blockless"
 	"github.com/allora-network/b7s/node/aggregate"
 	sdktypes "github.com/cosmos/cosmos-sdk/types"
+	banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
 	"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
 	"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
 	"github.com/rs/zerolog"
@@ -118,7 +120,7 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error) {
 	}
 
 	// Create query client
-	queryClient := types.NewQueryClient(client.Context())
+	queryClient := emissionstypes.NewQueryClient(client.Context())
 
 	// this is terrible, no isConnected as part of this code path
 	if client.Context().ChainID == "" {
@@ -126,12 +128,12 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error) {
 	}
 
 	appchain := &AppChain{
-		Address:     address,
-		Account:     account,
-		Logger:      log,
-		Client:      client,
-		QueryClient: queryClient,
-		Config:      config,
+		Address:              address,
+		Account:              account,
+		Logger:               log,
+		Client:               client,
+		EmissionsQueryClient: queryClient,
+		Config:               config,
 	}
 
 	if config.NodeRole == blockless.WorkerNode {
@@ -160,7 +162,7 @@ func parseTopicIds(appchain *AppChain, topicIds []string) []uint64 {
 func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
 	ctx := context.Background()
 
-	res, err := appchain.QueryClient.IsReputerRegisteredInTopicId(ctx, &types.QueryIsReputerRegisteredInTopicIdRequest{
+	res, err := appchain.EmissionsQueryClient.IsReputerRegisteredInTopicId(ctx, &emissionstypes.QueryIsReputerRegisteredInTopicIdRequest{
 		TopicId: topicId,
 		Address: appchain.Address,
 	})
@@ -175,7 +177,7 @@ func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
 func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
 	ctx := context.Background()
 
-	res, err := appchain.QueryClient.IsWorkerRegisteredInTopicId(ctx, &types.QueryIsWorkerRegisteredInTopicIdRequest{
+	res, err := appchain.EmissionsQueryClient.IsWorkerRegisteredInTopicId(ctx, &emissionstypes.QueryIsWorkerRegisteredInTopicIdRequest{
 		TopicId: topicId,
 		Address: appchain.Address,
 	})
@@ -187,6 +189,21 @@ func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
 	return res.IsRegistered, nil
 }
 
+func hasBalanceForRegistration(
+	ctx context.Context,
+	appchain *AppChain,
+	registrationFee cosmossdk_io_math.Int,
+) (bool, error) {
+	resp, err := appchain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{
+		Address: appchain.Address,
+		Denom:   chainParams.DefaultBondDenom,
+	})
+	if err != nil {
+		return false, err
+	}
+	return registrationFee.LTE(resp.Balance.Amount), nil
+}
+
 // / Registration
 func registerWithBlockchain(appchain *AppChain) {
 	ctx := context.Background()
@@ -206,6 +223,11 @@ func registerWithBlockchain(appchain *AppChain) {
 	// Print the array entries as a comma-separated value list
 	topicsList := strings.Join(strings.Fields(fmt.Sprint(b7sTopicIds)), ", ")
 	appchain.Logger.Info().Str("topicsList", topicsList).Msg("Topics list")
+	moduleParams, err := appchain.EmissionsQueryClient.Params(ctx, &emissionstypes.QueryParamsRequest{})
+	if err != nil {
+		appchain.Logger.Error().Err(err).Msg("could not get chain params")
+		return
+	}
 
 	// Iterate each topic
 	for _, topicId := range b7sTopicIds {
@@ -221,8 +243,23 @@ func registerWithBlockchain(appchain *AppChain) {
 			continue
 		}
 		if !is_registered {
+			hasBalance, err := hasBalanceForRegistration(ctx, appchain, moduleParams.Params.RegistrationFee)
+			if err != nil {
+				appchain.Logger.Error().Err(err).
+					Uint64("topic", topicId).
+					Str("addr", appchain.Address).
+					Msg("could not check if the node has enough balance to register, skipping.")
+				continue
+			}
+			if !hasBalance {
+				appchain.Logger.Error().
+					Uint64("topic", topicId).
+					Str("addr", appchain.Address).
+					Msg("node does not have enough balance to register, skipping.")
+				continue
+			}
 			// register the worker in the topic
-			msg := &types.MsgRegister{
+			msg := &emissionstypes.MsgRegister{
 				Sender:       appchain.Address,
 				LibP2PKey:    appchain.Config.LibP2PKey,
 				MultiAddress: appchain.Config.MultiAddress,
@@ -239,7 +276,7 @@ func registerWithBlockchain(appchain *AppChain) {
 		if isReputer {
 			var initstake = appchain.Config.InitialStake
 			if initstake > 0 {
-				msg := &types.MsgAddStake{
+				msg := &emissionstypes.MsgAddStake{
 					Sender:  appchain.Address,
 					Amount:  cosmossdk_io_math.NewInt(initstake),
 					TopicId: topicId,
@@ -287,14 +324,14 @@ func (ap *AppChain) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, Max
 // Sending Inferences/Forecasts to the AppChain
 func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
 	// Aggregate the inferences from all peers/workers
-	WorkerDataBundles := make([]*types.WorkerDataBundle, 0)
-	var nonce *types.Nonce
+	WorkerDataBundles := make([]*emissionstypes.WorkerDataBundle, 0)
+	var nonce *emissionstypes.Nonce
 	for _, result := range results {
 		for _, peer := range result.Peers {
 			ap.Logger.Debug().Str("worker peer", peer.String())
 
 			// Get Peer's $allo address
-			res, err := ap.QueryClient.GetWorkerAddressByP2PKey(ctx, &types.QueryWorkerAddressByP2PKeyRequest{
+			res, err := ap.EmissionsQueryClient.GetWorkerAddressByP2PKey(ctx, &emissionstypes.QueryWorkerAddressByP2PKeyRequest{
 				Libp2PKey: peer.String(),
 			})
 			if err != nil {
@@ -311,7 +348,7 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu
 				continue
 			}
 			if nonce == nil {
-				nonce = &types.Nonce{BlockHeight: value.BlockHeight}
+				nonce = &emissionstypes.Nonce{BlockHeight: value.BlockHeight}
 			}
 			// Here reputer leader can choose to validate data further to ensure set is correct and act accordingly
 			if value.WorkerDataBundle == nil {
@@ -339,7 +376,7 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu
 	}
 
 	// Make 1 request per worker
-	req := &types.MsgInsertBulkWorkerPayload{
+	req := &emissionstypes.MsgInsertBulkWorkerPayload{
 		Sender:            ap.Address,
 		Nonce:             nonce,
 		TopicId:           topicId,
@@ -367,7 +404,7 @@ const MAX_NUMBER_STAKE_QUERIES_PER_REQUEST = uint64(3)
 // Get the stake of each reputer in the given topic
 func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, reputerAddrs []*string) (map[string]cosmossdk_io_math.Int, error) {
 	maxReputers := DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY
-	params, err := ap.QueryClient.Params(ctx, &types.QueryParamsRequest{})
+	params, err := ap.EmissionsQueryClient.Params(ctx, &emissionstypes.QueryParamsRequest{})
 	if err != nil {
 		ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get chain params")
 	}
@@ -394,7 +431,7 @@ func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, repu
 		}
 		addresses = append(addresses, *addr)
 	}
-	res, err := ap.QueryClient.GetMultiReputerStakeInTopic(ctx, &types.QueryMultiReputerStakeInTopicRequest{
+	res, err := ap.EmissionsQueryClient.GetMultiReputerStakeInTopic(ctx, &emissionstypes.QueryMultiReputerStakeInTopicRequest{
 		TopicId:   topicId,
 		Addresses: addresses,
 	})
@@ -487,11 +524,11 @@ func (ap *AppChain) getStakeWeightedBlockHeights(
 // Sending Losses to the AppChain
 func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
 	// Aggregate the forecast from reputer leader
-	var valueBundles []*types.ReputerValueBundle
+	var valueBundles []*emissionstypes.ReputerValueBundle
 	var reputerAddrs []*string
 	var reputerAddrSet = make(map[string]bool) // Prevents duplicate reputer addresses from being counted in vote tally
-	var nonceCurrent *types.Nonce
-	var nonceEval *types.Nonce
+	var nonceCurrent *emissionstypes.Nonce
+	var nonceEval *emissionstypes.Nonce
 	var blockCurrentToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for current block height
 	var blockEvalToReputer = make(map[int64][]string)    // map blockHeight to addresses of reputers who sent data for eval block height
 
@@ -501,7 +538,7 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
 			ap.Logger.Debug().Str("worker peer", peer.String())
 
 			// Get Peer $allo address
-			res, err := ap.QueryClient.GetReputerAddressByP2PKey(ctx, &types.QueryReputerAddressByP2PKeyRequest{
+			res, err := ap.EmissionsQueryClient.GetReputerAddressByP2PKey(ctx, &emissionstypes.QueryReputerAddressByP2PKeyRequest{
 				Libp2PKey: peer.String(),
 			})
 			if err != nil {
@@ -566,11 +603,11 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
 		ap.Logger.Error().Int64("blockCurrentHeight", blockCurrentHeight).Int64("blockEvalHeight", blockEvalHeight).Msg("blockCurrentHeight < blockEvalHeight, not sending data to the chain")
 		return
 	}
-	nonceCurrent = &types.Nonce{BlockHeight: blockCurrentHeight}
-	nonceEval = &types.Nonce{BlockHeight: blockEvalHeight}
+	nonceCurrent = &emissionstypes.Nonce{BlockHeight: blockCurrentHeight}
+	nonceEval = &emissionstypes.Nonce{BlockHeight: blockEvalHeight}
 
 	// Remove those bundles that do not come from the current block height
-	var valueBundlesFiltered []*types.ReputerValueBundle
+	var valueBundlesFiltered []*emissionstypes.ReputerValueBundle
 	for _, valueBundle := range valueBundles {
 		if valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight == blockCurrentHeight &&
 			valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight == blockEvalHeight {
@@ -590,9 +627,9 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
 	}
 
 	// Make 1 request per worker
-	req := &types.MsgInsertBulkReputerPayload{
+	req := &emissionstypes.MsgInsertBulkReputerPayload{
 		Sender: ap.Address,
-		ReputerRequestNonce: &types.ReputerRequestNonce{
+		ReputerRequestNonce: &emissionstypes.ReputerRequestNonce{
 			ReputerNonce: nonceCurrent,
 			WorkerNonce:  nonceEval,
 		},
diff --git a/cmd/node/execute.go b/cmd/node/execute.go
index 7d348ce..5fe12ef 100644
--- a/cmd/node/execute.go
+++ b/cmd/node/execute.go
@@ -50,7 +50,6 @@ func sendResultsToChain(log zerolog.Logger, appChainClient *AppChain, res node.C
 
 	log.Debug().Str("Topic", res.Topic).Str("worker mode", appChainClient.Config.WorkerMode).Msg("Found topic ID")
 
-	// TODO: We can move this context to the AppChain struct (previous context was breaking the tx broadcast response)
 	reqCtx := context.Background()
 	if appChainClient.Config.WorkerMode == WorkerModeWorker { // for inference or forecast
 		topicId, err := strconv.ParseUint(res.Topic, 10, 64)
diff --git a/cmd/node/types.go b/cmd/node/types.go
index b4536f0..405c5b4 100644
--- a/cmd/node/types.go
+++ b/cmd/node/types.go
@@ -1,9 +1,10 @@
 package main
 
 import (
-	"github.com/allora-network/allora-chain/x/emissions/types"
+	emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
 	"github.com/allora-network/b7s/config"
 	"github.com/allora-network/b7s/models/blockless"
+	banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
 	"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
 	"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
 	"github.com/rs/zerolog"
@@ -15,12 +16,13 @@ type alloraCfg struct {
 }
 
 type AppChain struct {
-	Address     string
-	Account     cosmosaccount.Account
-	Client      *cosmosclient.Client
-	QueryClient types.QueryClient
-	Config      AppChainConfig
-	Logger      zerolog.Logger
+	Address              string
+	Account              cosmosaccount.Account
+	Client               *cosmosclient.Client
+	EmissionsQueryClient emissionstypes.QueryClient
+	BankQueryClient      banktypes.QueryClient
+	Config               AppChainConfig
+	Logger               zerolog.Logger
 }
 
 type AppChainConfig struct {
@@ -55,7 +57,7 @@ type InferenceForecastResponse struct {
 }
 
 type WorkerDataResponse struct {
-	*types.WorkerDataBundle
+	*emissionstypes.WorkerDataBundle
 	BlockHeight int64 `json:"blockHeight,omitempty"`
 	TopicId     int64 `json:"topicId,omitempty"`
 }
@@ -74,7 +76,7 @@ type ValueBundle struct {
 
 // Wrapper around the ReputerValueBundle to include the block height and topic id for the leader
 type ReputerDataResponse struct {
-	*types.ReputerValueBundle
+	*emissionstypes.ReputerValueBundle
 	BlockHeight     int64 `json:"blockHeight,omitempty"`
 	BlockHeightEval int64 `json:"blockHeightEval,omitempty"`
 	TopicId         int64 `json:"topicId,omitempty"`
diff --git a/docker/Dockerfile_worker b/docker/Dockerfile_worker
index 080696d..8e94579 100644
--- a/docker/Dockerfile_worker
+++ b/docker/Dockerfile_worker
@@ -16,8 +16,7 @@ ENV DEBIAN_FRONTEND=noninteractive \
     APP_PATH=/app
 ## curl, unzip other utilities
-#! libssl-dev - BLS_RUNTIME dependency
 #  - temporary use libssl 1.1 TODO: Should use fresher libssl
-#! gh - to downaload release from priv repo
+#! gh - to download release from priv repo
 RUN apt update && \
     apt -y dist-upgrade && \
     apt install -y --no-install-recommends \
diff --git a/docker/Dockerfile_worker_py3.9 b/docker/Dockerfile_worker_py3.9
index 1d0eb17..f5d4042 100644
--- a/docker/Dockerfile_worker_py3.9
+++ b/docker/Dockerfile_worker_py3.9
@@ -16,8 +16,7 @@ ENV DEBIAN_FRONTEND=noninteractive \
    APP_PATH=/app
 ## curl, unzip other utilities
-#! libssl-dev - BLS_RUNTIME dependency
 #  - temporary use libssl 1.1 TODO: Should use fresher libssl
-#! gh - to downaload release from priv repo
+#! gh - to download release from priv repo
 RUN apt update && \
     apt -y dist-upgrade && \
     apt install -y --no-install-recommends \
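
Reviewer note: the heart of this patch is a single integer comparison from cosmossdk.io/math; registration proceeds only when registrationFee.LTE(balance) holds. Below is a minimal, self-contained sketch of just that comparison, isolated from the node. The sufficientBalance helper and the fee/balance values are hypothetical and for illustration only; the real check in hasBalanceForRegistration above additionally queries the bank module for the live balance in the default bond denom.

```go
package main

import (
	"fmt"

	math "cosmossdk.io/math"
)

// sufficientBalance mirrors the patch's comparison: registration is
// allowed only when the fee is less than or equal to the balance.
func sufficientBalance(registrationFee, balance math.Int) bool {
	return registrationFee.LTE(balance)
}

func main() {
	fee := math.NewInt(100)     // hypothetical registration fee
	balance := math.NewInt(250) // hypothetical account balance

	fmt.Println(sufficientBalance(fee, balance)) // true: 100 <= 250
	fmt.Println(sufficientBalance(balance, fee)) // false: 250 > 100
}
```

Because the check uses LTE rather than LT, an account whose balance exactly equals the registration fee is still allowed to register; note that gas for the registration transaction itself is not part of this comparison.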