This repository has been archived by the owner on Sep 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
admin.go
67 lines (61 loc) · 1.88 KB
/
admin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package limestone
import (
"context"
"fmt"
"reflect"
"github.com/ridge/limestone/client"
"github.com/ridge/limestone/kafka"
"github.com/ridge/limestone/meta"
"github.com/ridge/limestone/tlog"
"github.com/ridge/limestone/wire"
"github.com/ridge/must/v2"
"go.uber.org/zap"
)
// Bootstrap initializes a Kafka-based Limestone database:
// 1. writes an administrative transaction creating the given entities into the
// transaction log topic by the given name,
// 2. publishes a manifest referencing that topic.
//
// The administrative transaction created by Bootstrap bypassing restrictions on
// what producer can write to what section.
func Bootstrap(ctx context.Context, c kafka.Client, dbVersion int, topic string, obj ...any) error {
logger := tlog.Get(ctx)
if txn := adminTransaction(obj); txn != nil {
logger.Debug("Publishing bootstrap transaction", zap.Object("txn", *txn))
if err := client.PublishKafkaTransaction(ctx, c, topic, *txn); err != nil {
return fmt.Errorf("failed to bootstrap Limestone: %w", err)
}
}
manifest := wire.Manifest{
Version: dbVersion,
Topic: topic,
}
logger.Debug("Publishing bootstrap manifest", zap.Object("manifest", manifest))
if err := client.PublishKafkaManifest(ctx, c, manifest); err != nil {
return fmt.Errorf("failed to bootstrap Limestone: %w", err)
}
return nil
}
func adminTransaction(obj []any) *wire.Transaction {
cache := map[reflect.Type]meta.Struct{}
changes := wire.Changes{}
for _, o := range obj {
t := reflect.TypeOf(o)
s, ok := cache[t]
if !ok {
s = meta.Survey(t)
cache[t] = s
}
diff := must.OK1(wire.Encode(s, nil, o, nil))
byID := changes[s.DBName]
if byID == nil {
byID = wire.KindChanges{}
changes[s.DBName] = byID
}
byID[reflect.ValueOf(o).FieldByIndex(s.Identity().Index).String()] = diff
}
if len(changes) == 0 {
return nil
}
return &wire.Transaction{Changes: changes}
}