Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Retry sending + clean WorkerInferences (#75)
Browse files Browse the repository at this point in the history
* Add sending retrial mechanisms on inferences and weights
* Cleaned the WorkerInferences type and usages, no longer in use. 
* SendInferences aligned with SendWeights in that they don't return
values.
  • Loading branch information
xmariachi authored Feb 19, 2024
1 parent 5d7b857 commit ff67a71
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
59 changes: 42 additions & 17 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

cosmossdk_io_math "cosmossdk.io/math"
types "github.com/allora-network/allora-chain/x/emissions"
Expand Down Expand Up @@ -166,11 +167,48 @@ func queryIsNodeRegistered(appchain AppChain) bool {
return (len(queryResp.Nodes) >= 1)
}

// Retry function with a constant number of retries.
func (ap *AppChain) SendInferencesWithRetry(ctx context.Context, req *types.MsgProcessInferences, MaxRetries int, Delay int) (*cosmosclient.Response, error) {
var txResp *cosmosclient.Response
var err error

for retryCount := 0; retryCount <= MaxRetries; retryCount++ {
txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req)
if err == nil {
ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain")
break
}
// Log the error for each retry.
ap.Logger.Info().Err(err).Msgf("Failed to send inferences to allora blockchain, retrying... (Retry %d/%d)", retryCount, MaxRetries)
// Wait for a short duration before retrying (you may customize this duration).
time.Sleep(time.Second)
}
return txResp, err
}

// Retry function with a constant number of retries.
func (ap *AppChain) SendWeightsWithRetry(ctx context.Context, req *types.MsgSetWeights, MaxRetries int, Delay int) (*cosmosclient.Response, error) {
var txResp *cosmosclient.Response
var err error

for retryCount := 0; retryCount <= MaxRetries; retryCount++ {
txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req)
if err == nil {
ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain")
break
}
// Log the error for each retry.
ap.Logger.Info().Err(err).Msgf("Failed to send inferences to allora blockchain, retrying... (Retry %d/%d)", retryCount, MaxRetries)
// Wait for a short duration before retrying (you may customize this duration).
time.Sleep(time.Second)
}
return txResp, err
}

// Sending Inferences to the AppChain
func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results aggregate.Results) []WorkerInference {
func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the inferences from all peers/workers
var inferences []*types.Inference
var workersInferences []WorkerInference

for _, result := range results {
for _, peer := range result.Peers {
Expand Down Expand Up @@ -200,7 +238,6 @@ func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results
Value: cosmossdk_io_math.NewUint(parsed),
}
inferences = append(inferences, inference)
workersInferences = append(workersInferences, WorkerInference{Worker: inference.Worker, Inference: inference.Value})
}
}

Expand All @@ -209,14 +246,7 @@ func (ap *AppChain) SendInferences(ctx context.Context, topicId uint64, results
Inferences: inferences,
}

txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req)
if err != nil {
ap.Logger.Info().Err(err).Msg("failed to send inferences to allora blockchain")
} else {
ap.Logger.Info().Any("Tx Hash:", txResp.TxHash).Msg("successfully sent inferences to allora blockchain")
}

return workersInferences
ap.SendInferencesWithRetry(ctx, req, 3, 0)
}

func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, results aggregate.Results) {
Expand Down Expand Up @@ -255,12 +285,7 @@ func (ap *AppChain) SendUpdatedWeights(ctx context.Context, topicId uint64, resu
Weights: weights,
}

txResp, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req)
if err != nil {
ap.Logger.Info().Err(err).Msg("failed to send weights to allora blockchain")
} else {
ap.Logger.Info().Any("Tx Resp:", txResp).Msg("successfully sent weights to allora blockchain")
}
ap.SendWeightsWithRetry(ctx, req, 3, 0)
}

func parseFloatToUint64Weights(input string) (uint64, error) {
Expand Down
6 changes: 0 additions & 6 deletions cmd/node/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
cosmossdk_io_math "cosmossdk.io/math"
types "github.com/allora-network/allora-chain/x/emissions"
"github.com/blocklessnetwork/b7s/config"
"github.com/blocklessnetwork/b7s/models/blockless"
Expand Down Expand Up @@ -41,11 +40,6 @@ type AppChainConfig struct {
ReconnectSeconds uint64 // seconds to wait for reconnection
}

type WorkerInference struct {
Worker string `json:"worker"`
Inference cosmossdk_io_math.Uint `json:"inference"`
}

type WeightsResponse struct {
Value string `json:"value"`
}
Expand Down

0 comments on commit ff67a71

Please sign in to comment.