Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Jun 20, 2024
1 parent 597352d commit 49fa643
Show file tree
Hide file tree
Showing 28 changed files with 359 additions and 55 deletions.
1 change: 1 addition & 0 deletions cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func setContextDocEncryptionKey(cmd *cobra.Command, shouldEncrypt bool, txn data
if txn != nil {
ctx = encryption.ContextWithStore(ctx, txn)

Check failure on line 172 in cli/utils.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

ineffectual assignment to ctx (ineffassign)
}
ctx = encryption.SetContextConfig(cmd.Context(), encryption.DocEncConfig{IsEncrypted: true})
cmd.SetContext(ctx)
}

Expand Down
19 changes: 6 additions & 13 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type Block struct {
Delta crdt.CRDT
// Links are the links to other blocks in the DAG.
Links []DAGLink
// TODO: see if it's better to keep it here instead of inside of delta
IsEncrypted *bool
}

// IPLDSchemaBytes returns the IPLD schema representation for the block.
Expand All @@ -111,8 +113,9 @@ type Block struct {
func (b Block) IPLDSchemaBytes() []byte {
return []byte(`
type Block struct {
delta CRDT
links [ DAGLink ]
delta CRDT
links [ DAGLink ]
isEncrypted optional Bool
}`)
}

Expand Down Expand Up @@ -143,19 +146,9 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {

blockLinks = append(blockLinks, links...)

var crdtDelta crdt.CRDT
switch delta := delta.(type) {
case *crdt.LWWRegDelta:
crdtDelta = crdt.CRDT{LWWRegDelta: delta}
case *crdt.CompositeDAGDelta:
crdtDelta = crdt.CRDT{CompositeDAGDelta: delta}
case *crdt.CounterDelta:
crdtDelta = crdt.CRDT{CounterDelta: delta}
}

return &Block{
Links: blockLinks,
Delta: crdtDelta,
Delta: crdt.NewCRDT(delta),
}
}

Expand Down
13 changes: 12 additions & 1 deletion internal/core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,38 @@ import (
// tasks that all the CRDTs need to implement anyway
type baseCRDT struct {
store datastore.DSReaderWriter
key core.DataStoreKey
// encryptionKeyStore
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string

// isEncrypted is a flag to indicate if the CRDT is encrypted
//isEncrypted bool
}

// IsEncrypted returns true if the CRDT is encrypted
/*func (base baseCRDT) IsEncrypted() bool {
return base.isEncrypted
}*/

func newBaseCRDT(
store datastore.DSReaderWriter,
key core.DataStoreKey,
schemaVersionKey core.CollectionSchemaVersionKey,
fieldName string,
isEncrypted bool,
) baseCRDT {
return baseCRDT{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
//isEncrypted: isEncrypted,
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func newSeededDS() datastore.DSReaderWriter {
}

func exampleBaseCRDT() baseCRDT {
return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "", false)
}

func TestBaseCRDTNew(t *testing.T) {
base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "", false)
if base.store == nil {
t.Error("newBaseCRDT needs to init store")
}
Expand Down
31 changes: 30 additions & 1 deletion internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,35 @@ func (delta *CompositeDAGDelta) IPLDSchemaBytes() []byte {
}`)
}

func (delta *CompositeDAGDelta) GetData() []byte {
return nil
}

func (delta *CompositeDAGDelta) SetData([]byte) {
}

func (delta *CompositeDAGDelta) GetDocID() []byte {
return delta.DocID
}

func (delta *CompositeDAGDelta) GetFieldName() string {
return delta.FieldName
}

func (delta *CompositeDAGDelta) GetSchemaVersionID() string {
return delta.SchemaVersionID
}

func (delta *CompositeDAGDelta) Clone() core.Delta {
return &CompositeDAGDelta{
DocID: delta.DocID,
FieldName: delta.FieldName,
Priority: delta.Priority,
SchemaVersionID: delta.SchemaVersionID,
Status: delta.Status,
}
}

// GetPriority gets the current priority for this delta.
func (delta *CompositeDAGDelta) GetPriority() uint64 {
return delta.Priority
Expand All @@ -77,7 +106,7 @@ func NewCompositeDAG(
key core.DataStoreKey,
fieldName string,
) CompositeDAG {
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName, false)}
}

// Value is a no-op for a CompositeDAG.
Expand Down
32 changes: 31 additions & 1 deletion internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,36 @@ func (delta *CounterDelta) IPLDSchemaBytes() []byte {
}`)
}

func (delta *CounterDelta) GetData() []byte {
return nil
}

func (delta *CounterDelta) SetData([]byte) {
}

func (delta *CounterDelta) GetDocID() []byte {
return delta.DocID
}

func (delta *CounterDelta) GetFieldName() string {
return delta.FieldName
}

func (delta *CounterDelta) GetSchemaVersionID() string {
return delta.SchemaVersionID
}

func (delta *CounterDelta) Clone() core.Delta {
return &CounterDelta{
DocID: delta.DocID,
FieldName: delta.FieldName,
Priority: delta.Priority,
Nonce: delta.Nonce,
SchemaVersionID: delta.SchemaVersionID,
Data: delta.Data,
}
}

// GetPriority gets the current priority for this delta.
func (delta *CounterDelta) GetPriority() uint64 {
return delta.Priority
Expand Down Expand Up @@ -93,7 +123,7 @@ func NewCounter(
allowDecrement bool,
kind client.ScalarKind,
) Counter {
return Counter{newBaseCRDT(store, key, schemaVersionKey, fieldName), allowDecrement, kind}
return Counter{newBaseCRDT(store, key, schemaVersionKey, fieldName, false), allowDecrement, kind}
}

// Value gets the current counter value
Expand Down
13 changes: 13 additions & 0 deletions internal/core/crdt/ipld_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ type CRDT struct {
CounterDelta *CounterDelta
}

// NewCRDT returns a new CRDT.
func NewCRDT(delta core.Delta) CRDT {
switch d := delta.(type) {
case *LWWRegDelta:
return CRDT{LWWRegDelta: d}
case *CompositeDAGDelta:
return CRDT{CompositeDAGDelta: d}
case *CounterDelta:
return CRDT{CounterDelta: d}
}
return CRDT{}
}

// IPLDSchemaBytes returns the IPLD schema representation for the CRDT.
//
// This needs to match the [CRDT] struct or [mustSetSchema] will panic on init.
Expand Down
62 changes: 50 additions & 12 deletions internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/encryption"
)

// LWWRegDelta is a single delta operation for an LWWRegister
Expand All @@ -34,6 +33,8 @@ type LWWRegDelta struct {
// It can be used to identify the collection datastructure state at the time of commit.
SchemaVersionID string
Data []byte
// TODO: move this one layer above
//IsEncrypted bool
}

var _ core.Delta = (*LWWRegDelta)(nil)
Expand All @@ -44,14 +45,44 @@ var _ core.Delta = (*LWWRegDelta)(nil)
func (delta LWWRegDelta) IPLDSchemaBytes() []byte {
return []byte(`
type LWWRegDelta struct {
docID Bytes
fieldName String
priority Int
schemaVersionID String
data Bytes
docID Bytes
fieldName String
priority Int
schemaVersionID String
data Bytes
}`)
}

func (delta *LWWRegDelta) GetData() []byte {
return delta.Data
}

func (delta *LWWRegDelta) SetData(data []byte) {
delta.Data = data
}

func (delta *LWWRegDelta) GetDocID() []byte {
return delta.DocID
}

func (delta *LWWRegDelta) GetFieldName() string {
return delta.FieldName
}

func (delta *LWWRegDelta) GetSchemaVersionID() string {
return delta.SchemaVersionID
}

func (delta *LWWRegDelta) Clone() core.Delta {
return &LWWRegDelta{
DocID: delta.DocID,
FieldName: delta.FieldName,
Priority: delta.Priority,
SchemaVersionID: delta.SchemaVersionID,
Data: delta.Data,
}
}

// GetPriority gets the current priority for this delta.
func (delta *LWWRegDelta) GetPriority() uint64 {
return delta.Priority
Expand All @@ -76,8 +107,9 @@ func NewLWWRegister(
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
isEncrypted bool,
) LWWRegister {
return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName, isEncrypted)}
}

