Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Kenny/ora 1390 base define validation policy on reputernonces (#123)
Browse files Browse the repository at this point in the history
## Todo

- [ ] Unit test
- [ ] e2e test locally

---------

Co-authored-by: Diego Campo <xmariachi@gmail.com>
  • Loading branch information
kpeluso and xmariachi authored May 27, 2024
1 parent b9a71a0 commit 620044f
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ To build the image for the head:
docker build --pull -f docker/Dockerfile_head -t allora-inference-base:dev-head --build-arg "GH_TOKEN=${YOUR_GH_TOKEN}" --build-arg "BLS_EXTENSION_VER=${BLS_EXTENSION_VERSION}" .
```

Then to build the image for the worker:
To build the image for the worker:

```
docker build --pull -f docker/Dockerfile_worker -t allora-inference-base:dev-worker --build-arg "GH_TOKEN=${YOUR_GH_TOKEN}" --build-arg "BLS_EXTENSION_VER=${BLS_EXTENSION_VERSION}" .
Expand Down
228 changes: 197 additions & 31 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,142 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu
}()
}

// Can only look up the topic stakes of this many reputers at a time
const DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY = uint64(100)

// Only this number times MaxLimit (whose default is given above) of reputer stakes can be gathered at once
const MAX_NUMBER_STAKE_QUERIES_PER_REQUEST = uint64(3)

// Get the stake of each reputer in the given topic
func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, reputerAddrs []*string) (map[string]cosmossdk_io_math.Int, error) {
maxReputers := DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY
params, err := ap.QueryClient.Params(ctx, &types.QueryParamsRequest{})
if err != nil {
ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get chain params")
}
if err == nil {
maxReputers = params.Params.MaxLimit
}

numberRequestsForStake := MAX_NUMBER_STAKE_QUERIES_PER_REQUEST
var stakesPerReputer = make(map[string]cosmossdk_io_math.Int) // This will be populated with each request/loop below
for i := uint64(0); i < numberRequestsForStake; i++ {
// Dereference only the needed reputer addresses to get the actual strings
addresses := make([]string, 0)
start := i * maxReputers
end := (i + 1) * maxReputers
if end > uint64(len(reputerAddrs)) {
end = uint64(len(reputerAddrs))
}
if start >= end {
break
}
for _, addr := range reputerAddrs[start:end] {
if addr == nil {
return nil, fmt.Errorf("nil address in reputerAddrs")
}
addresses = append(addresses, *addr)
}
res, err := ap.QueryClient.GetMultiReputerStakeInTopic(ctx, &types.QueryMultiReputerStakeInTopicRequest{
TopicId: topicId,
Addresses: addresses,
})
if err != nil {
ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get reputer stakes from the chain")
return nil, err
}

// Create a map of reputer addresses to their stakes
for _, stake := range res.Amounts {
stakesPerReputer[stake.Reputer] = stake.Amount
}
}

return stakesPerReputer, err
}

func (ap *AppChain) argmaxBlockByStake(
blockToReputer *map[int64][]string,
stakesPerReputer map[string]cosmossdk_io_math.Int,
) int64 {
// Find the current block height with the highest voting power
firstIter := true
highestVotingPower := cosmossdk_io_math.ZeroInt()
blockOfMaxPower := int64(-1)
for block, reputersWhoVotedForBlock := range *blockToReputer {
// Calc voting power of this candidate block by total voting reputer stake
blockVotingPower := cosmossdk_io_math.ZeroInt()
for _, reputerAddr := range reputersWhoVotedForBlock {
blockVotingPower = blockVotingPower.Add(stakesPerReputer[reputerAddr])
}

// Decide if voting power exceeds that of current front-runner
if firstIter || blockVotingPower.GT(highestVotingPower) {
blockOfMaxPower = block
}

firstIter = false
}

return blockOfMaxPower
}

