Skip to content

Commit

Permalink
do CompareAndSwap only necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Aug 30, 2024
1 parent 9f18e13 commit 3b4fa03
Showing 1 changed file with 58 additions and 48 deletions.
106 changes: 58 additions & 48 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,30 +613,34 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
nextIdx := p.mergedTxIndex.Load() + 1
if nextIdx < int32(len(p.allTxReqs)) {
nextMergeReq := p.allTxReqs[nextIdx]
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, nextMergeReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
res.resultSendTime = time.Now()
p.txResultChan <- res
if nextMergeReq.runnable == 1 {
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, nextMergeReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
res.resultSendTime = time.Now()
p.txResultChan <- res
}
}
}
}

// try the next req in loop sequence.
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, txReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res == nil {
continue
if txReq.runnable == 1 {
// try the next req in loop sequence.
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, txReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res == nil {
continue
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
totalTryRunTxDur += time.Since(innerLoopBeforeTime)
// switched to the other slot.
Expand Down Expand Up @@ -666,29 +670,33 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
nextIdx := p.mergedTxIndex.Load() + 1
if nextIdx < int32(len(p.allTxReqs)) {
nextMergeReq := p.allTxReqs[nextIdx]
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, nextMergeReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
res.resultSendTime = time.Now()
p.txResultChan <- res
if nextMergeReq.runnable == 1 {
if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) {
// execute.
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, nextMergeReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
res.resultSendTime = time.Now()
p.txResultChan <- res
}
}
}
}

if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) {
continue
}
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, stealTxReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res == nil {
continue
if stealTxReq.runnable == 1 {
if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) {
continue
}
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, stealTxReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res == nil {
continue
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
res.resultSendTime = time.Now()
p.txResultChan <- res
}
totalTryRunTxDur += time.Since(stealLoopBeforeTime)
}
Expand Down Expand Up @@ -750,17 +758,19 @@ func (p *ParallelStateProcessor) runQuickMergeSlotLoop(slotIndex int, slotType i
if txReq.conflictIndex.Load() > p.mergedTxIndex.Load() {
break
}
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
log.Info("QuickMergeSlot Run", "slotIdx", slotIndex, "tx", txReq.txIndex)
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, txReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
executed--
res.resultSendTime = time.Now()
p.txResultChan <- res
if txReq.runnable == 1 {
if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) {
continue
}
log.Info("QuickMergeSlot Run", "slotIdx", slotIndex, "tx", txReq.txIndex)
timeBeforeExec := time.Now()
res := p.executeInSlot(slotIndex, txReq)
totalExecuteTxDur += time.Since(timeBeforeExec)
if res != nil {
executed--
res.resultSendTime = time.Now()
p.txResultChan <- res
}
}
}
totalTryRunTxDur += time.Since(innerLoopBeforeTime)
Expand Down

0 comments on commit 3b4fa03

Please sign in to comment.