Skip to content

Commit

Permalink
Implement rollback within Update (#43)
Browse files Browse the repository at this point in the history
* implement rollback within Update; #42

* add TestDeleteAndUpdate

* do not introduce failures during cursor reads

* add transactional squares test

* add OK method

* fix races

* don't make cursors read dirty records
  • Loading branch information
Preetam authored Dec 31, 2017
1 parent 6905356 commit 9ff6c4c
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 81 deletions.
8 changes: 8 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,11 @@ func (rc *recordCache) purge() {
purged++
}
}

func (rc *recordCache) flushOffsets(offsets []int64) {
rc.lock.Lock()
for _, offset := range offsets {
delete(rc.cache, offset)
}
rc.lock.Unlock()
}
20 changes: 10 additions & 10 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *Collection) NewCursor() (*Cursor, error) {
}, nil
}

head, err := c.readRecord(c.Next[0])
head, err := c.readRecord(c.Next[0], false)
if err != nil {
return nil, err
}
Expand All @@ -44,7 +44,7 @@ func (c *Collection) NewCursor() (*Cursor, error) {
cur.current.lock.RLock()
for (cur.current.Deleted != 0 && cur.current.Deleted <= cur.snapshot) ||
(cur.current.Offset >= cur.snapshot) {
rec, err = cur.collection.readRecord(atomic.LoadInt64(&cur.current.Next[0]))
rec, err = cur.collection.readRecord(atomic.LoadInt64(&cur.current.Next[0]), false)
if err != nil {
cur.current.lock.RUnlock()
cur.current = nil
Expand Down Expand Up @@ -85,7 +85,7 @@ func (c *Cursor) Next() bool {
}

c.current.lock.RLock()
rec, err := c.collection.readRecord(atomic.LoadInt64(&c.current.Next[0]))
rec, err := c.collection.readRecord(atomic.LoadInt64(&c.current.Next[0]), false)
if err != nil {
c.current.lock.RUnlock()
if atomic.LoadInt64(&c.current.Next[0]) != 0 {
Expand All @@ -98,9 +98,9 @@ func (c *Cursor) Next() bool {
c.current = rec

c.current.lock.RLock()
for (c.current.Deleted != 0 && c.current.Deleted <= c.snapshot) ||
for (atomic.LoadInt64(&c.current.Deleted) != 0 && atomic.LoadInt64(&c.current.Deleted) <= c.snapshot) ||
(c.current.Offset >= c.snapshot) {
rec, err = c.collection.readRecord(atomic.LoadInt64(&c.current.Next[0]))
rec, err = c.collection.readRecord(atomic.LoadInt64(&c.current.Next[0]), false)
if err != nil {
c.current.lock.RUnlock()
if atomic.LoadInt64(&c.current.Next[0]) != 0 {
Expand Down Expand Up @@ -146,7 +146,7 @@ func (c *Cursor) Seek(key string) {
var err error
offset := int64(0)
for level := maxLevels - 1; level >= 0; level-- {
offset, err = c.collection.findLastLessThanOrEqual(key, offset, level, false)
offset, err = c.collection.findLastLessThanOrEqual(key, offset, level, false, false)
if err != nil {
c.err = err
return
Expand All @@ -162,7 +162,7 @@ func (c *Cursor) Seek(key string) {
return
}
}
rec, err := c.collection.readRecord(offset)
rec, err := c.collection.readRecord(offset, false)
if err != nil {
c.err = err
return
Expand All @@ -175,7 +175,7 @@ func (c *Cursor) Seek(key string) {
if rec.Key >= key {
if (rec.Deleted > 0 && rec.Deleted <= c.snapshot) || (rec.Offset >= c.snapshot) {
oldRec := rec
rec, err = c.collection.nextRecord(rec, 0)
rec, err = c.collection.nextRecord(rec, 0, false)
if err != nil {
if atomic.LoadInt64(&c.current.Next[0]) != 0 {
c.err = err
Expand All @@ -192,7 +192,7 @@ func (c *Cursor) Seek(key string) {
}
if (rec.Deleted > 0 && rec.Deleted <= c.snapshot) || (rec.Offset >= c.snapshot) {
oldRec := rec
rec, err = c.collection.nextRecord(rec, 0)
rec, err = c.collection.nextRecord(rec, 0, false)
if err != nil {
if atomic.LoadInt64(&c.current.Next[0]) != 0 {
c.err = err
Expand All @@ -207,7 +207,7 @@ func (c *Cursor) Seek(key string) {
c.current = rec
}
oldRec := rec
rec, err = c.collection.nextRecord(rec, 0)
rec, err = c.collection.nextRecord(rec, 0, false)
if err != nil {
if atomic.LoadInt64(&c.current.Next[0]) != 0 {
c.err = err
Expand Down
28 changes: 19 additions & 9 deletions lm2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
// is invalid. The collection should be closed and reopened.
ErrInternal = errors.New("lm2: internal error")

ErrRolledBack = errors.New("lm2: rolled back")

fileVersion = [8]byte{'l', 'm', '2', '_', '0', '0', '1', '\n'}
)

Expand Down Expand Up @@ -120,13 +122,15 @@ func (c *Collection) setDirty(offset int64, rec *record) {
c.dirty[offset] = rec
}

func (c *Collection) readRecord(offset int64) (*record, error) {
func (c *Collection) readRecord(offset int64, dirty bool) (*record, error) {
if offset == 0 {
return nil, errors.New("lm2: invalid record offset 0")
}

if rec := c.getDirty(offset); rec != nil {
return rec, nil
if dirty {
if rec := c.getDirty(offset); rec != nil {
return rec, nil
}
}

c.cache.lock.RLock()
Expand All @@ -139,8 +143,8 @@ func (c *Collection) readRecord(offset int64) (*record, error) {
c.cache.lock.RUnlock()

recordHeaderBytes := [recordHeaderSize]byte{}
_, err := c.readAt(recordHeaderBytes[:], offset)
if err != nil {
n, err := c.readAt(recordHeaderBytes[:], offset)
if err != nil && n != recordHeaderSize {
return nil, fmt.Errorf("lm2: partial read (%s)", err)
}

Expand All @@ -151,8 +155,8 @@ func (c *Collection) readRecord(offset int64) (*record, error) {
}

keyValBuf := make([]byte, int(header.KeyLen)+int(header.ValLen))
_, err = c.readAt(keyValBuf, offset+recordHeaderSize)
if err != nil {
n, err = c.readAt(keyValBuf, offset+recordHeaderSize)
if err != nil && n != len(keyValBuf) {
return nil, fmt.Errorf("lm2: partial read (%s)", err)
}

Expand All @@ -171,15 +175,15 @@ func (c *Collection) readRecord(offset int64) (*record, error) {
return rec, nil
}

func (c *Collection) nextRecord(rec *record, level int) (*record, error) {
func (c *Collection) nextRecord(rec *record, level int, dirty bool) (*record, error) {
if rec == nil {
return nil, errors.New("lm2: invalid record")
}
if atomic.LoadInt64(&rec.Next[level]) == 0 {
// There's no next record.
return nil, nil
}
nextRec, err := c.readRecord(atomic.LoadInt64(&rec.Next[level]))
nextRec, err := c.readRecord(atomic.LoadInt64(&rec.Next[level]), dirty)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -421,3 +425,9 @@ func (c *Collection) CompactFunc(f func(key, value string) (string, string, bool
newCollection.Close()
return os.Rename(newCollection.f.Name(), c.f.Name())
}

// OK returns true if the internal state of the collection is valid.
// If false is returned you should close and reopen the collection.
func (c *Collection) OK() bool {
return atomic.LoadUint32(&c.internalState) == 0
}
Loading

0 comments on commit 9ff6c4c

Please sign in to comment.