Skip to content

Commit

Permalink
oracle/watermark
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsk committed Jun 9, 2024
1 parent 74e5c88 commit 0a3e843
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pkg/db/a_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type Db struct {
stopped atomic.Bool
scheduler *txn.Scheduler
scheduler *txn.Oracle
executor *txn.Executor
mvStore *txn.MvStore
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/txn/b_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package txn
type Txn struct {
rw bool
beginTs uint64
scheduler *Scheduler
scheduler *Oracle
executor *Executor
discarded bool

Expand All @@ -13,7 +13,7 @@ type Txn struct {
snapshot *Snapshot
}

func NewTxn(rw bool, beginTs uint64, snap *Snapshot, scheduler *Scheduler, executor *Executor) *Txn {
func NewTxn(rw bool, beginTs uint64, snap *Snapshot, scheduler *Oracle, executor *Executor) *Txn {
return &Txn{
rw: rw,
beginTs: beginTs,
Expand Down
32 changes: 18 additions & 14 deletions pkg/txn/c_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ import (
"sync"
)

type Scheduler struct {
type Oracle struct {
sync.Mutex
nextTs uint64

// `readTsMarker` marks the visibility(ts) of read operations in a `newTransaction` to other transactions.
// Here we don't need TsWaiter as such, as we are not using the `WaitFor` API. However,
// Here we don't need WaterMark as such, as we are not using the `WaitFor` API. However,
// we are using the `DoneTill` API to get the last completed readTs to remove the old readyToCommitTxns.
readTsMarker *TsWaiter
readTsMarker *WaterMark
// `commitVisibilityWaiter` blocks `newTransaction` to ensure previous commits are visible to new reads.
commitVisibilityWaiter *TsWaiter
commitVisibilityWaiter *WaterMark

readyToCommitTxns []ReadyToCommitTxn
}

func NewScheduler() *Scheduler {
scheduler := &Scheduler{
func NewScheduler() *Oracle {
scheduler := &Oracle{
nextTs: 1,
readTsMarker: NewTsWaiter(),
commitVisibilityWaiter: NewTsWaiter(),
Expand All @@ -31,26 +31,30 @@ func NewScheduler() *Scheduler {
return scheduler
}

func (o *Scheduler) Stop() {
func (o *Oracle) Stop() {
o.readTsMarker.Stop()
o.commitVisibilityWaiter.Stop()
}

func (o *Scheduler) NewReadTs() uint64 {
func (o *Oracle) NewReadTs() uint64 {
o.Lock()
defer o.Unlock()

beginTimestamp := o.nextTs - 1
o.readTsMarker.Begin(beginTimestamp)

// Wait for all txns which have no conflicts, have been assigned a commit
// timestamp and are going through the write to value log and LSM tree
// process. Not waiting here could mean that some txns which have been
// committed would not be read.
err := o.commitVisibilityWaiter.WaitFor(context.Background(), beginTimestamp)
if err != nil {
panic(err)
}
return beginTimestamp
}

func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) {
func (o *Oracle) NewCommitTs(transaction *Txn) (uint64, error) {
o.Lock()
defer o.Unlock()

Expand All @@ -76,15 +80,15 @@ func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) {
return commitTs, nil
}

func (o *Scheduler) DoneRead(transaction *Txn) {
func (o *Oracle) DoneRead(transaction *Txn) {
o.readTsMarker.Done(transaction.snapshot.ts)
}

func (o *Scheduler) DoneCommit(commitTs uint64) {
func (o *Oracle) DoneCommit(commitTs uint64) {
o.commitVisibilityWaiter.Done(commitTs)
}

func (o *Scheduler) hasConflictFor(txn *Txn) bool {
func (o *Oracle) hasConflictFor(txn *Txn) bool {
currTxnBeginTs := txn.snapshot.ts

for _, readyToCommitTxn := range o.readyToCommitTxns {
Expand All @@ -101,7 +105,7 @@ func (o *Scheduler) hasConflictFor(txn *Txn) bool {
return false
}

func (o *Scheduler) gcOldReadyToCommitTxns() {
func (o *Oracle) gcOldReadyToCommitTxns() {
updatedReadyToCommitTxns := o.readyToCommitTxns[:0]
lastActiveReadTs := o.readTsMarker.DoneTill()

Expand All @@ -119,7 +123,7 @@ type ReadyToCommitTxn struct {
txn *Txn
}

func (o *Scheduler) addReadyToCommitTxn(txn *Txn, commitTs uint64) {
func (o *Oracle) addReadyToCommitTxn(txn *Txn, commitTs uint64) {
o.readyToCommitTxns = append(o.readyToCommitTxns, ReadyToCommitTxn{
commitTs: commitTs,
txn: txn,
Expand Down
22 changes: 11 additions & 11 deletions pkg/txn/d_ts_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ type Event struct {
waitCh chan struct{}
}

type TsWaiter struct {
type WaterMark struct {
eventCh chan Event
stopCh chan struct{}
txnTracker *TransactionTracker
}

func NewTsWaiter() *TsWaiter {
waiter := &TsWaiter{
func NewTsWaiter() *WaterMark {
waiter := &WaterMark{
eventCh: make(chan Event),
stopCh: make(chan struct{}),
txnTracker: NewTransactionTracker(),
Expand All @@ -35,15 +35,15 @@ func NewTsWaiter() *TsWaiter {
return waiter
}

func (w *TsWaiter) Begin(timestamp uint64) {
func (w *WaterMark) Begin(timestamp uint64) {
w.eventCh <- Event{typ: BeginEvent, ts: timestamp}
}

func (w *TsWaiter) Done(ts uint64) {
func (w *WaterMark) Done(ts uint64) {
w.eventCh <- Event{typ: DoneEvent, ts: ts}
}

func (w *TsWaiter) WaitFor(ctx context.Context, ts uint64) error {
func (w *WaterMark) WaitFor(ctx context.Context, ts uint64) error {
if w.DoneTill() >= ts {
return nil
}
Expand All @@ -59,15 +59,15 @@ func (w *TsWaiter) WaitFor(ctx context.Context, ts uint64) error {
}
}

func (w *TsWaiter) Stop() {
func (w *WaterMark) Stop() {
w.stopCh <- struct{}{}
}

func (w *TsWaiter) DoneTill() uint64 {
func (w *WaterMark) DoneTill() uint64 {
return w.txnTracker.GlobalDoneTill()
}

func (w *TsWaiter) Run() {
func (w *WaterMark) Run() {
for {
select {
case event := <-w.eventCh:
Expand All @@ -92,7 +92,7 @@ func (w *TsWaiter) Run() {
}
}

func (w *TsWaiter) processWaitEvent(event Event) {
func (w *WaterMark) processWaitEvent(event Event) {
doneTill := w.DoneTill()
if doneTill >= event.ts {
close(event.waitCh)
Expand All @@ -101,7 +101,7 @@ func (w *TsWaiter) processWaitEvent(event Event) {
}
}

func (w *TsWaiter) processClose() {
func (w *WaterMark) processClose() {
close(w.eventCh)
close(w.stopCh)

Expand Down

0 comments on commit 0a3e843

Please sign in to comment.