diff --git a/cmd/node/appchain.go b/cmd/node/appchain.go index 9d049d7..f640527 100644 --- a/cmd/node/appchain.go +++ b/cmd/node/appchain.go @@ -10,6 +10,7 @@ import ( "path/filepath" "strconv" "strings" + "time" cosmossdk_io_math "cosmossdk.io/math" types "github.com/allora-network/allora-chain/x/emissions" @@ -166,11 +167,48 @@ func queryIsNodeRegistered(appchain AppChain) bool { return (len(queryResp.Nodes) >= 1) } +// Retry function with a constant number of retries. +func (ap *AppChain) SendInferencesWithRetry(ctx context.Context, req *types.MsgProcessInferences, MaxRetries int, Delay int) (*cosmosclient.Response, error) { + var txResp *cosmosclient.Response + var err error + + for retryCount := 0; retryCount <= MaxRetries; retryCount++ { + txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req) + if err == nil { + ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain") + break + } + // Log the error for each retry. + ap.Logger.Info().Err(err).Msgf("Failed to send inferences to allora blockchain, retrying... (Retry %d/%d)", retryCount, MaxRetries) + // Wait for a short duration before retrying (you may customize this duration). + time.Sleep(time.Second) + } + return txResp, err +} + +// Retry function with a constant number of retries. +func (ap *AppChain) SendWeightsWithRetry(ctx context.Context, req *types.MsgSetWeights, MaxRetries int, Delay int) (*cosmosclient.Response, error) { + var txResp *cosmosclient.Response + var err error + + for retryCount := 0; retryCount <= MaxRetries; retryCount++ { + txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req) + if err == nil { + ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain") + break + } + // Log the error for each retry. + ap.Logger.Info().Err(err).Msgf("Failed to send inferences to allora blockchain, retrying... (Retry %d/%d)", retryCount, MaxRetries) + // Wait for a short duration before retrying (you may customize this duration). + time.Sleep(time.Second) + } + return txResp, err +} + // Sending Inferences to the AppChain -func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results aggregate.Results) []WorkerInference { +func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results aggregate.Results) { // Aggregate the inferences from all peers/workers var inferences []*types.Inference - var workersInferences []WorkerInference for _, result := range results { for _, peer := range result.Peers { @@ -200,7 +238,6 @@ func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results Value: cosmossdk_io_math.NewUint(parsed), } inferences = append(inferences, inference) - workersInferences = append(workersInferences, WorkerInference{Worker: inference.Worker, Inference: inference.Value}) } } @@ -209,14 +246,7 @@ func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results Inferences: inferences, } - txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req) - if err != nil { - ap.Logger.Info().Err(err).Msg("failed to send inferences to allora blockchain") - } else { - ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain") - } - - return workersInferences + ap.SendInferencesWithRetry(ctx, req, 3, 0) } func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, results aggregate.Results) { @@ -255,12 +285,7 @@ func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, resu Weights: weights, } - txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req) - if err != nil { - ap.Logger.Info().Err(err).Msg("failed to send weights to allora blockchain") - } else { - ap.Logger.Info().Any("Tx Resp:", txResp).Msg("successfully sent weights to allora blockchain") - } + ap.SendWeightsWithRetry(ctx, req, 3, 0) } func parseFloatToUint64Weights(input string) (uint64, error) { diff --git a/cmd/node/types.go b/cmd/node/types.go index d814d92..0612201 100644 --- a/cmd/node/types.go +++ b/cmd/node/types.go @@ -1,7 +1,6 @@ package main import ( - cosmossdk_io_math "cosmossdk.io/math" types "github.com/allora-network/allora-chain/x/emissions" "github.com/blocklessnetwork/b7s/config" "github.com/blocklessnetwork/b7s/models/blockless" @@ -41,11 +40,6 @@ type AppChainConfig struct { ReconnectSeconds uint64 // seconds to wait for reconnection } -type WorkerInference struct { - Worker string `json:"worker"` - Inference cosmossdk_io_math.Uint `json:"inference"` -} - type WeightsResponse struct { Value string `json:"value"` }