func (ap *AppChain) argmaxBlockByCount(
blockToReputer *map[int64][]string,
) int64 {
// Find the current block height with the highest voting power
firstIter := true
highestVotingPower := cosmossdk_io_math.ZeroInt()
blockOfMaxPower := int64(-1)
for block, reputersWhoVotedForBlock := range *blockToReputer {
// Calc voting power of this candidate block by total reputer count
blockVotingPower := cosmossdk_io_math.NewInt(int64(len(reputersWhoVotedForBlock)))

// Decide if voting power exceeds that of current front-runner
if firstIter || blockVotingPower.GT(highestVotingPower) {
blockOfMaxPower = block
}

firstIter = false
}

return blockOfMaxPower
}

// Take stake-weighted vote of what the reputer leader thinks the current and eval block heights should be
func (ap *AppChain) getStakeWeightedBlockHeights(
ctx context.Context,
topicId uint64,
blockCurrentToReputer, blockEvalToReputer *map[int64][]string,
reputerAddrs []*string,
) (int64, int64, error) {
useWeightedVote := true
stakesPerReputer, err := ap.getStakePerReputer(ctx, topicId, reputerAddrs)
if err != nil {
ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("error getting reputer stakes from the chain => using unweighted vote")
// This removes a strict requirement for the reputer leader to have the correct stake
// at the cost of potentially allowing sybil attacks, though Blockless itself somewhat mitigates this
useWeightedVote = false
}

// Find the current and ev block height with the highest voting power
if useWeightedVote {
return ap.argmaxBlockByStake(blockCurrentToReputer, stakesPerReputer), ap.argmaxBlockByStake(blockEvalToReputer, stakesPerReputer), nil
} else {
return ap.argmaxBlockByCount(blockCurrentToReputer), ap.argmaxBlockByCount(blockEvalToReputer), nil
}
}

// Sending Losses to the AppChain
func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the forecast from reputer leader
var valueBundles []*types.ReputerValueBundle
var reputerAddrs []*string
var reputerAddrSet = make(map[string]bool) // Prevents duplicate reputer addresses from being counted in vote tally
var nonceCurrent *types.Nonce
var nonceEval *types.Nonce
var blockCurrentToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for current block height
var blockEvalToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for eval block height

for _, result := range results {
if len(result.Peers) > 0 {
Expand All @@ -382,46 +512,82 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
ap.Logger.Info().Str("Reputer Address", res.Address).Msg("Reputer Address")
}

// Parse the result from the reputer to get the inference and forecasts
// Parse the result from the worker to get the inference and forecasts
var value ReputerDataResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &value)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting WorkerDataBundle from stdout, ignoring bundle.")
continue
}
if nonceCurrent == nil {
nonceCurrent = &types.Nonce{BlockHeight: value.BlockHeight}
}
if nonceEval == nil {
nonceEval = &types.Nonce{BlockHeight: value.BlockHeightEval}
}
if _, ok := reputerAddrSet[res.Address]; !ok {
reputerAddrSet[res.Address] = true

// Here reputer leader can choose to validate data further to ensure set is correct and act accordingly
if value.ReputerValueBundle == nil {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle is nil from stdout, ignoring bundle.")
continue
}
if value.ReputerValueBundle.ValueBundle == nil {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ValueBundle is nil from stdout, ignoring bundle.")
continue
}
if value.ReputerValueBundle.ValueBundle.TopicId != topicId {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle topicId does not match with request topicId, ignoring bundle.")
continue
}
// Append the WorkerDataBundle (only) to the WorkerDataBundles slice
valueBundles = append(valueBundles, value.ReputerValueBundle)
// Parse the result from the reputer to get the losses
// Parse the result from the worker to get the inferences and forecasts
var value ReputerDataResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &value)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Str("Value", result.Result.Stdout).Msg("error extracting ReputerDataResponse from stdout, ignoring bundle.")
continue
}

// Here reputer leader can choose to validate data further to ensure set is correct and act accordingly
if value.ReputerValueBundle == nil {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle is nil from stdout, ignoring bundle.")
continue
}
if value.ReputerValueBundle.ValueBundle == nil {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ValueBundle is nil from stdout, ignoring bundle.")
continue
}
if value.ReputerValueBundle.ValueBundle.TopicId != topicId {
ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle topicId does not match with request topicId, ignoring bundle.")
continue
}
// Append the WorkerDataBundle (only) to the WorkerDataBundles slice
valueBundles = append(valueBundles, value.ReputerValueBundle)
reputerAddrs = append(reputerAddrs, &res.Address)
blockCurrentToReputer[value.BlockHeight] = append(blockCurrentToReputer[value.BlockHeight], res.Address)
blockEvalToReputer[value.BlockHeightEval] = append(blockEvalToReputer[value.BlockHeightEval], res.Address)
}
} else {
ap.Logger.Warn().Msg("No peers in the result, ignoring")
}
}

