-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatches.go
94 lines (72 loc) · 1.95 KB
/
batches.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package yadb
import (
"fmt"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
clickhouse "github.com/undiabler/clickhouse-driver"
)
// BatchWriter writes records into a table asynchronously, in batches.
// Instances are created by NewBatchWriter, which also starts the worker
// goroutine.
type BatchWriter struct {
// columns holds the column names passed to NewBatchWriter.
columns clickhouse.Columns
// bulkItems is the batch size given to NewBatchWriter; it is also used as
// the capacity of the work channel.
bulkItems int
// ticker is the interval passed to NewBatchWriter and handed to the worker
// goroutine.
ticker time.Duration
// table is the table name passed to NewBatchWriter.
table string
// work is the buffered channel of rows; Close closes it.
work chan clickhouse.Row
// done is allocated by NewBatchWriter; it is not used elsewhere in this file.
done chan bool
// closeFlag is accessed atomically: 1 while the writer is open, 0 once
// Close has been called.
closeFlag *int32
// getConn is the callback installed by SetConn; it stays nil until SetConn
// is called.
getConn func() clickhouse.Connector
}
// Package-level registry shared by every BatchWriter.
//
// workers records each writer created by NewBatchWriter so that CloseAll can
// close them. wg counts the worker goroutines that were started so that
// CloseAll can wait for them. workersMu guards workers: NewBatchWriter may be
// called from several goroutines, and unsynchronized map writes are a data
// race (the Go runtime aborts with "concurrent map writes").
//
// NOTE(review): nothing in this file removes entries from workers, so a writer
// stays referenced after Close. Confirm whether that is intended.
var (
	workersMu sync.Mutex
	workers   = map[*BatchWriter]bool{}
	wg        sync.WaitGroup
)

// NewBatchWriter validates its arguments, builds a BatchWriter for table,
// registers it for CloseAll and starts its worker goroutine.
//
// bulkItems must be greater than 1; it is also used as the capacity of the
// internal row channel. columns must not be empty. ticker is handed to the
// worker goroutine unchanged.
//
// An error is returned, and nothing is started or registered, when
// bulkItems <= 1 or columns is empty. It is safe to call NewBatchWriter from
// multiple goroutines.
func NewBatchWriter(table string, columns []string, bulkItems int, ticker time.Duration) (*BatchWriter, error) {
	if bulkItems <= 1 {
		return nil, fmt.Errorf("Bulk must be greater than 1. Have %d", bulkItems)
	}
	if len(columns) == 0 {
		return nil, fmt.Errorf("No columns for request")
	}

	bw := new(BatchWriter)
	bw.columns = columns
	bw.bulkItems = bulkItems
	bw.ticker = ticker
	bw.table = table

	// closeFlag encoding: 1 = open, 0 = closed (see IsClosed and Close).
	bw.closeFlag = new(int32)
	atomic.StoreInt32(bw.closeFlag, 1)

	bw.work = make(chan clickhouse.Row, bulkItems)
	bw.done = make(chan bool)

	// Register under the lock so concurrent constructors cannot corrupt the
	// map. wg.Add happens inside the same critical section, so a CloseAll that
	// sees this writer also waits for its goroutine.
	workersMu.Lock()
	wg.Add(1)
	workers[bw] = true
	workersMu.Unlock()

	go bw.worker(ticker, &wg)

	return bw, nil
}

// SetConn installs the callback that should return the Connector used to run
// the insert query.
//
// NOTE(review): the worker goroutine is already running when SetConn is
// called, and getConn is written without synchronization. Confirm the worker
// cannot read it before SetConn has returned.
func (bw *BatchWriter) SetConn(f func() clickhouse.Connector) {
	bw.getConn = f
}

// IsClosed reports whether Close has already been called on bw.
func (bw *BatchWriter) IsClosed() bool {
	return atomic.LoadInt32(bw.closeFlag) == 0
}

// Close marks bw as closed and closes its work channel.
//
// The atomic swap lets only the first caller close the channel, so calling
// Close repeatedly or from several goroutines does not panic. The done channel
// is not touched here.
func (bw *BatchWriter) Close() {
	if atomic.SwapInt32(bw.closeFlag, 0) == 1 {
		close(bw.work)
	}
}

// CloseAll closes every BatchWriter registered so far and then blocks on wg
// until the worker goroutines started by NewBatchWriter have finished.
//
// Writers created after CloseAll has read the registry are not closed by this
// call, so callers should stop creating writers before invoking it.
func CloseAll() {
	log.Debug("Clickhouse goroutines exiting...")

	// Close never blocks and never takes workersMu, so it is safe to call it
	// while holding the lock.
	workersMu.Lock()
	for bw := range workers {
		bw.Close()
	}
	workersMu.Unlock()

	// Wait outside the lock so constructors are not blocked while workers drain.
	wg.Wait()
}