Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Diego/ora 1255 adapt registration code on b7s nodes (#114)
Browse files Browse the repository at this point in the history
* Uses new functions `QueryIsWorkerRegisteredInTopicIdRequest`
`QueryIsReputerRegisteredInTopicIdRequest` instead of getting a list of
topics.
* No dereg: it will be done manually by user
* Unified reputer/worker code, much smaller now.
  • Loading branch information
xmariachi authored Apr 22, 2024
1 parent 3dc281d commit b97d75e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 162 deletions.
229 changes: 70 additions & 159 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
Expand All @@ -19,7 +17,6 @@ import (
"github.com/allora-network/b7s/models/blockless"
"github.com/allora-network/b7s/node/aggregate"
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/query"
"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -159,6 +156,36 @@ func parseTopicIds(appchain *AppChain, topicIds []string) []uint64 {
return b7sTopicIds
}

func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
ctx := context.Background()

res, err := appchain.QueryClient.IsReputerRegisteredInTopicId(ctx, &types.QueryIsReputerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.ReputerAddress,
})

if err != nil {
return false, err
}

return res.IsRegistered, nil
}

func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
ctx := context.Background()

res, err := appchain.QueryClient.IsWorkerRegisteredInTopicId(ctx, &types.QueryIsWorkerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.ReputerAddress,
})

if err != nil {
return false, err
}

return res.IsRegistered, nil
}

