From 6c51ca3029d7e5616f59810f18c00118f91d0582 Mon Sep 17 00:00:00 2001 From: Derek Anderson Date: Thu, 25 Jan 2024 17:57:40 -0600 Subject: [PATCH] updates and disables the chain tx if unable to connect to an rpc (#33) going to smash this one through, but removes the requirement to connect to the app chain when running the node. ``` {"level":"info","component":"node","request":"1340ceab-d04e-45fa-8849-5d8a537b04a5","function":"bafybeiaugwh3mktzbnudurzk7jcyvmdhyy6jnairiu2phuf2t7c7orowjq","node_count":1,"cluster_size":1,"responded":1,"time":"2024-01-25T17:51:42-06:00","message":"received execution responses"} {"level":"debug","time":"2024-01-25T17:51:46-06:00","message":"inference results would have been submitted to chain"} ``` --- cmd/node/appchain.go | 44 +++++++++++++++------ cmd/node/execute.go | 94 ++++++++++++++++++++++++-------------------- cmd/node/main.go | 3 ++ cmd/node/types.go | 1 + 4 files changed, 89 insertions(+), 53 deletions(-) diff --git a/cmd/node/appchain.go b/cmd/node/appchain.go index 47af5a0..9c1047d 100644 --- a/cmd/node/appchain.go +++ b/cmd/node/appchain.go @@ -8,6 +8,7 @@ import ( "math" "net/http" "os" + "path/filepath" "strconv" cosmossdk_io_math "cosmossdk.io/math" @@ -22,25 +23,46 @@ func (ap *AppChain) start(ctx context.Context) { go ap.startClient(ctx, ap.Config) } -func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) { - client := ap.Client +func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) error { + userHomeDir, err := os.UserHomeDir() + if err != nil { + config.Logger.Warn().Err(err).Msg("could not get home directory for app chain") + return err + } + + DefaultNodeHome := filepath.Join(userHomeDir, ".uptd") + client, _ := cosmosclient.New(ctx, cosmosclient.WithAddressPrefix(ap.Config.AddressPrefix), cosmosclient.WithHome(DefaultNodeHome)) + + // this is terrible, no isConnected as part of this code path + if(len(client.Context().ChainID) < 1) { + return fmt.Errorf("client can not connect to allora blockchain") + } - account, err := client.Account(config.AddressKeyName) + account, err := client.Account(ap.Config.AddressKeyName) if err != nil { - config.Logger.Fatal().Err(err).Msg("could not retrieve allora blockchain account") - } + ap.Config.SubmitTx = false + config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain account, transactions will not be submitted to chain") + return err + } address, err := account.Address(config.AddressPrefix) if err != nil { - log.Fatal(err) - } + ap.Config.SubmitTx = false + config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain") + return err + } - if (!queryIsNodeRegistered(ctx, client, address, config)) { - // not registered, register the node - registerWithBlockchain(ctx, client, account, config) + if(config.SubmitTx) { + if (!queryIsNodeRegistered(ctx, client, address, config)) { + // not registered, register the node + registerWithBlockchain(ctx, client, account, config) + } + config.Logger.Info().Msg("allora blockchain registration verification complete") + } else { + config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain") } - config.Logger.Info().Msg("allora blockchain registration verification complete") + return nil } func (ap *AppChain) New() (*AppChain, error) { diff --git a/cmd/node/execute.go b/cmd/node/execute.go index c118e43..c68a66b 100644 --- a/cmd/node/execute.go +++ b/cmd/node/execute.go @@ -37,6 +37,51 @@ type ExecuteResult struct { RequestID string `json:"request_id,omitempty"` } +func sendResultsToChain(ctx echo.Context, a api.API, appChainClient AppChain, req ExecuteRequest, res ExecuteResponse){ + a.Log.Info().Msg("Sending inferences to appchain") + inferences := appChainClient.SendInferencesToAppChain(1, res.Results) + a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain") + // Get the dependencies for the weights calculation + ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences) + + a.Log.Debug().Float64("ETH price: ", ethPrice) + a.Log.Debug().Float64("eth price", ethPrice) + a.Log.Debug().Any("latest weights", latestWeights) + a.Log.Debug().Any("inferences", inferences) + + // Format the payload for the weights calculation + var weightsReq map[string]interface{} = make(map[string]interface{}) + weightsReq["eth_price"] = ethPrice + weightsReq["inferences"] = inferences + weightsReq["latest_weights"] = latestWeights + payload, err := json.Marshal(weightsReq) + if err != nil { + a.Log.Error().Err(err).Msg("error marshalling weights request") + } + payloadCopy := string(payload) + a.Log.Debug().Any("payload: ", payloadCopy) + + // Calculate the weights + calcWeightsReq := execute.Request{ + FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau", + Method: "eth-price-processing.wasm", + Config: execute.Config{ + Stdin: &payloadCopy, + }, + } + a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID) + + // Get the execution result. + _, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "") + if err != nil { + a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function") + } + a.Log.Debug().Any("weights results", weightsResults) + + // Transform the node response format to the one returned by the API. + appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults)) +} + func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) error { return func(ctx echo.Context) error { @@ -47,7 +92,7 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err)) } - a.Log.Debug().Str("Executing inference function: ", req.FunctionID) + a.Log.Debug().Str("executing inference function: ", req.FunctionID) // Get the execution result. code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup) @@ -68,48 +113,13 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e res.Message = err.Error() } - a.Log.Info().Msg("Sending inferences to appchain") - inferences := appChainClient.SendInferencesToAppChain(1, res.Results) - a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain") - // Get the dependencies for the weights calculation - ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences) - - a.Log.Debug().Float64("ETH price: ", ethPrice) - a.Log.Debug().Float64("eth price", ethPrice) - a.Log.Debug().Any("latest weights", latestWeights) - a.Log.Debug().Any("inferences", inferences) - - // Format the payload for the weights calculation - var weightsReq map[string]interface{} = make(map[string]interface{}) - weightsReq["eth_price"] = ethPrice - weightsReq["inferences"] = inferences - weightsReq["latest_weights"] = latestWeights - payload, err := json.Marshal(weightsReq) - if err != nil { - a.Log.Error().Err(err).Msg("error marshalling weights request") - } - payloadCopy := string(payload) - a.Log.Debug().Any("payload: ", payloadCopy) - - // Calculate the weights - calcWeightsReq := execute.Request{ - FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau", - Method: "eth-price-processing.wasm", - Config: execute.Config{ - Stdin: &payloadCopy, - }, - } - a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID) - - // Get the execution result. - _, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "") - if err != nil { - a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function") + // Might be disabled if so we should log out + if(appChainClient.Config.SubmitTx) { + // don't block the return to the consumer to send these to chain + go sendResultsToChain(ctx, a, appChainClient, req, res) + } else { + appChainClient.Config.Logger.Debug().Msg("inference results would have been submitted to chain") } - a.Log.Debug().Any("weights results", weightsResults) - - // Transform the node response format to the one returned by the API. - appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults)) // Send the response. return ctx.JSON(http.StatusOK, res) diff --git a/cmd/node/main.go b/cmd/node/main.go index 27ba4aa..06185c4 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -200,9 +200,12 @@ func run() int { failed := make(chan struct{}) cfg.AppChainConfig.AddressPrefix = "upt" + cfg.AppChainConfig.Logger = log + appchain := &AppChain{ Config: cfg.AppChainConfig, } + appchain.start(ctx) // Start node main loop in a separate goroutine. diff --git a/cmd/node/types.go b/cmd/node/types.go index 4acf30d..3d9cd0d 100644 --- a/cmd/node/types.go +++ b/cmd/node/types.go @@ -35,6 +35,7 @@ type AppChainConfig struct { StringSeperator string // string seperator used for key identifiers in cosmos LibP2PKey string // the libp2p key used to sign offchain communications Logger zerolog.Logger + SubmitTx bool // do we need to commit these to the chain, might be a reason not to } type WorkerInference struct {