Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

updates and disables the chain tx if unable to connect to an rpc #33

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"net/http"
"os"
"path/filepath"
"strconv"

cosmossdk_io_math "cosmossdk.io/math"
Expand All @@ -22,25 +23,46 @@ func (ap *AppChain) start(ctx context.Context) {
go ap.startClient(ctx, ap.Config)
}

func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) {
client := ap.Client
func (ap *AppChain) startClient(ctx context.Context, config AppChainConfig) error {
userHomeDir, err := os.UserHomeDir()
if err != nil {
config.Logger.Warn().Err(err).Msg("could not get home directory for app chain")
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should wrap error and return, and uppermost function should log. Right now, the function is ran as a goroutine and its return value never checked.

Upper function should be something like:

func (ap *AppChain) start(ctx context.Context) {
	go func() {
		err := ap.startClient(ctx, ap.Config)
		if err != nil {
			ap.Config.Logger.Error().Err(err).Msg("appchain client start failed")
		}
	}()
}

(with logger outside of the config)

}

DefaultNodeHome := filepath.Join(userHomeDir, ".uptd")
client, _ := cosmosclient.New(ctx, cosmosclient.WithAddressPrefix(ap.Config.AddressPrefix), cosmosclient.WithHome(DefaultNodeHome))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's no chain available, this will fail first. Error for me was error while requesting node 'http://localhost:26657': post failed: Post \"http://localhost:26657\": dial tcp 127.0.0.1:26657: connect: connection refused.

Few issues I think:

  1. Right now SubmitTx is never initialized to true? It starts off as and will forever be false.
  2. I think we have some distinct scenarios:
    a) chain being there or not
    b) me wanting to use chain or not

Combination of those two is possible:
a) chain is there but I don't want to use it
b) chain is there, and I do want to use it. If I can't - it's an error
c) chain is not there, and I don't want to use it
d) chain is not there, and I do want to use it - error

Scenarios b) and d) I think are clear errors that should result in stopping the process, not silently continuing. IMO, not even logging and continuing.


// this is terrible, no isConnected as part of this code path
if(len(client.Context().ChainID) < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code path doesn't clear the SubmitTx boolean. It is never set in the first place either, but if that is fixed, this should also clear it.

return fmt.Errorf("client can not connect to allora blockchain")
}

account, err := client.Account(config.AddressKeyName)
account, err := client.Account(ap.Config.AddressKeyName)
if err != nil {
config.Logger.Fatal().Err(err).Msg("could not retrieve allora blockchain account")
}
ap.Config.SubmitTx = false
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain account, transactions will not be submitted to chain")
return err
}

address, err := account.Address(config.AddressPrefix)
if err != nil {
log.Fatal(err)
}
ap.Config.SubmitTx = false
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
return err
}

if (!queryIsNodeRegistered(ctx, client, address, config)) {
// not registered, register the node
registerWithBlockchain(ctx, client, account, config)
if(config.SubmitTx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug because of how this function is used - config is a function parameter, while there's also ap.Config used.

This line should probably use ap.Config.SubmitTx (that's the one that's changed in the upper conditions).

Also, if you invert the function, you can get rid of the nesting:

	if !ap.Config.SubmitTx {
		config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
		return nil
	}

	if !queryIsNodeRegistered(ctx, client, address, config) {
		// not registered, register the node
		registerWithBlockchain(ctx, client, account, config)
	}
	

if (!queryIsNodeRegistered(ctx, client, address, config)) {
// not registered, register the node
registerWithBlockchain(ctx, client, account, config)
}
config.Logger.Info().Msg("allora blockchain registration verification complete")
} else {
config.Logger.Warn().Err(err).Msg("could not retrieve allora blockchain address, transactions will not be submitted to chain")
}

config.Logger.Info().Msg("allora blockchain registration verification complete")
return nil
}

func (ap *AppChain) New() (*AppChain, error) {
Expand Down
94 changes: 52 additions & 42 deletions cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,51 @@ type ExecuteResult struct {
RequestID string `json:"request_id,omitempty"`
}

func sendResultsToChain(ctx echo.Context, a api.API, appChainClient AppChain, req ExecuteRequest, res ExecuteResponse){
a.Log.Info().Msg("Sending inferences to appchain")
inferences := appChainClient.SendInferencesToAppChain(1, res.Results)
a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain")
// Get the dependencies for the weights calculation
ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences)

a.Log.Debug().Float64("ETH price: ", ethPrice)
a.Log.Debug().Float64("eth price", ethPrice)
a.Log.Debug().Any("latest weights", latestWeights)
a.Log.Debug().Any("inferences", inferences)

// Format the payload for the weights calculation
var weightsReq map[string]interface{} = make(map[string]interface{})
weightsReq["eth_price"] = ethPrice
weightsReq["inferences"] = inferences
weightsReq["latest_weights"] = latestWeights
payload, err := json.Marshal(weightsReq)
if err != nil {
a.Log.Error().Err(err).Msg("error marshalling weights request")
}
payloadCopy := string(payload)
a.Log.Debug().Any("payload: ", payloadCopy)

// Calculate the weights
calcWeightsReq := execute.Request{
FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau",
Method: "eth-price-processing.wasm",
Config: execute.Config{
Stdin: &payloadCopy,
},
}
a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID)

