Skip to content

Commit

Permalink
Get method bug correction regarding invalid tombstone check logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Gaetano Padula committed Oct 31, 2024
1 parent 41cb816 commit 611d78e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 7 deletions.
53 changes: 53 additions & 0 deletions bloomfilter/bloomfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package bloomfilter

import (
"fmt"
"os"
"testing"
"time"
)
Expand Down Expand Up @@ -219,3 +220,55 @@ func TestCheck2(t *testing.T) {

t.Logf("Time to check 10k keys in bloomfilter: %v", time.Since(tt))
}

func TestCheck3(t *testing.T) {
tt := time.Now()
bf := NewBloomFilter(1000000, 8)

for i := 0; i < 10000; i++ {
key := []byte("key" + fmt.Sprintf("%d", i))
bf.Add(key)
}

t.Logf("Time to add 10k keys to bloomfilter: %v", time.Since(tt))

serialized, err := bf.Serialize()
if err != nil {
t.Fatalf("Failed to serialize BloomFilter: %v", err)
}

// We write to file
f, err := os.OpenFile("bloomfilter.test", os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
t.Fatalf("Failed to open file: %v", err)
}

defer os.Remove("bloomfilter.test")

f.WriteAt(serialized, 0)

// We read from file
serialized = make([]byte, len(serialized))

_, err = f.ReadAt(serialized, 0)
if err != nil {
t.Fatalf("Failed to read from file: %v", err)
}

bf, err = Deserialize(serialized)
if err != nil {
t.Fatalf("Failed to deserialize BloomFilter: %v", err)
}

tt = time.Now()

// check all keys
for i := 0; i < 10000; i++ {
key := []byte("key" + fmt.Sprintf("%d", i))
if !bf.Check(key) {
t.Fatalf("Expected key %s to be present in BloomFilter, got not present", key)
}
}

t.Logf("Time to check 10k keys in bloomfilter: %v", time.Since(tt))
}
26 changes: 19 additions & 7 deletions k4.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type K4 struct {
type SSTable struct {
pager *pager.Pager // the pager for the sstable file
lock *sync.RWMutex // read write lock for the sstable
compressed bool // whether the sstable is compressed
compressed bool // whether the sstable is compressed; this gets set when the sstable is created, the configuration is passed from K4
}

// Transaction is the structure for the transactions
Expand Down Expand Up @@ -121,7 +121,7 @@ type WALIterator struct {
pager *pager.Pager // the pager for the wal file
currentPage int // the current page
lastPage int // the last page in the wal
compressed bool // whether the wal is compressed
compressed bool // whether the wal is compressed; this gets set when the sstable is created, the configuration is passed from K4
}

// KV mainly used for serialization
Expand Down Expand Up @@ -228,13 +228,19 @@ func Open(directory string, memtableFlushThreshold int, compactionInterval int,
k4.wg.Add(1)
go k4.backgroundWalWriter() // start the background wal writer

k4.printLog("Background WAL writer started")

// Start the background flusher
k4.wg.Add(1)
go k4.backgroundFlusher() // start the background flusher
k4.printLog("Background flusher started")

// Start the background compactor
k4.wg.Add(1)
go k4.backgroundCompactor() // start the background compactor
k4.printLog("Background compactor started")

k4.printLog("K4 opened successfully")

return k4, nil
}
Expand All @@ -251,9 +257,13 @@ func (k4 *K4) Close() error {

close(k4.exit)

k4.printLog("Waiting for background operations to finish")

// wait for the background operations to finish
k4.wg.Wait()

k4.printLog("Background operations finished")

k4.printLog("Closing SSTables")

// Close SSTables
Expand Down Expand Up @@ -299,7 +309,6 @@ func (k4 *K4) printLog(msg string) {
// This function runs in the background and pops operations from the wal queue and writes
// to write ahead log. The reason we do this is to optimize write speed
func (k4 *K4) backgroundWalWriter() {
k4.printLog("Background WAL writer started")

defer k4.wg.Done() // Defer completion of routine

Expand Down Expand Up @@ -442,6 +451,7 @@ func (k4 *K4) loadSSTables() {
sstablePager, err := pager.OpenPager(k4.directory+string(os.PathSeparator)+file.Name(), os.O_RDWR, 0644)
if err != nil {
// could possibly handle this better
k4.printLog(fmt.Sprintf("Failed to open sstable: %v", err))
continue
}

Expand Down Expand Up @@ -603,6 +613,7 @@ func (it *SSTableIterator) current() ([]byte, []byte) {
// Deserialize key-value pair
key, value, err := deserializeKv(data)
if err != nil {

return nil, nil
}

Expand Down Expand Up @@ -1080,7 +1091,8 @@ func (k4 *K4) Get(key []byte) ([]byte, error) {

value, found := k4.memtable.Search(key)
if found {
if bytes.Compare(value, []byte(TOMBSTONE_VALUE)) == 0 { // Check if the value is a tombstone
// Check if the value is a tombstone
if bytes.Compare(value, []byte(TOMBSTONE_VALUE)) == 0 {
return nil, nil
}

Expand Down Expand Up @@ -1108,9 +1120,10 @@ func (k4 *K4) Get(key []byte) ([]byte, error) {
}
if value != nil {
if bytes.Compare(value, []byte(TOMBSTONE_VALUE)) == 0 { // Check if the value is a tombstone

return value, nil
return nil, nil
}

return value, nil
}
}

Expand Down Expand Up @@ -1530,7 +1543,6 @@ func (kva KeyValueArray) binarySearch(key []byte) (*KV, bool) {
// we pop it and flush it to a new SSTable
func (k4 *K4) backgroundFlusher() {
defer k4.wg.Done()
k4.printLog("Background flusher started")

for {
select {
Expand Down
2 changes: 2 additions & 0 deletions k4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func TestMemtableFlush(t *testing.T) {
}
}

time.Sleep(1 * time.Second)

k4.Close()

k4, err = Open(dir, 1024*1024, 2, false, false)
Expand Down

0 comments on commit 611d78e

Please sign in to comment.