From b97d75e1b937de0c4c59b70ae4c7c73ab7303c4b Mon Sep 17 00:00:00 2001 From: Diego C Date: Mon, 22 Apr 2024 23:18:47 +0200 Subject: [PATCH] Diego/ora 1255 adapt registration code on b7s nodes (#114) * Uses new functions `QueryIsWorkerRegisteredInTopicIdRequest` `QueryIsReputerRegisteredInTopicIdRequest` instead of getting a list of topics. * No dereg: it will be done manually by user * Unified reputer/worker code, much smaller now. --- cmd/node/appchain.go | 229 +++++++++++++------------------------------ go.mod | 2 +- go.sum | 4 +- 3 files changed, 73 insertions(+), 162 deletions(-) diff --git a/cmd/node/appchain.go b/cmd/node/appchain.go index 04fb62d..326a159 100644 --- a/cmd/node/appchain.go +++ b/cmd/node/appchain.go @@ -5,11 +5,9 @@ import ( "encoding/json" "errors" "fmt" - "math" "math/rand" "os" "path/filepath" - "slices" "strconv" "strings" "time" @@ -19,7 +17,6 @@ import ( "github.com/allora-network/b7s/models/blockless" "github.com/allora-network/b7s/node/aggregate" sdktypes "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/query" "github.com/ignite/cli/v28/ignite/pkg/cosmosaccount" "github.com/ignite/cli/v28/ignite/pkg/cosmosclient" "github.com/rs/zerolog" @@ -159,6 +156,36 @@ func parseTopicIds(appchain *AppChain, topicIds []string) []uint64 { return b7sTopicIds } +func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) { + ctx := context.Background() + + res, err := appchain.QueryClient.IsReputerRegisteredInTopicId(ctx, &types.QueryIsReputerRegisteredInTopicIdRequest{ + TopicId: topicId, + Address: appchain.ReputerAddress, + }) + + if err != nil { + return false, err + } + + return res.IsRegistered, nil +} + +func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) { + ctx := context.Background() + + res, err := appchain.QueryClient.IsWorkerRegisteredInTopicId(ctx, &types.QueryIsWorkerRegisteredInTopicIdRequest{ + TopicId: topicId, + Address: appchain.ReputerAddress, + }) + + if err != nil { + return false, err + } + + return res.IsRegistered, nil +} + // / Registration func registerWithBlockchain(appchain *AppChain) { ctx := context.Background() @@ -179,171 +206,55 @@ func registerWithBlockchain(appchain *AppChain) { topicsList := strings.Join(strings.Fields(fmt.Sprint(b7sTopicIds)), ", ") appchain.Logger.Info().Str("topicsList", topicsList).Msg("Topics list") - appchain.Logger.Info().Str("Address", appchain.ReputerAddress).Msg("Node address") - // Check if address is already registered in a topic, getting all topics already reg'd - res, err := appchain.QueryClient.GetRegisteredTopicIds(ctx, &types.QueryRegisteredTopicIdsRequest{ - Address: appchain.ReputerAddress, - IsReputer: isReputer, - }) - if err != nil { - appchain.Logger.Error().Err(err).Msg("could not check if the node is already registered. Topic not created?") - return - } - var msg sdktypes.Msg - appchain.Logger.Info().Str("Worker", appchain.ReputerAddress).Msg("Current Address") - if len(res.TopicIds) > 0 { - appchain.Logger.Debug().Msg("Worker already registered for some topics, checking...") - // Check if libp2p key is already registered - if not, register it - var topicsToRegister []uint64 - var topicsToDeRegister []uint64 - // Calculate topics to deregister - for _, topicUint64 := range res.TopicIds { - if !slices.Contains(b7sTopicIds, topicUint64) { - appchain.Logger.Info().Uint64("topic", topicUint64).Msg("marking deregistration for topic") - topicsToDeRegister = append(topicsToDeRegister, topicUint64) + if isReputer { + // Iterate each topic + for _, topicId := range b7sTopicIds { + var is_registered bool + var err error + if isReputer { + is_registered, err = isReputerRegistered(appchain, topicId) } else { - appchain.Logger.Info().Uint64("topic", topicUint64).Msg("Not deregistering topic") - } - } - // Calculate topics to register - for _, topicUint64 := range b7sTopicIds { - if !slices.Contains(res.TopicIds, topicUint64) { - appchain.Logger.Info().Uint64("topic", topicUint64).Msg("marking registration for topic") - topicsToRegister = append(topicsToRegister, topicUint64) - } else { - appchain.Logger.Info().Uint64("topic", topicUint64).Msg("Topic is already registered, no registration for topic") - } - } - // Registration on new topics - for _, topicId := range topicsToRegister { - if err != nil { - appchain.Logger.Info().Err(err).Uint64("topic", topicId).Msg("Could not register for topic") - break - } - msg = &types.MsgRegister{ - Sender: appchain.ReputerAddress, - LibP2PKey: appchain.Config.LibP2PKey, - MultiAddress: appchain.Config.MultiAddress, - TopicId: topicId, - Owner: appchain.ReputerAddress, - IsReputer: isReputer, + is_registered, err = isWorkerRegistered(appchain, topicId) } - res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES, - NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node") if err != nil { - appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).Msg("could not register the node with the Allora blockchain in topic") - } else { - if isReputer { - var initstake = appchain.Config.InitialStake - if initstake > 0 { - msg = &types.MsgAddStake{ - Sender: appchain.ReputerAddress, - Amount: cosmossdk_io_math.NewUint(initstake), - TopicId: topicId, - } - res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES, NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake") - if err != nil { - appchain.Logger.Error().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).Msg("could not register the node with the Allora blockchain in specified topic") - } - } - } else { - appchain.Logger.Info().Msg("No initial stake configured") - } - } - } - // Deregistration on old topics - for _, topicId := range topicsToDeRegister { - if err != nil { - appchain.Logger.Info().Err(err).Uint64("topic", topicId).Msg("Could not register for topic") - break - } - msg = &types.MsgRemoveRegistration{ - Sender: appchain.ReputerAddress, - TopicId: topicId, - IsReputer: isReputer, - } - - res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES, - NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "deregister node") - if err != nil { - appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Msg("could not deregister the node with the Allora blockchain in topic") - } else { - appchain.Logger.Info().Str("txhash", res.TxHash).Uint64("topic", topicId).Msg("successfully deregistered node with Allora blockchain in topic") + appchain.Logger.Error().Err(err).Uint64("topicId", topicId).Msg("could not check if the node is already registered for topic, skipping.") + continue } - } - } else { - appchain.Logger.Debug().Msg("Attempting first registration for this node") - // First registration: Check current balance of the account - pageRequest := &query.PageRequest{ - Limit: 100, - Offset: 0, - } - // Check balance is over initial stake configured - balanceRes, err := appchain.Client.BankBalances(ctx, appchain.ReputerAddress, pageRequest) - if err != nil { - appchain.Logger.Error().Err(err).Msg("could not get account balance - is account funded?") - return - } else { - if len(balanceRes) > 0 { - // Get uallo balance - var ualloBalance sdktypes.Coin - var initstake = appchain.Config.InitialStake - for _, coin := range balanceRes { - if coin.Denom == "uallo" { - // Found the balance in "uallo" - appchain.Logger.Info().Str("balance", coin.Amount.BigInt().String()).Msg("Found allo balance in account, calculating...") - ualloBalance = coin - break - } else if coin.Denom == "allo" { - appchain.Logger.Info().Msg("Found allo balance in account, calculating...") - } + if !is_registered { + // register the wroker in the topic + msg := &types.MsgRegister{ + Sender: appchain.ReputerAddress, + LibP2PKey: appchain.Config.LibP2PKey, + MultiAddress: appchain.Config.MultiAddress, + TopicId: topicId, + Owner: appchain.ReputerAddress, + IsReputer: isReputer, } - if initstake > math.MaxInt64 { - initstake = math.MaxInt64 - } - if ualloBalance.Amount.GTE(cosmossdk_io_math.NewInt(int64(initstake))) { - for _, topicToRegisterUint64 := range b7sTopicIds { - // If not registered in any topic - msg = &types.MsgRegister{ - Sender: appchain.ReputerAddress, - LibP2PKey: appchain.Config.LibP2PKey, - MultiAddress: appchain.Config.MultiAddress, - TopicId: topicToRegisterUint64, - Owner: appchain.ReputerAddress, - IsReputer: isReputer, - } - res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES, - NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node") - if err != nil { - appchain.Logger.Fatal().Err(err).Msg("could not register the node with the Allora blockchain in specified topics") - } else { - appchain.Logger.Info().Str("txhash", res.TxHash).Msg("successfully registered node with Allora blockchain") - if isReputer { - if initstake > 0 { - msg = &types.MsgAddStake{ - Sender: appchain.ReputerAddress, - Amount: cosmossdk_io_math.NewUint(initstake), - TopicId: topicToRegisterUint64, - } - res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES, NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake") - if err != nil { - appchain.Logger.Fatal().Err(err).Msg("could not register the node with the Allora blockchain in specified topics") - } else { - appchain.Logger.Info().Str("txhash", res.TxHash).Uint64("stake", initstake).Msg("successfully staked with Allora blockchain") - } - } else { - appchain.Logger.Info().Msg("No initial stake configured") - } + res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES, + NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node") + if err != nil { + appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash). + Msg("could not register the node with the Allora blockchain in topic") + } else { + if isReputer { + var initstake = appchain.Config.InitialStake + if initstake > 0 { + msg := &types.MsgAddStake{ + Sender: appchain.ReputerAddress, + Amount: cosmossdk_io_math.NewUint(initstake), + TopicId: topicId, + } + res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES, + NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake") + if err != nil { + appchain.Logger.Error().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash). + Msg("could not register the node with the Allora blockchain in specified topic") } } - appchain.Logger.Info().Str("balance", balanceRes.String()).Msg("Registered Node") + } else { + appchain.Logger.Info().Msg("No initial stake configured") } - } else { - appchain.Logger.Fatal().Str("balance", ualloBalance.Amount.BigInt().Text(10)).Int("InitialStake", int(appchain.Config.InitialStake)).Msg("account balance is lower than the initialStake requested") } - } else { - appchain.Logger.Info().Str("account", appchain.ReputerAddress).Msg("account is not funded in uallo") - return } } } diff --git a/go.mod b/go.mod index d558c5d..7466442 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.2 require ( cosmossdk.io/math v1.3.0 - github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23 + github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4 github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 github.com/cockroachdb/pebble v1.1.0 github.com/cosmos/cosmos-sdk v0.50.5 diff --git a/go.sum b/go.sum index da08dd3..227ca74 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23 h1:EmXS/v16rcbnNCV8yCSG7/zmUuL6yfYc3VH2GPCX6ps= -github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23/go.mod h1:evALp8UB4d8uR9XJ3EM1NQdsj9CpwJOJ9bZqhgl54eM= +github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4 h1:NiAGWTiljUQhhaUxsU0vexirnG5OrVUNsAUG57wVO8Y= +github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4/go.mod h1:evALp8UB4d8uR9XJ3EM1NQdsj9CpwJOJ9bZqhgl54eM= github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 h1:4s9e1sjeHlqG4SWoV29vcf/WGX9KeATx1V38X4k2f+I= github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831/go.mod h1:rJJrdC5Y83LEDFxo/iJp3JJpi8I6TJncOTigMWk8ieE= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=