// Get the execution result.
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "")
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
}
a.Log.Debug().Any("weights results", weightsResults)

// Transform the node response format to the one returned by the API.
appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults))
}

func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) error {
return func(ctx echo.Context) error {

Expand All @@ -47,7 +92,7 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}

a.Log.Debug().Str("Executing inference function: ", req.FunctionID)
a.Log.Debug().Str("executing inference function: ", req.FunctionID)

// Get the execution result.
code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), req.Request, req.Subgroup)
Expand All @@ -68,48 +113,13 @@ func createExecutor(a api.API, appChainClient AppChain) func(ctx echo.Context) e
res.Message = err.Error()
}

a.Log.Info().Msg("Sending inferences to appchain")
inferences := appChainClient.SendInferencesToAppChain(1, res.Results)
a.Log.Debug().Any("inferences", inferences).Msg("Inferences sent to appchain")
// Get the dependencies for the weights calculation
ethPrice, latestWeights := appChainClient.GetWeightsCalcDependencies(inferences)

a.Log.Debug().Float64("ETH price: ", ethPrice)
a.Log.Debug().Float64("eth price", ethPrice)
a.Log.Debug().Any("latest weights", latestWeights)
a.Log.Debug().Any("inferences", inferences)

// Format the payload for the weights calculation
var weightsReq map[string]interface{} = make(map[string]interface{})
weightsReq["eth_price"] = ethPrice
weightsReq["inferences"] = inferences
weightsReq["latest_weights"] = latestWeights
payload, err := json.Marshal(weightsReq)
if err != nil {
a.Log.Error().Err(err).Msg("error marshalling weights request")
}
payloadCopy := string(payload)
a.Log.Debug().Any("payload: ", payloadCopy)

// Calculate the weights
calcWeightsReq := execute.Request{
FunctionID: "bafybeibuzoxt3jsf6mswlw5sq2o7cltxfpeodseduwhzrv4d33k32baaau",
Method: "eth-price-processing.wasm",
Config: execute.Config{
Stdin: &payloadCopy,
},
}
a.Log.Debug().Any("Executing weight adjusment function: ", calcWeightsReq.FunctionID)

// Get the execution result.
_, _, weightsResults, _, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(calcWeightsReq), "")
if err != nil {
a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function")
// Might be disabled if so we should log out
if(appChainClient.Config.SubmitTx) {
// don't block the return to the consumer to send these to chain
go sendResultsToChain(ctx, a, appChainClient, req, res)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a bug.. You want to do this async, in the background, but you use the echo.Context as a context. In the next function you use ctx.Request().Context(). From the docs:

// For incoming server requests, the context is canceled when the
// client's connection closes, the request is canceled (with HTTP/2),
// or when the ServeHTTP method returns.

So, if the client drops off OR - it simply received our response, is content and closes the connection => this context gets canceled, and we may not complete whatever it is we set out to do. This should be context.Background() if we want it truly to run in the background.

} else {
appChainClient.Config.Logger.Debug().Msg("inference results would have been submitted to chain")
}
a.Log.Debug().Any("weights results", weightsResults)

// Transform the node response format to the one returned by the API.
appChainClient.SendUpdatedWeights(aggregate.Aggregate(weightsResults))

// Send the response.
return ctx.JSON(http.StatusOK, res)
Expand Down
3 changes: 3 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,12 @@ func run() int {
failed := make(chan struct{})

cfg.AppChainConfig.AddressPrefix = "upt"
cfg.AppChainConfig.Logger = log

appchain := &AppChain{
Config: cfg.AppChainConfig,
}

appchain.start(ctx)

// Start node main loop in a separate goroutine.
Expand Down
1 change: 1 addition & 0 deletions cmd/node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type AppChainConfig struct {
StringSeperator string // string seperator used for key identifiers in cosmos
LibP2PKey string // the libp2p key used to sign offchain communications
Logger zerolog.Logger
SubmitTx bool // do we need to commit these to the chain, might be a reason not to
}

type WorkerInference struct {
Expand Down
Loading