// / Registration
func registerWithBlockchain(appchain *AppChain) {
ctx := context.Background()
Expand All @@ -179,171 +206,55 @@ func registerWithBlockchain(appchain *AppChain) {
topicsList := strings.Join(strings.Fields(fmt.Sprint(b7sTopicIds)), ", ")
appchain.Logger.Info().Str("topicsList", topicsList).Msg("Topics list")

appchain.Logger.Info().Str("Address", appchain.ReputerAddress).Msg("Node address")
// Check if address is already registered in a topic, getting all topics already reg'd
res, err := appchain.QueryClient.GetRegisteredTopicIds(ctx, &types.QueryRegisteredTopicIdsRequest{
Address: appchain.ReputerAddress,
IsReputer: isReputer,
})
if err != nil {
appchain.Logger.Error().Err(err).Msg("could not check if the node is already registered. Topic not created?")
return
}
var msg sdktypes.Msg
appchain.Logger.Info().Str("Worker", appchain.ReputerAddress).Msg("Current Address")
if len(res.TopicIds) > 0 {
appchain.Logger.Debug().Msg("Worker already registered for some topics, checking...")
// Check if libp2p key is already registered - if not, register it
var topicsToRegister []uint64
var topicsToDeRegister []uint64
// Calculate topics to deregister
for _, topicUint64 := range res.TopicIds {
if !slices.Contains(b7sTopicIds, topicUint64) {
appchain.Logger.Info().Uint64("topic", topicUint64).Msg("marking deregistration for topic")
topicsToDeRegister = append(topicsToDeRegister, topicUint64)
if isReputer {
// Iterate each topic
for _, topicId := range b7sTopicIds {
var is_registered bool
var err error
if isReputer {
is_registered, err = isReputerRegistered(appchain, topicId)
} else {
appchain.Logger.Info().Uint64("topic", topicUint64).Msg("Not deregistering topic")
}
}
// Calculate topics to register
for _, topicUint64 := range b7sTopicIds {
if !slices.Contains(res.TopicIds, topicUint64) {
appchain.Logger.Info().Uint64("topic", topicUint64).Msg("marking registration for topic")
topicsToRegister = append(topicsToRegister, topicUint64)
} else {
appchain.Logger.Info().Uint64("topic", topicUint64).Msg("Topic is already registered, no registration for topic")
}
}
// Registration on new topics
for _, topicId := range topicsToRegister {
if err != nil {
appchain.Logger.Info().Err(err).Uint64("topic", topicId).Msg("Could not register for topic")
break
}
msg = &types.MsgRegister{
Sender: appchain.ReputerAddress,
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
TopicId: topicId,
Owner: appchain.ReputerAddress,
IsReputer: isReputer,
is_registered, err = isWorkerRegistered(appchain, topicId)
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES,
NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node")
if err != nil {
appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).Msg("could not register the node with the Allora blockchain in topic")
} else {
if isReputer {
var initstake = appchain.Config.InitialStake
if initstake > 0 {
msg = &types.MsgAddStake{
Sender: appchain.ReputerAddress,
Amount: cosmossdk_io_math.NewUint(initstake),
TopicId: topicId,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES, NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake")
if err != nil {
appchain.Logger.Error().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).Msg("could not register the node with the Allora blockchain in specified topic")
}
}
} else {
appchain.Logger.Info().Msg("No initial stake configured")
}
}
}
// Deregistration on old topics
for _, topicId := range topicsToDeRegister {
if err != nil {
appchain.Logger.Info().Err(err).Uint64("topic", topicId).Msg("Could not register for topic")
break
}
msg = &types.MsgRemoveRegistration{
Sender: appchain.ReputerAddress,
TopicId: topicId,
IsReputer: isReputer,
}

res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES,
NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "deregister node")
if err != nil {
appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Msg("could not deregister the node with the Allora blockchain in topic")
} else {
appchain.Logger.Info().Str("txhash", res.TxHash).Uint64("topic", topicId).Msg("successfully deregistered node with Allora blockchain in topic")
appchain.Logger.Error().Err(err).Uint64("topicId", topicId).Msg("could not check if the node is already registered for topic, skipping.")
continue
}
}
} else {
appchain.Logger.Debug().Msg("Attempting first registration for this node")
// First registration: Check current balance of the account
pageRequest := &query.PageRequest{
Limit: 100,
Offset: 0,
}
// Check balance is over initial stake configured
balanceRes, err := appchain.Client.BankBalances(ctx, appchain.ReputerAddress, pageRequest)
if err != nil {
appchain.Logger.Error().Err(err).Msg("could not get account balance - is account funded?")
return
} else {
if len(balanceRes) > 0 {
// Get uallo balance
var ualloBalance sdktypes.Coin
var initstake = appchain.Config.InitialStake
for _, coin := range balanceRes {
if coin.Denom == "uallo" {
// Found the balance in "uallo"
appchain.Logger.Info().Str("balance", coin.Amount.BigInt().String()).Msg("Found allo balance in account, calculating...")
ualloBalance = coin
break
} else if coin.Denom == "allo" {
appchain.Logger.Info().Msg("Found allo balance in account, calculating...")
}
if !is_registered {
// register the wroker in the topic
msg := &types.MsgRegister{
Sender: appchain.ReputerAddress,
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
TopicId: topicId,
Owner: appchain.ReputerAddress,
IsReputer: isReputer,
}
if initstake > math.MaxInt64 {
initstake = math.MaxInt64
}
if ualloBalance.Amount.GTE(cosmossdk_io_math.NewInt(int64(initstake))) {
for _, topicToRegisterUint64 := range b7sTopicIds {
// If not registered in any topic
msg = &types.MsgRegister{
Sender: appchain.ReputerAddress,
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
TopicId: topicToRegisterUint64,
Owner: appchain.ReputerAddress,
IsReputer: isReputer,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES,
NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node")
if err != nil {
appchain.Logger.Fatal().Err(err).Msg("could not register the node with the Allora blockchain in specified topics")
} else {
appchain.Logger.Info().Str("txhash", res.TxHash).Msg("successfully registered node with Allora blockchain")
if isReputer {
if initstake > 0 {
msg = &types.MsgAddStake{
Sender: appchain.ReputerAddress,
Amount: cosmossdk_io_math.NewUint(initstake),
TopicId: topicToRegisterUint64,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES, NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake")
if err != nil {
appchain.Logger.Fatal().Err(err).Msg("could not register the node with the Allora blockchain in specified topics")
} else {
appchain.Logger.Info().Str("txhash", res.TxHash).Uint64("stake", initstake).Msg("successfully staked with Allora blockchain")
}
} else {
appchain.Logger.Info().Msg("No initial stake configured")
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES,
NUM_REGISTRATION_RETRY_MIN_DELAY, NUM_REGISTRATION_RETRY_MAX_DELAY, "register node")
if err != nil {
appchain.Logger.Fatal().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).
Msg("could not register the node with the Allora blockchain in topic")
} else {
if isReputer {
var initstake = appchain.Config.InitialStake
if initstake > 0 {
msg := &types.MsgAddStake{
Sender: appchain.ReputerAddress,
Amount: cosmossdk_io_math.NewUint(initstake),
TopicId: topicId,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES,
NUM_STAKING_RETRY_MIN_DELAY, NUM_STAKING_RETRY_MAX_DELAY, "add stake")
if err != nil {
appchain.Logger.Error().Err(err).Uint64("topic", topicId).Str("txHash", res.TxHash).
Msg("could not register the node with the Allora blockchain in specified topic")
}
}
appchain.Logger.Info().Str("balance", balanceRes.String()).Msg("Registered Node")
} else {
appchain.Logger.Info().Msg("No initial stake configured")
}
} else {
appchain.Logger.Fatal().Str("balance", ualloBalance.Amount.BigInt().Text(10)).Int("InitialStake", int(appchain.Config.InitialStake)).Msg("account balance is lower than the initialStake requested")
}
} else {
appchain.Logger.Info().Str("account", appchain.ReputerAddress).Msg("account is not funded in uallo")
return
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.2

require (
cosmossdk.io/math v1.3.0
github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23
github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/cosmos-sdk v0.50.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23 h1:EmXS/v16rcbnNCV8yCSG7/zmUuL6yfYc3VH2GPCX6ps=
github.com/allora-network/allora-chain v0.0.11-0.20240420173103-00e789106f23/go.mod h1:evALp8UB4d8uR9XJ3EM1NQdsj9CpwJOJ9bZqhgl54eM=
github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4 h1:NiAGWTiljUQhhaUxsU0vexirnG5OrVUNsAUG57wVO8Y=
github.com/allora-network/allora-chain v0.0.11-0.20240422191608-2da85ac47fb4/go.mod h1:evALp8UB4d8uR9XJ3EM1NQdsj9CpwJOJ9bZqhgl54eM=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 h1:4s9e1sjeHlqG4SWoV29vcf/WGX9KeATx1V38X4k2f+I=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831/go.mod h1:rJJrdC5Y83LEDFxo/iJp3JJpi8I6TJncOTigMWk8ieE=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down

0 comments on commit b97d75e

Please sign in to comment.