if nonceCurrent == nil || nonceEval == nil {
ap.Logger.Error().Uint64("topic", topicId).Msg("No valid ReputerDataBundles with nonces found, not sending data to the chain")
if len(reputerAddrs) == 0 {
ap.Logger.Warn().Msg("No reputer addresses found, not sending data to the chain")
return
}

blockCurrentHeight, blockEvalHeight, err := ap.getStakeWeightedBlockHeights(ctx, topicId, &blockCurrentToReputer, &blockEvalToReputer, reputerAddrs)
if err != nil {
ap.Logger.Error().Err(err).Msg("could not get stake-weighted block heights, not sending data to the chain")
return
}
if blockCurrentHeight == -1 || blockEvalHeight == -1 {
ap.Logger.Error().Msg("could not get stake-weighted block heights, not sending data to the chain")
return
}
if blockCurrentHeight < blockEvalHeight {
ap.Logger.Error().Int64("blockCurrentHeight", blockCurrentHeight).Int64("blockEvalHeight", blockEvalHeight).Msg("blockCurrentHeight < blockEvalHeight, not sending data to the chain")
return
}
nonceCurrent = &types.Nonce{BlockHeight: blockCurrentHeight}
nonceEval = &types.Nonce{BlockHeight: blockEvalHeight}

// Remove those bundles that do not come from the current block height
var valueBundlesFiltered []*types.ReputerValueBundle

for _, valueBundle := range valueBundles {
if valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight == blockCurrentHeight && valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight == blockEvalHeight {
ap.Logger.Debug().
Str("reputer", valueBundle.ValueBundle.Reputer).
Str("nonce reputer", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight, 10)).
Str("nonce worker", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight, 10)).
Msg("Valid nonce, adding to valueBundlesFiltered")
valueBundlesFiltered = append(valueBundlesFiltered, valueBundle)
} else {
ap.Logger.Warn().
Str("reputer", valueBundle.ValueBundle.Reputer).
Str("nonce reputer", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight, 10)).
Str("nonce worker", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight, 10)).
Msg("Rejected Bundle, non-matching nonces.")
}
}

// Make 1 request per worker
req := &types.MsgInsertBulkReputerPayload{
Expand Down
6 changes: 3 additions & 3 deletions docker/Dockerfile_worker
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ RUN if [ -n $BLS_EXTENSION_VER]; then \
COPY --from=builder /src/dist/allora-node /usr/local/bin/allora-node
COPY --from=builder /src/dist/allora-keys /usr/local/bin/allora-keys

# Smoke test
RUN /app/runtime/bls-runtime --help && \
/app/runtime/extensions/allora-inference-extension --help
# # Smoke test
# RUN /app/runtime/bls-runtime --help && \
# /app/runtime/extensions/allora-inference-extension --help

RUN groupadd -g 1001 ${USERNAME} \
&& useradd -m -d ${APP_PATH} -u 1001 -g 1001 ${USERNAME} \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.2

require (
cosmossdk.io/math v1.3.0
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/cosmos-sdk v0.50.5
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957 h1:9xJ3KMib5e4p+SXtl8Z0zLMNcJcijBMqVQKXIkLjUjk=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe h1:fbf7tNwCdhoYjUSMFSbRp3TCRJPcG6NLyWA0Q2Py/vk=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e h1:8SQlhMZDDPm51KqDiGu5GEU/ZhYA+ZAgWOegfrxmiOI=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 h1:4s9e1sjeHlqG4SWoV29vcf/WGX9KeATx1V38X4k2f+I=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831/go.mod h1:rJJrdC5Y83LEDFxo/iJp3JJpi8I6TJncOTigMWk8ieE=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down

0 comments on commit 620044f

Please sign in to comment.