diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index e65af95c2a..fa1069ef22 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -613,30 +613,34 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { nextIdx := p.mergedTxIndex.Load() + 1 if nextIdx < int32(len(p.allTxReqs)) { nextMergeReq := p.allTxReqs[nextIdx] - if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { - // execute. - timeBeforeExec := time.Now() - res := p.executeInSlot(slotIndex, nextMergeReq) - totalExecuteTxDur += time.Since(timeBeforeExec) - if res != nil { - res.resultSendTime = time.Now() - p.txResultChan <- res + if nextMergeReq.runnable == 1 { + if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { + // execute. + timeBeforeExec := time.Now() + res := p.executeInSlot(slotIndex, nextMergeReq) + totalExecuteTxDur += time.Since(timeBeforeExec) + if res != nil { + res.resultSendTime = time.Now() + p.txResultChan <- res + } } } } - // try the next req in loop sequence. - if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { - continue - } - timeBeforeExec := time.Now() - res := p.executeInSlot(slotIndex, txReq) - totalExecuteTxDur += time.Since(timeBeforeExec) - if res == nil { - continue + if txReq.runnable == 1 { + // try the next req in loop sequence. + if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { + continue + } + timeBeforeExec := time.Now() + res := p.executeInSlot(slotIndex, txReq) + totalExecuteTxDur += time.Since(timeBeforeExec) + if res == nil { + continue + } + res.resultSendTime = time.Now() + p.txResultChan <- res } - res.resultSendTime = time.Now() - p.txResultChan <- res } totalTryRunTxDur += time.Since(innerLoopBeforeTime) // switched to the other slot. @@ -666,29 +670,33 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) { nextIdx := p.mergedTxIndex.Load() + 1 if nextIdx < int32(len(p.allTxReqs)) { nextMergeReq := p.allTxReqs[nextIdx] - if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { - // execute. - timeBeforeExec := time.Now() - res := p.executeInSlot(slotIndex, nextMergeReq) - totalExecuteTxDur += time.Since(timeBeforeExec) - if res != nil { - res.resultSendTime = time.Now() - p.txResultChan <- res + if nextMergeReq.runnable == 1 { + if atomic.CompareAndSwapInt32(&nextMergeReq.runnable, 1, 0) { + // execute. + timeBeforeExec := time.Now() + res := p.executeInSlot(slotIndex, nextMergeReq) + totalExecuteTxDur += time.Since(timeBeforeExec) + if res != nil { + res.resultSendTime = time.Now() + p.txResultChan <- res + } } } } - if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) { - continue - } - timeBeforeExec := time.Now() - res := p.executeInSlot(slotIndex, stealTxReq) - totalExecuteTxDur += time.Since(timeBeforeExec) - if res == nil { - continue + if stealTxReq.runnable == 1 { + if !atomic.CompareAndSwapInt32(&stealTxReq.runnable, 1, 0) { + continue + } + timeBeforeExec := time.Now() + res := p.executeInSlot(slotIndex, stealTxReq) + totalExecuteTxDur += time.Since(timeBeforeExec) + if res == nil { + continue + } + res.resultSendTime = time.Now() + p.txResultChan <- res } - res.resultSendTime = time.Now() - p.txResultChan <- res } totalTryRunTxDur += time.Since(stealLoopBeforeTime) } @@ -750,17 +758,19 @@ func (p *ParallelStateProcessor) runQuickMergeSlotLoop(slotIndex int, slotType i if txReq.conflictIndex.Load() > p.mergedTxIndex.Load() { break } - if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { - continue - } - log.Info("QuickMergeSlot Run", "slotIdx", slotIndex, "tx", txReq.txIndex) - timeBeforeExec := time.Now() - res := p.executeInSlot(slotIndex, txReq) - totalExecuteTxDur += time.Since(timeBeforeExec) - if res != nil { - executed-- - res.resultSendTime = time.Now() - p.txResultChan <- res + if txReq.runnable == 1 { + if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { + continue + } + log.Info("QuickMergeSlot Run", "slotIdx", slotIndex, "tx", txReq.txIndex) + timeBeforeExec := time.Now() + res := p.executeInSlot(slotIndex, txReq) + totalExecuteTxDur += time.Since(timeBeforeExec) + if res != nil { + executed-- + res.resultSendTime = time.Now() + p.txResultChan <- res + } } } totalTryRunTxDur += time.Since(innerLoopBeforeTime)