Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Fixed registration flow reputer address
Browse files Browse the repository at this point in the history
  • Loading branch information
guilherme-brandao committed Jan 31, 2024
1 parent 1accfd9 commit a5a378e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 66 deletions.
96 changes: 32 additions & 64 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// create a new appchain client that we can use
func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error){
func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error) {
ctx := context.Background()

userHomeDir, err := os.UserHomeDir()
Expand All @@ -39,31 +39,23 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error){
}

var account cosmosaccount.Account
var address string

// if we're giving a keyring ring name, with no mnemonic restore
if(config.AddressRestoreMnemonic == "" && config.AddressKeyName != "") {
if config.AddressRestoreMnemonic == "" && config.AddressKeyName != "" {
// get account from the keyring
account, err = client.Account(config.AddressKeyName)
if err != nil {
config.SubmitTx = false
log.Warn().Err(err).Msg("could not retrieve account from keyring")
} else {

address, err = account.Address(config.AddressPrefix)
if err != nil {
config.SubmitTx = false
log.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
}
}
} else if (config.AddressRestoreMnemonic != "" && config.AddressKeyName != "") {
}
} else if config.AddressRestoreMnemonic != "" && config.AddressKeyName != "" {
// restore from mneumonic
account, err = client.AccountRegistry.Import(config.AddressKeyName, config.AddressRestoreMnemonic, config.AddressAccountPassphrase)
if err != nil {
if err.Error() == "account already exists" {
account, err = client.Account(config.AddressKeyName)
}
}

if err != nil {
config.SubmitTx = false
log.Error().Err(err).Msg("error getting account")
Expand All @@ -74,71 +66,75 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error){
return nil, nil
}

address, err := account.Address(config.AddressPrefix)
if err != nil {
config.SubmitTx = false
log.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
}

// Create query client
queryClient := types.NewQueryClient(client.Context())

// this is terrible, no isConnected as part of this code path
if(client.Context().ChainID == "") {
if client.Context().ChainID == "" {
return nil, nil
}

appchain := &AppChain{
ReputerAddress: address,
ReputerAccount: account,
Logger: log,
Client: &client,
QueryClient: queryClient,
Config: config,
Logger: log,
Client: &client,
QueryClient: queryClient,
Config: config,
}
if(!queryIsNodeRegistered(*appchain)) {
if !queryIsNodeRegistered(*appchain) {
registerWithBlockchain(appchain)
}

return appchain, nil
}

/// Registration
// / Registration
func registerWithBlockchain(appchain *AppChain) {
ctx := context.Background()

msg := &types.MsgRegisterWorker{
Creator: appchain.ReputerAddress,
Owner: appchain.ReputerAddress, // we need to allow a pass in of a claim address
LibP2PKey: appchain.Config.LibP2PKey,
Creator: appchain.ReputerAddress,
Owner: appchain.ReputerAddress, // we need to allow a pass in of a claim address
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
InitialStake: cosmossdk_io_math.NewUint(1),
TopicId: 0,
TopicId: 0,
}

txResp, err := appchain.Client.BroadcastTx(ctx, appchain.ReputerAccount, msg)
if err != nil {
if err != nil {
if strings.Contains(fmt.Sprint(err), types.Err_ErrWorkerAlreadyRegistered.String()) {
appchain.Logger.Info().Err(err).Msg("node is already registered")
} else {
appchain.Logger.Fatal().Err(err).Msg("could not register the node with the allora blockchain")
}
} else {
} else {
appchain.Logger.Info().Str("txhash", txResp.TxHash).Msg("successfully registered node with Allora blockchain")
}
}


func queryIsNodeRegistered(appchain AppChain) bool {
ctx := context.Background()

queryResp, err := appchain.QueryClient.GetWorkerNodeRegistration(ctx, &types.QueryRegisteredWorkerNodesRequest{
queryResp, err := appchain.QueryClient.GetWorkerNodeRegistration(ctx, &types.QueryRegisteredWorkerNodesRequest{
NodeId: appchain.ReputerAddress + appchain.Config.StringSeperator + appchain.Config.LibP2PKey,
})

if err != nil {
appchain.Logger.Fatal().Err(err).Msg("node could not be registered with blockchain")
}
if err != nil {
appchain.Logger.Fatal().Err(err).Msg("node could not be registered with blockchain")
}

return (len(queryResp.Nodes) >= 1)
return (len(queryResp.Nodes) >= 1)
}


/// Sending Inferences to the AppChain
// Sending Inferences to the AppChain
func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results aggregate.Results) []WorkerInference {
// Aggregate the inferences from all peers/workers
var inferences []*types.Inference
Expand Down Expand Up @@ -191,7 +187,7 @@ func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, resu
}

for peer, value := range extractedWeights {
ap.Logger.Info().Str("peer", peer);
ap.Logger.Info().Str("peer", peer)
parsed, err := parseFloatToUint64Weights(strconv.FormatFloat(value, 'f', -1, 64))
if err != nil {
ap.Logger.Error().Err(err).Msg("Error parsing uint")
Expand Down Expand Up @@ -221,7 +217,6 @@ func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, resu
ap.Logger.Info().Str("txResp:", txResp.TxHash).Msg("weights sent to allora blockchain")
}


func parseFloatToUint64Weights(input string) (uint64, error) {
// Parse the string to a floating-point number
floatValue, err := strconv.ParseFloat(input, 64)
Expand Down Expand Up @@ -270,30 +265,3 @@ func extractWeights(stdout string) (map[string]float64, error) {

return weights.Weights, nil
}

func generateWorkersMap() map[string]string {
// TODO: Add query to get all workers from AppChain
workerMap := make(map[string]string)

peer1Address := os.Getenv("PEER_ADDRESS_1")
if peer1Address == "" {
peer1Address = "2xgSimWsrD59sW3fPxLo3ej2Q6dFNc6DRsWH5stnHB3bkaVTsHZjKDULEL"
}
peer2Address := os.Getenv("PEER_ADDRESS_2")
if peer2Address == "" {
peer2Address = "2xgSimWsrD59sW3fPxLo3ej2Q6dFNc6DRsWH5stnHB3bkaVTsHZjKDULEA"
}
worker1Address := os.Getenv("WORKER_ADDRESS_1")
if worker1Address == "" {
worker1Address = "upt16ar7k93c6razqcuvxdauzdlaz352sfjp2rpj3i"
}
worker2Address := os.Getenv("WORKER_ADDRESS_2")
if worker2Address == "" {
worker2Address = "upt16ar7k93c6razqcuvxdauzdlaz352sfjp2rpj3a"
}

workerMap[peer1Address] = worker1Address
workerMap[peer2Address] = worker2Address

return workerMap
}
2 changes: 1 addition & 1 deletion cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func createExecutor(a api.API, appChainClient *AppChain) func(ctx echo.Context)
}

// Might be disabled if so we should log out
if appChainClient != nil && appChainClient.Config.SubmitTx {
if appChainClient != nil && appChainClient.Config.SubmitTx && res.Code == codes.OK {
// don't block the return to the consumer to send these to chain
go sendResultsToChain(ctx, a, *appChainClient, req, res)
} else {
Expand Down
8 changes: 7 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"path/filepath"
"time"

"github.com/cockroachdb/pebble"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -40,7 +41,7 @@ func run() int {
signal.Notify(sig, os.Interrupt)

// Initialize logging.
log := zerolog.New(os.Stderr).With().Timestamp().Logger().Level(zerolog.DebugLevel)
log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Timestamp().Logger().Level(zerolog.DebugLevel)

// Parse CLI flags and validate that the configuration is valid.
cfg := parseFlags()
Expand Down Expand Up @@ -255,6 +256,11 @@ func run() int {
server.POST("/api/v1/functions/install", api.Install)
server.POST("/api/v1/functions/requests/result", api.ExecutionResult)

apiCopy := api
dummyNode := &dummyNode{Node: node}
api.Node = dummyNode
server.POST("/api/v2/functions/execute", createExecutor(*apiCopy, appchain))

// Start API in a separate goroutine.
go func() {

Expand Down

0 comments on commit a5a378e

Please sign in to comment.