Skip to content

Commit

Permalink
log
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Oct 18, 2024
1 parent 3a61364 commit a9fbae1
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, merger
}
p.stateDBContextMutex.Unlock()
*CumulativeGasUsed += receipt.GasUsed
log.Debug("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex, "CumulativeGasUsed", *CumulativeGasUsed)
log.Debug("ProcessParallel tx result", "resultTxIndex", resultTxIndex, "CumulativeGasUsed", *CumulativeGasUsed)
result.receipt.CumulativeGasUsed = *CumulativeGasUsed
p.mergedTxIndex.Store(int32(resultTxIndex))
}
Expand Down Expand Up @@ -1146,6 +1146,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

if ug, ok := cumulativeGasUsedPerMergeWorker.Load(unconfirmedResult.slotIndex); !ok {
cumulativeGasUsedPerMergeWorker.Store(unconfirmedResult.slotIndex, unconfirmedResult.result.UsedGas)
log.Debug("get complete merge worker signal", "merger", unconfirmedResult.slotIndex, "usedGas", unconfirmedResult.result.UsedGas)
mergedCount++
} else {
log.Debug("get merge worker complete signal more than once", "merger", unconfirmedResult.slotIndex, "usedGas", ug)
Expand Down Expand Up @@ -1279,7 +1280,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall

// if all merged, notify the main routine. continue to wait for next block.
if p.mergedTxIndex.Load()+1 == int32(txCount) {
log.Debug("handlePendingResult merged all, send complete result")
log.Debug("handlePendingResult merged all, send complete result", "mergeWorkerIdx", index, "CumulativeGasUsed", CumulativeGasUsed)
p.txResultChan <- &ParallelTxResult{txReq: nil, slotIndex: index,
result: &ExecutionResult{UsedGas: CumulativeGasUsed}}
// clear the pending chan.
Expand All @@ -1292,12 +1293,12 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall

// busy waiting.
for {
log.Debug("busy waiting for pending result", "mergedIndex", p.mergedTxIndex.Load(), "allTxCount", txCount)
log.Debug("busy waiting for pending result", "mergeWorkerIdx", index, "mergedIndex", p.mergedTxIndex.Load(), "allTxCount", txCount)
nextTxIndex := int(p.mergedTxIndex.Load()) + 1
if OutOfOrderMerge || parallelMergeEnabled {
// skip those already merged.
for {
log.Debug("OOOMerge check loop", "nextTxIndex", nextTxIndex, "txCount", txCount)
log.Debug("OOOMerge check loop", "mergeWorkerIdx", index, "nextTxIndex", nextTxIndex, "txCount", txCount)
if nextTxIndex == txCount {
// reach the last, update the mergedTxIndex if needed
if p.mergedTxIndex.Load() != int32(nextTxIndex-1) {
Expand All @@ -1318,7 +1319,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
if p.receipts[nextTxIndex-1] == nil {
// no receipts, there is an error at applyMessage.
log.Error("handlePendingResultLoop", "unexpected receipt", nil,
"mergeWorker", "index", "txIndex", nextTxIndex-1, "mergedIndex", p.mergedTxIndex.Load())
"mergeWorker", index, "txIndex", nextTxIndex-1, "mergedIndex", p.mergedTxIndex.Load())
break
}
if p.receipts[nextTxIndex-1].CumulativeGasUsed == 0 {
Expand All @@ -1332,7 +1333,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
}
p.stateDBContextMutex.Unlock()
CumulativeGasUsed += receipt.GasUsed
log.Debug("update CumulativeGasUse txCount", "TxIndex", nextTxIndex-1, "CumulativeGasUsed", CumulativeGasUsed)
log.Debug("update CumulativeGasUse txCount", "mergeWorkerIdx", index, "TxIndex", nextTxIndex-1, "CumulativeGasUsed", CumulativeGasUsed)

p.receipts[nextTxIndex-1].CumulativeGasUsed = CumulativeGasUsed
}
Expand Down Expand Up @@ -1397,10 +1398,10 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
}
}

log.Debug("handlePendingResult break from OOO Loop to do result merge", "nextTxIndex", nextTxIndex, "txCount", txCount)
log.Debug("handlePendingResult break from OOO Loop to do result merge", "mergeWorkerIdx", index, "nextTxIndex", nextTxIndex, "txCount", txCount)
// all merged, clear the resultAppendChan, and send result.
if nextTxIndex == txCount {
log.Debug("handlePendingResult merged all in wait loop - send complete Result", "error", p.error,
log.Debug("handlePendingResult merged all in wait loop - send complete Result", "mergeWorkerIdx", index, "error", p.error,
"nextTxIndex", nextTxIndex, "mergedIndex", p.mergedTxIndex.Load(), "CumulativeGasUsed", CumulativeGasUsed)
p.txResultChan <- &ParallelTxResult{txReq: nil, result: &ExecutionResult{UsedGas: CumulativeGasUsed}}
// clear the pending chan.
Expand All @@ -1413,14 +1414,14 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
nextToMergeIndex := nextTxIndex
nextToMergeResult, ok := p.pendingConfirmResults.Load(nextTxIndex)
if !ok {
log.Debug("handlePendingResult - can not load next form pendingConfirmResult", "txIndex", nextTxIndex)
log.Debug("handlePendingResult - can not load next form pendingConfirmResult", "mergeWorkerIdx", index, "txIndex", nextTxIndex)
if !OutOfOrderMerge {
break
}
// we trust DAG. so try to see whether we can merge one.
nextToMergeResult, ok = p.pendingConfirmResults.Load(receivedTxIdx)
if !ok {
log.Debug("handlePendingResult - can not load form pendingConfirmResult", "txIndex", receivedTxIdx)
log.Debug("handlePendingResult - can not load form pendingConfirmResult", "mergeWorkerIdx", index, "txIndex", receivedTxIdx)
break
}
// try to see whether the dependency is already merged
Expand All @@ -1436,8 +1437,8 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
}
p.pendingConfirmResults.Delete(nextToMergeIndex)

log.Debug("Start to check result", "TxIndex", int(nextToMergeIndex), "stateDBTx", stateDB.TxIndex(),
"merger", index, "gp", gp.String())
log.Debug("Start to check result", "mergeWorkerIdx", index, "TxIndex", int(nextToMergeIndex), "stateDBTx", stateDB.TxIndex(),
"gp", gp.String())

result := p.confirmTxResults(stateDB, index, gp, nextToMergeIndex, nextToMergeResult.(*ParallelTxResult), &CumulativeGasUsed)
if result == nil {
Expand All @@ -1447,15 +1448,16 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall
"merger", index,
"mergedIndex", p.mergedTxIndex.Load(),
"confirmedIndex", result.txReq.txIndex,
"result.err", result.err)
"result.err", result.err,
"CumulativeGasUsed", CumulativeGasUsed)
}
p.resultMutex.Lock()
// update tx result
if result.err != nil {
log.Debug("handlePendingResultLoop result has error", "txIndex", result.txReq.txIndex)
log.Debug("handlePendingResultLoop result has error", "mergeWorkerIdx", index, "txIndex", result.txReq.txIndex)
if result.slotDB.BaseTxIndex() >= int(p.mergedTxIndex.Load()) || OutOfOrderMerge {
// should skip the merge phase
log.Error("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
log.Error("ProcessParallel a failed tx", "mergeWorkerIdx", index, "resultSlotIndex", result.slotIndex,
"resultTxIndex", result.txReq.txIndex, "result.err", result.err)
pos := result.txReq.txIndex
p.commonTxs[pos] = result.txReq.tx
Expand Down

0 comments on commit a9fbae1

Please sign in to comment.