This repository has been archived by the owner on Sep 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
submit.go
69 lines (61 loc) · 1.76 KB
/
submit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package limestone
import (
"context"
"encoding/json"
"fmt"
"reflect"
"github.com/ridge/limestone/tlog"
"github.com/ridge/limestone/typeddb"
"github.com/ridge/limestone/wire"
"github.com/ridge/must/v2"
"go.uber.org/zap"
)
func (db *DB) prepareDiff(eid typeddb.EID, change typeddb.Change) wire.Diff {
must.OK(eid.Kind.ValidateRequired(change.After))
return must.OK1(wire.Encode(eid.Kind.Struct, change.Before, change.After, func(index int, v1, v2 reflect.Value) error {
producers := eid.Kind.Fields[index].Producers
switch {
case len(producers) == 0:
return fmt.Errorf("writing by %s is denied", *db.source)
case !producers[db.source.Producer]:
return fmt.Errorf("writing by %s is denied (allowed for %s)", *db.source, producers)
}
return nil
}))
}
func (db *DB) prepareChanges(changes map[typeddb.EID]typeddb.Change) wire.Changes {
res := wire.Changes{}
for eid, change := range changes {
diff := db.prepareDiff(eid, change)
if diff == nil {
continue
}
byID := res[eid.Kind.DBName]
if byID == nil {
byID = wire.KindChanges{}
res[eid.Kind.DBName] = byID
}
byID[eid.ID] = diff
}
return res
}
func (db *DB) submit(ctx context.Context, tc typeddb.TransactionControl) error {
changes := db.prepareChanges(tc.Changes())
if len(changes) == 0 {
return nil
}
wireTransaction := wire.Transaction{
Source: *db.source,
Session: db.session,
Changes: changes,
}
annotations := tc.Annotations()
if len(annotations) != 0 {
wireTransaction.Audit = json.RawMessage(must.OK1(json.Marshal(annotations)))
}
tlog.Get(ctx).Debug("Submitting transaction", zap.Object("txn", wireTransaction))
if err := db.connection.Submit(ctx, wireTransaction); err != nil {
return fmt.Errorf("failed to submit transaction: %w", err)
}
return nil
}