forked from streamingfast/bstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eternalsource.go
131 lines (106 loc) · 3.4 KB
/
eternalsource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2019 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bstream
import (
"fmt"
"time"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
type EternalSourceOption = func(s *EternalSource)
func EternalSourceWithLogger(logger *zap.Logger) EternalSourceOption {
return func(s *EternalSource) {
s.logger = logger
}
}
type EternalSourceStartBackAtBlock func() (BlockRef, error)
var eternalRestartWaitTime = time.Second * 2
type EternalSource struct {
*shutter.Shutter
sourceFromRefFactory SourceFromRefFactory
h Handler
startBackAt EternalSourceStartBackAtBlock
currentSource Source
restartDelay time.Duration
logger *zap.Logger
}
func NewEternalSource(sf SourceFromRefFactory, h Handler, opts ...EternalSourceOption) *EternalSource {
es := &EternalSource{
sourceFromRefFactory: sf,
h: h,
restartDelay: eternalRestartWaitTime,
logger: zlog,
}
for _, opt := range opts {
opt(es)
}
es.Shutter = shutter.New()
es.Shutter.OnTerminating(func(err error) {
if es.currentSource != nil {
es.currentSource.Shutdown(err)
}
})
return es
}
func NewDelegatingEternalSource(sf SourceFromRefFactory, startBackAt EternalSourceStartBackAtBlock, h Handler, opts ...EternalSourceOption) *EternalSource {
es := NewEternalSource(sf, h, opts...)
es.startBackAt = startBackAt
return es
}
func (s *EternalSource) SetLogger(logger *zap.Logger) {
s.logger = logger
}
func (s *EternalSource) Run() {
var lastProcessedBlockRef BlockRef = BlockRefEmpty
handler := s.h
// When `startBackAt` is **not** defined, we simply use an handler that record the last processed block ref that is feed upon restart
if s.startBackAt == nil {
handler = HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
err := s.h.ProcessBlock(blk, obj)
if err != nil {
return err
}
lastProcessedBlockRef = NewBlockRef(blk.Id, blk.Number)
return nil
})
}
var err error
for {
if s.IsTerminating() {
return
}
s.logger.Info("starting run loop")
if s.startBackAt != nil {
lastProcessedBlockRef, err = s.startBackAt()
if err != nil {
s.onEternalSourceTermination(fmt.Errorf("failed to get start at block ref: %w", err))
return
}
}
s.logger.Debug("calling sourceFromRefFactory", zap.Stringer("last_processed_block", lastProcessedBlockRef))
src := s.sourceFromRefFactory(lastProcessedBlockRef, handler)
s.currentSource = src // we'll lock you some day
src.Run()
<-src.Terminating()
s.onEternalSourceTermination(src.Err())
}
}
func (s *EternalSource) onEternalSourceTermination(err error) {
if err != nil {
s.logger.Info("eternal source failed", zap.Error(err))
}
s.logger.Info("sleeping before restarting underlying source", zap.Duration("wait_time", s.restartDelay))
time.Sleep(s.restartDelay)
}