-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.go
83 lines (67 loc) · 2.18 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package merkle_script
import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/KYVENetwork/ksync/collectors/bundles"
"github.com/KYVENetwork/ksync/types"
"github.com/KYVENetwork/ksync/utils"
"strconv"
"time"
)
var logger = MerkleLogger("pool")
func StartBundleCollector(merkleCh chan<- []MerkleRootEntry, errorCh chan<- error, chainRest, storageRest string, pool types.PoolResponse, runtime string, targetBundleId int) {
paginationKey := ""
for {
var merkleRoots []MerkleRootEntry
bundlesPage, nextKey, err := bundles.GetFinalizedBundlesPage(chainRest, pool.Pool.Id, 10, paginationKey, false)
if err != nil {
errorCh <- fmt.Errorf("failed to get finalized bundles page: %w", err)
return
}
for _, finalizedBundle := range bundlesPage {
deflated, err := bundles.GetDataFromFinalizedBundle(finalizedBundle, storageRest)
if err != nil {
errorCh <- fmt.Errorf("failed to get data from finalized bundle: %w", err)
return
}
// parse bundle
var bundle types.Bundle
if err := json.Unmarshal(deflated, &bundle); err != nil {
errorCh <- fmt.Errorf("failed to unmarshal tendermint bundle: %w", err)
return
}
bundleId, err := strconv.Atoi(finalizedBundle.Id)
if err != nil {
errorCh <- fmt.Errorf("failed to convert ID from finalized bundle to string: %w", err)
return
}
var leafHashes [][32]byte
leafHashes = BundleToHashes(bundle, runtime)
merkleRoot := GenerateMerkleRoot(&leafHashes)
logger.Info().
Int("bundle-id", bundleId).
Int64("pool-id", pool.Pool.Id).
Str("root", hex.EncodeToString(merkleRoot[:])).
Msg("computed Merkle root")
merkleRoots = append(merkleRoots, MerkleRootEntry{
BundleId: bundleId,
MerkleRoot: merkleRoot,
})
if targetBundleId != 0 && bundleId >= targetBundleId {
logger.Info().
Int("target-height", targetBundleId).
Msg("reached target bundle")
merkleCh <- merkleRoots
return
}
}
merkleCh <- merkleRoots
if nextKey == "" {
// if there is no new page we do not continue
panic("reached latest bundle on pool, target bundle ID was higher than latest pool bundle")
}
time.Sleep(utils.RequestTimeoutMS)
paginationKey = nextKey
}
}