// Value gets the current register value
Expand All @@ -99,6 +131,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
DocID: []byte(reg.key.DocID),
FieldName: reg.fieldName,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionID,
//IsEncrypted: reg.isEncrypted,
}
}

Expand All @@ -114,11 +147,16 @@ func (reg LWWRegister) Merge(ctx context.Context, delta core.Delta) error {

data := d.Data

var err error
data, err = encryption.DecryptDoc(ctx, string(d.DocID), 0, data)
if err != nil {
return err
}
/*if d.IsEncrypted {
plainText, err := encryption.DecryptDoc(ctx, string(d.DocID), 0, data)
if err != nil {
return err
}
if plainText == nil {
return nil
}
data = plainText
}*/

return reg.setValue(ctx, data, d.GetPriority())
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newMockStore() datastore.DSReaderWriter {
func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocID: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "", false)
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ package core
type Delta interface {
GetPriority() uint64
SetPriority(uint64)
GetData() []byte
SetData([]byte)
GetDocID() []byte
GetFieldName() string
GetSchemaVersionID() string
Clone() Delta
}
1 change: 1 addition & 0 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ func (c *collection) save(
fieldDescription.Kind,
fieldKey,
fieldDescription.Name,
false,
)
if err != nil {
return cid.Undef, err
Expand Down
3 changes: 0 additions & 3 deletions internal/db/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ func SetContextTxn(ctx context.Context, txn datastore.Txn) context.Context {
return context.WithValue(ctx, txnContextKey{}, txn)
}

// TryGetContextTxn returns an identity and a bool indicating if the
// identity was retrieved from the given context.

// GetContextIdentity returns the identity from the given context.
//
// If an identity does not exist `NoIdentity` is returned.
Expand Down
1 change: 1 addition & 0 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func (vf *VersionedFetcher) processBlock(
kind,
dsKey,
fieldName,
false,
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 49fa643

Please sign in to comment.