Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
updates and disables the chain tx if unable to connect to an rpc (#33)
Browse files Browse the repository at this point in the history
going to smash this one through, but removes the requirement to connect
to the app chain when running the node.

```
{"level":"info","component":"node","request":"1340ceab-d04e-45fa-8849-5d8a537b04a5","function":"bafybeiaugwh3mktzbnudurzk7jcyvmdhyy6jnairiu2phuf2t7c7orowjq","node_count":1,"cluster_size":1,"responded":1,"time":"2024-01-25T17:51:42-06:00","message":"received execution responses"}
{"level":"debug","time":"2024-01-25T17:51:46-06:00","message":"inference results would have been submitted to chain"}
```
  • Loading branch information
dmikey authored Jan 25, 2024
1 parent 0a359d2 commit 6c51ca3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 53 deletions.
44 changes: 33 additions & 11 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"net/http"
"os"
"path/filepath"
"strconv"

cosmossdk_io_math "cosmossdk.io/math"
Expand All @@ -22,25 +23,46 @@ func (ap *AppChain) start(ctx context.Context) {
go ap.startClient(ctx, ap.Config)
}

func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) {
client := ap.Client
func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) error {
userHomeDir, err := os.UserHomeDir()
if err != nil {
config.Logger.Warn().Err(err).Msg("could not get home directory for app chain")
return err
}

DefaultNodeHome := filepath.Join(userHomeDir, ".uptd")
client, _ := cosmosclient.New(ctx, cosmosclient.WithAddressPrefix(ap.Config.AddressPrefix), cosmosclient.WithHome(DefaultNodeHome))

// this is terrible, no isConnected as part of this code path
if(len(client.Context().ChainID) < 1) {
return fmt.Errorf("client can not connect to allora blockchain")
}

account, err := client.Account(config.AddressKeyName)
account, err := client.Account(ap.Config.AddressKeyName)
if err != nil {
config.Logger.Fatal().Err(err).Msg("could not retrieve allora blockchain account")
}
ap.Config.SubmitTx = false
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain account, transactions will not be submitted to chain")
return err
}

address, err := account.Address(config.AddressPrefix)
if err != nil {
log.Fatal(err)
}
ap.Config.SubmitTx = false
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
return err
}

if (!queryIsNodeRegistered(ctx, client, address, config)) {
// not registered, register the node
registerWithBlockchain(ctx, client, account, config)
if(config.SubmitTx) {
if (!queryIsNodeRegistered(ctx, client, address, config)) {
// not registered, register the node
registerWithBlockchain(ctx, client, account, config)
}
config.Logger.Info().Msg("allora blockchain registration verification complete")
} else {
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
}

config.Logger.Info().Msg("allora blockchain registration verification complete")
return nil
}

func (ap *AppChain) New() (*AppChain, error) {
Expand Down
94 changes: 52 additions & 42 deletions cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,51 @@ type ExecuteResult struct {
RequestID string `json:"request_id,omitempty"`
}

func sendResultsToChain(ctx echo.Context, a api.API, appChainClient AppChain, req ExecuteRequest, res ExecuteResponse){
a.Log.Info().Msg("Sending inferences to appchain")
inferences := appChainClient.SendInferencesToAppChain(1, res.Results)
a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain")
// Get the dependencies for the weights calculation
ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences)

a.Log.Debug().Float64("ETH price: ", ethPrice)
a.Log.Debug().Float64("eth price", ethPrice)
a.Log.Debug().Any("latest weights", latestWeights)
a.Log.Debug().Any("inferences", inferences)

// Format the payload for the weights calculation
var weightsReq map[string]interface{} = make(map[string]interface{})
weightsReq["eth_price"] = ethPrice
weightsReq["inferences"] = inferences
weightsReq["latest_weights"] = latestWeights
payload, err := json.Marshal(weightsReq)
if err != nil {
a.Log.Error().Err(err).Msg("error marshalling weights request")
}
payloadCopy := string(payload)
a.Log.Debug().Any("payload: ", payloadCopy)

// Calculate the weights
calcWeightsReq := execute.Request{
FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau",
Method: "eth-price-processing.wasm",
Config: execute.Config{
Stdin: &payloadCopy,
},
}
a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID)

// Get the execution result.
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "")
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}
a.Log.Debug().Any("weights results", weightsResults)

// Transform the node response format to the one returned by the API.
appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults))
}

func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) error {
return func(ctx echo.Context) error {

Expand All @@ -47,7 +92,7 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}

a.Log.Debug().Str("Executing inference function: ", req.FunctionID)
a.Log.Debug().Str("executing inference function: ", req.FunctionID)

// Get the execution result.
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup)
Expand All @@ -68,48 +113,13 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e
res.Message = err.Error()
}

a.Log.Info().Msg("Sending inferences to appchain")
inferences := appChainClient.SendInferencesToAppChain(1, res.Results)
a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain")
// Get the dependencies for the weights calculation
ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences)

a.Log.Debug().Float64("ETH price: ", ethPrice)
a.Log.Debug().Float64("eth price", ethPrice)
a.Log.Debug().Any("latest weights", latestWeights)
a.Log.Debug().Any("inferences", inferences)

// Format the payload for the weights calculation
var weightsReq map[string]interface{} = make(map[string]interface{})
weightsReq["eth_price"] = ethPrice
weightsReq["inferences"] = inferences
weightsReq["latest_weights"] = latestWeights
payload, err := json.Marshal(weightsReq)
if err != nil {
a.Log.Error().Err(err).Msg("error marshalling weights request")
}
payloadCopy := string(payload)
a.Log.Debug().Any("payload: ", payloadCopy)

// Calculate the weights
calcWeightsReq := execute.Request{
FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau",
Method: "eth-price-processing.wasm",
Config: execute.Config{
Stdin: &payloadCopy,
},
}
a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID)

// Get the execution result.
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "")
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
// Might be disabled if so we should log out
if(appChainClient.Config.SubmitTx) {
// don't block the return to the consumer to send these to chain
go sendResultsToChain(ctx, a, appChainClient, req, res)
} else {
appChainClient.Config.Logger.Debug().Msg("inference results would have been submitted to chain")
}
a.Log.Debug().Any("weights results", weightsResults)

// Transform the node response format to the one returned by the API.
appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults))

// Send the response.
return ctx.JSON(http.StatusOK, res)
Expand Down
3 changes: 3 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,12 @@ func run() int {
failed := make(chan struct{})

cfg.AppChainConfig.AddressPrefix = "upt"
cfg.AppChainConfig.Logger = log

appchain := &AppChain{
Config: cfg.AppChainConfig,
}

appchain.start(ctx)

// Start node main loop in a separate goroutine.
Expand Down
1 change: 1 addition & 0 deletions cmd/node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type AppChainConfig struct {
StringSeperator string // string seperator used for key identifiers in cosmos
LibP2PKey string // the libp2p key used to sign offchain communications
Logger zerolog.Logger
SubmitTx bool // do we need to commit these to the chain, might be a reason not to
}

type WorkerInference struct {
Expand Down

0 comments on commit 6c51ca3

Please sign in to comment.