forked from streamingfast/bstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
148 lines (127 loc) · 4.7 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright 2019 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bstream
import (
"fmt"
"io"
"os"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dbin"
proto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// DBinBlockReader reads the dbin format where each element is assumed to be a `Block`.
//
// Instances are built through NewDBinBlockReader or NewDBinBlockReaderWithValidation,
// which read the dbin header from the stream before returning the reader.
type DBinBlockReader struct {
// src is the underlying dbin reader from which raw messages are pulled.
src *dbin.Reader
// Header is the dbin header that was read from the stream at construction time.
Header *dbin.Header
}
// NewDBinBlockReader builds a DBinBlockReader on top of reader without
// validating the header's content type. It is shorthand for calling
// NewDBinBlockReaderWithValidation with a nil validation function.
func NewDBinBlockReader(reader io.Reader) (out *DBinBlockReader, err error) {
	out, err = NewDBinBlockReaderWithValidation(reader, nil)
	return out, err
}
// NewDBinBlockReaderWithValidation builds a DBinBlockReader on top of reader.
//
// The dbin header is read eagerly. When validateHeaderFunc is non-nil it is
// called with the header's content type, and any error it returns is handed
// back to the caller unchanged. On success the parsed header is exposed through
// the returned reader's Header field.
//
// A failure to read the header is wrapped with %w so that callers can inspect
// the underlying cause through errors.Is / errors.As.
func NewDBinBlockReaderWithValidation(reader io.Reader, validateHeaderFunc func(contentType string) error) (out *DBinBlockReader, err error) {
	dbinReader := dbin.NewReader(reader)

	header, err := dbinReader.ReadHeader()
	if err != nil {
		return nil, fmt.Errorf("unable to read file header: %w", err)
	}

	if validateHeaderFunc != nil {
		err = validateHeaderFunc(header.ContentType)
		if err != nil {
			return nil, err
		}
	}

	return &DBinBlockReader{
		src:    dbinReader,
		Header: header,
	}, nil
}
// Read decodes the next dbin message as a full pbbstream.Block.
//
// The raw message is obtained through readMessage, so errors reported by it
// (including a bare io.EOF when no message is left) are returned as-is. The
// decoded block is then passed through supportLegacy, which may mutate it or
// reject it.
//
// Decoding and legacy-support failures are wrapped with %w so the underlying
// cause stays reachable through errors.Is / errors.As.
func (l *DBinBlockReader) Read() (*pbbstream.Block, error) {
	return readMessage(l, func(message []byte) (*pbbstream.Block, error) {
		blk := new(pbbstream.Block)
		if err := proto.Unmarshal(message, blk); err != nil {
			return nil, fmt.Errorf("unable to read block proto: %w", err)
		}

		if err := supportLegacy(blk); err != nil {
			return nil, fmt.Errorf("support legacy block: %w", err)
		}

		return blk, nil
	})
}
// ReadAsBlockMeta reads the next message as a BlockMeta instead of as a Block,
// which reduces memory consumption since the payload is skipped: fields that
// BlockMeta does not declare are discarded while unmarshalling.
//
// There is still transient memory pressure because the full message bytes must
// be loaded before decoding, but at least that memory is not retained.
//
// Errors reported by readMessage (including a bare io.EOF when no message is
// left) are returned as-is. Decoding failures and rejections from
// supportLegacyMeta are wrapped with %w so the underlying cause stays reachable
// through errors.Is / errors.As.
func (l *DBinBlockReader) ReadAsBlockMeta() (*pbbstream.BlockMeta, error) {
	return readMessage(l, func(message []byte) (*pbbstream.BlockMeta, error) {
		meta := new(pbbstream.BlockMeta)
		err := proto.UnmarshalOptions{DiscardUnknown: true}.Unmarshal(message, meta)
		if err != nil {
			return nil, fmt.Errorf("unable to read block proto: %w", err)
		}

		if err := supportLegacyMeta(meta); err != nil {
			return nil, fmt.Errorf("support legacy block meta: %w", err)
		}

		return meta, nil
	})
}
// readMessage pulls the next raw message from reader's dbin source and hands it
// to decoder.
//
// Outcomes, in order of precedence:
//   - a non-empty message is always passed to decoder and the decoder's result
//     is returned as-is (an error reported alongside a non-empty message is
//     ignored);
//   - io.EOF with no message is returned untouched so callers can compare
//     against it directly;
//   - an empty message with no error is reported as an error, since there is
//     nothing to decode;
//   - any other error is reported with a "failed reading next dbin message"
//     prefix.
//
// On every error path out is the zero value of T.
func readMessage[T any](reader *DBinBlockReader, decoder func(message []byte) (T, error)) (out T, err error) {
	message, err := reader.src.ReadMessage()
	if len(message) > 0 {
		return decoder(message)
	}

	if err == io.EOF {
		return out, err
	}

	if err == nil {
		// Without this guard the generic error below would be formatted with a
		// nil error and read "...: %!s(<nil>)".
		return out, fmt.Errorf("failed reading next dbin message: empty message")
	}

	// In all other cases, we are in an error path.
	//
	// NOTE(review): the cause is deliberately flattened with %s rather than
	// wrapped with %w. If the underlying reader ever returned an error that
	// wraps io.EOF, wrapping here would make errors.Is(err, io.EOF) succeed on
	// what is a read failure, not a clean end of stream.
	return out, fmt.Errorf("failed reading next dbin message: %s", err)
}
// supportLegacy upgrades, in place, a block whose Payload field is not set.
//
// Blocks that already carry a Payload are left untouched. Otherwise an empty
// Payload is attached, its type URL is derived from PayloadKind, its value is
// taken from PayloadBuffer, and ParentNum is set to Number-1 when Number is
// greater than GetProtocolFirstStreamableBlock.
//
// Legacy NEAR blocks are always rejected. Legacy Solana blocks are rejected
// unless the ACCEPT_SOLANA_LEGACY_BLOCK_FORMAT environment variable is defined
// (its value is not inspected). Payload kinds not listed below keep an empty
// type URL. When an error is returned, the empty Payload has already been
// attached to the block.
func supportLegacy(b *pbbstream.Block) error {
	if b.Payload != nil {
		return nil
	}

	payload := &anypb.Any{}
	b.Payload = payload

	var typeURL string
	switch b.PayloadKind {
	case pbbstream.Protocol_EOS:
		typeURL = "type.googleapis.com/sf.antelope.type.v1.Block"

	case pbbstream.Protocol_ETH:
		typeURL = "type.googleapis.com/sf.ethereum.type.v2.Block"

	case pbbstream.Protocol_COSMOS:
		typeURL = "type.googleapis.com/sf.cosmos.type.v1.Block"

	case pbbstream.Protocol_SOLANA:
		_, accepted := os.LookupEnv("ACCEPT_SOLANA_LEGACY_BLOCK_FORMAT")
		if !accepted {
			return fmt.Errorf("old block format from Solana protocol not supported, migrate your blocks")
		}
		typeURL = "type.googleapis.com/sf.solana.type.v1.Block"

	case pbbstream.Protocol_NEAR:
		return fmt.Errorf("old block format from NEAR protocol not supported, migrate your blocks")
	}

	payload.TypeUrl = typeURL
	payload.Value = b.PayloadBuffer

	if b.Number > GetProtocolFirstStreamableBlock {
		b.ParentNum = b.Number - 1
	}

	return nil
}
// supportLegacyMeta rejects block metas that appear to come from the legacy
// format, in which the parent number was not populated.
//
// A zero ParentNum is ambiguous on its own: it may be a legitimate value or it
// may simply never have been filled. As a heuristic, the meta is only rejected
// when ParentNum is zero and Number is more than 15 blocks past
// GetProtocolFirstStreamableBlock, a distance at which the parent number is
// assumed to have been required.
func supportLegacyMeta(b *pbbstream.BlockMeta) error {
	if b.ParentNum != 0 {
		return nil
	}

	legacyThreshold := GetProtocolFirstStreamableBlock + 15
	if b.Number <= legacyThreshold {
		return nil
	}

	return fmt.Errorf("old block format without a properly populated parent num are not supported, migrate your blocks")
}