forked from streamingfast/bstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
single_block_fetcher.go
116 lines (104 loc) · 2.61 KB
/
single_block_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package bstream
import (
"context"
"fmt"
"strings"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dstore"
"go.uber.org/zap"
)
func FetchBlockFromOneBlockStore(
ctx context.Context,
num uint64,
id string,
store dstore.Store,
) (*pbbstream.Block, error) {
if obfs, err := listOneBlocks(ctx, num, num+1, store); err == nil {
canonicalID := NormalizeBlockID(id)
for _, obf := range obfs {
if strings.HasSuffix(canonicalID, obf.ID) {
data, err := obf.Data(ctx, OneBlockDownloaderFromStore(store))
if err != nil {
return nil, err
}
return decodeOneblockfileData(data)
}
}
}
return nil, dstore.ErrNotFound
}
func FetchBlockMetaFromOneBlockStore(
ctx context.Context,
num uint64,
id string,
store dstore.Store,
) (*pbbstream.BlockMeta, error) {
if obfs, err := listOneBlocks(ctx, num, num+1, store); err == nil {
canonicalID := NormalizeBlockID(id)
for _, obf := range obfs {
if strings.HasSuffix(canonicalID, obf.ID) {
data, err := obf.Data(ctx, OneBlockDownloaderFromStore(store))
if err != nil {
return nil, err
}
return decodeOneblockfileToBlockMeta(data)
}
}
}
return nil, dstore.ErrNotFound
}
// FetchBlockMetaByHashFromOneBlockStore fetches a block meta by its hash from a single block store.
// It will list all the blocks in the store and find the one that matches the hash. If the
// block is not found, it returns `nil, nil`.
func FetchBlockMetaByHashFromOneBlockStore(
ctx context.Context,
id string,
store dstore.Store,
) (*pbbstream.BlockMeta, error) {
canonicalID := NormalizeBlockID(id)
isBlockHash := func(file *OneBlockFile) bool {
return NormalizeBlockID(file.ID) == canonicalID
}
obf, err := findOneBlockFile(ctx, store, isBlockHash)
if err != nil {
return nil, fmt.Errorf("find one block file: %w", err)
}
if obf == nil {
return nil, nil
}
data, err := obf.Data(ctx, OneBlockDownloaderFromStore(store))
if err != nil {
return nil, fmt.Errorf("download one block data: %w", err)
}
return decodeOneblockfileToBlockMeta(data)
}
func FetchBlockFromMergedBlocksStore(
ctx context.Context,
num uint64,
store dstore.Store,
) (*pbbstream.Block, error) {
var foundBlock *pbbstream.Block
h := HandlerFunc(func(blk *pbbstream.Block, _ interface{}) error {
if blk.Number < num {
return nil
}
if blk.Number > num {
return dstore.StopIteration
}
foundBlock = blk
return nil
})
fs := NewFileSource(
store,
num,
h,
zap.NewNop(),
FileSourceWithStopBlock(num),
)
fs.Run()
<-fs.Terminated()
if foundBlock != nil {
return foundBlock, nil
}
return nil, dstore.ErrNotFound
}