Skip to content

Commit

Permalink
Merge branch 'main' into indexfile-tsreader
Browse files Browse the repository at this point in the history
  • Loading branch information
friendlymatthew authored Feb 13, 2024
2 parents 1610349 + 876410c commit cf1965b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 62 deletions.
11 changes: 5 additions & 6 deletions pkg/appendable/index_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (i *IndexFile) IsEmpty() (bool, error) {
}

func (i *IndexFile) IndexFieldNames() ([]string, error) {
var fieldNames []string
uniqueFieldNames := make(map[string]bool)

mp := i.tree
Expand All @@ -118,15 +119,13 @@ func (i *IndexFile) IndexFieldNames() ([]string, error) {
return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
}

uniqueFieldNames[metadata.FieldName] = true
if _, ok := uniqueFieldNames[metadata.FieldName]; !ok {
uniqueFieldNames[metadata.FieldName] = true
fieldNames = append(fieldNames, metadata.FieldName)
}
mp = next
}

var fieldNames []string
for fieldName := range uniqueFieldNames {
fieldNames = append(fieldNames, fieldName)
}

return fieldNames, nil
}

Expand Down
44 changes: 20 additions & 24 deletions pkg/btree/bptree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"slices"
)

// MetaPage is an abstract interface over the root page of a btree
Expand Down Expand Up @@ -51,12 +52,7 @@ func (t *BPTree) Find(key []byte) (MemoryPointer, bool, error) {
if err != nil {
return MemoryPointer{}, false, err
}
n := path[0].node
i, found := n.bsearch(key)
if found {
return n.Pointer(i), true, nil
}
return MemoryPointer{}, false, nil
return path[0].node.Pointer(path[0].index), path[0].found, nil
}

func (t *BPTree) readNode(ptr MemoryPointer) (*BPTreeNode, error) {
Expand All @@ -73,38 +69,36 @@ func (t *BPTree) readNode(ptr MemoryPointer) (*BPTreeNode, error) {
// TraversalRecord is one step of the path built by traverse: a visited node
// plus the outcome of searching that node's keys for the lookup key.
type TraversalRecord struct {
// node is the tree node visited at this step.
node *BPTreeNode
// index is the position within node.Keys selected by traverse for the lookup key.
index int
// found is the match flag reported by traverse's search of node.Keys.
found bool
// the offset is useful so we know which page to free when we split
ptr MemoryPointer
}

// traverse returns the path from the given node down to the leaf for key, in reverse order (leaf first).
// The last element is always the record for the node passed in.
func (t *BPTree) traverse(key []byte, node *BPTreeNode, ptr MemoryPointer) ([]TraversalRecord, error) {
if node.leaf() {
return []TraversalRecord{{node: node, ptr: ptr}}, nil
}
for i, k := range node.Keys {
if bytes.Compare(key, k.Value) < 0 {
child, err := t.readNode(node.Pointer(i))
if err != nil {
return nil, err
}
path, err := t.traverse(key, child, node.Pointer(i))
if err != nil {
return nil, err
}
return append(path, TraversalRecord{node: node, index: i, ptr: ptr}), nil
// binary search node.Keys to find the first key greater than key (or gte if leaf)
index, found := slices.BinarySearchFunc(node.Keys, ReferencedValue{Value: key}, func(e ReferencedValue, t ReferencedValue) int {
if cmp := bytes.Compare(e.Value, t.Value); cmp == 0 && !node.leaf() {
return -1
} else {
return cmp
}
})

if node.leaf() {
return []TraversalRecord{{node: node, index: index, found: found, ptr: ptr}}, nil
}
child, err := t.readNode(node.Pointer(-1))

child, err := t.readNode(node.Pointer(index))
if err != nil {
return nil, err
}
path, err := t.traverse(key, child, node.Pointer(-1))
path, err := t.traverse(key, child, node.Pointer(index))
if err != nil {
return nil, err
}
return append(path, TraversalRecord{node: node, index: len(node.Keys), ptr: ptr}), nil
return append(path, TraversalRecord{node: node, index: index, found: found, ptr: ptr}), nil
}

func (t *BPTree) Insert(key ReferencedValue, value MemoryPointer) error {
Expand Down Expand Up @@ -135,7 +129,9 @@ func (t *BPTree) Insert(key ReferencedValue, value MemoryPointer) error {

// insert the key into the leaf
n := path[0].node
j, _ := n.bsearch(key.Value)
j, _ := slices.BinarySearchFunc(n.Keys, key, func(e ReferencedValue, t ReferencedValue) int {
return bytes.Compare(e.Value, t.Value)
})
if j == len(n.Keys) {
n.Keys = append(n.Keys, key)
n.leafPointers = append(n.leafPointers, value)
Expand Down
17 changes: 0 additions & 17 deletions pkg/btree/node.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package btree

import (
"bytes"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -174,19 +173,3 @@ func (n *BPTreeNode) ReadFrom(r io.Reader) (int64, error) {
}
return pageSizeBytes, nil
}

// bsearch binary-searches n.Keys for key, comparing raw bytes with
// bytes.Compare. The keys are assumed to be sorted in ascending byte order.
//
// It returns the index of a matching key and true when one is found;
// otherwise it returns the index at which key would be inserted and false.
func (n *BPTreeNode) bsearch(key []byte) (int, bool) {
	lo, hi := 0, len(n.Keys)-1
	for lo <= hi {
		mid := (lo + hi) / 2
		switch c := bytes.Compare(key, n.Keys[mid].Value); {
		case c < 0:
			// key sorts before the probe: continue in the lower half.
			hi = mid - 1
		case c > 0:
			// key sorts after the probe: continue in the upper half.
			lo = mid + 1
		default:
			return mid, true
		}
	}
	return lo, false
}
19 changes: 4 additions & 15 deletions pkg/handlers/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,20 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
if err != nil {
return fmt.Errorf("failed to read metadata: %w", err)
}
isHeader := false

isEmpty, err := f.IsEmpty()
fieldNames, err := f.IndexFieldNames()
if err != nil {
return fmt.Errorf("failed to check if index file is empty: %w", err)
}

if isEmpty {
isHeader = true
} else {
fieldNames, err := f.IndexFieldNames()
if err != nil {
return fmt.Errorf("failed to retrieve index field names: %w", err)
}
headers = fieldNames
return fmt.Errorf("failed to retrieve index field names: %w", err)
}
headers = fieldNames

for {
i := bytes.IndexByte(df[metadata.ReadOffset:], '\n')
if i == -1 {
break
}

if isHeader {
if len(headers) == 0 {
slog.Info("Parsing CSV headers")
dec := csv.NewReader(bytes.NewReader(df[metadata.ReadOffset : metadata.ReadOffset+uint64(i)]))
headers, err = dec.Read()
Expand All @@ -67,7 +57,6 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
return fmt.Errorf("failed to parse CSV header: %w", err)
}
metadata.ReadOffset += uint64(i) + 1
isHeader = false
continue
}

Expand Down

0 comments on commit cf1965b

Please sign in to comment.