diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 943220377..b416489c4 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -861,7 +861,7 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, merger } p.stateDBContextMutex.Unlock() *CumulativeGasUsed += receipt.GasUsed - log.Debug("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex, "CumulativeGasUsed", *CumulativeGasUsed) + log.Debug("ProcessParallel tx result", "resultTxIndex", resultTxIndex, "CumulativeGasUsed", *CumulativeGasUsed) result.receipt.CumulativeGasUsed = *CumulativeGasUsed p.mergedTxIndex.Store(int32(resultTxIndex)) } @@ -1146,6 +1146,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat if ug, ok := cumulativeGasUsedPerMergeWorker.Load(unconfirmedResult.slotIndex); !ok { cumulativeGasUsedPerMergeWorker.Store(unconfirmedResult.slotIndex, unconfirmedResult.result.UsedGas) + log.Debug("get complete merge worker signal", "merger", unconfirmedResult.slotIndex, "usedGas", unconfirmedResult.result.UsedGas) mergedCount++ } else { log.Debug("get merge worker complete signal more than once", "merger", unconfirmedResult.slotIndex, "usedGas", ug) @@ -1279,7 +1280,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall // if all merged, notify the main routine. continue to wait for next block. if p.mergedTxIndex.Load()+1 == int32(txCount) { - log.Debug("handlePendingResult merged all, send complete result") + log.Debug("handlePendingResult merged all, send complete result", "mergeWorkerIdx", index, "CumulativeGasUsed", CumulativeGasUsed) p.txResultChan <- &ParallelTxResult{txReq: nil, slotIndex: index, result: &ExecutionResult{UsedGas: CumulativeGasUsed}} // clear the pending chan. @@ -1292,12 +1293,12 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall // busy waiting. for { - log.Debug("busy waiting for pending result", "mergedIndex", p.mergedTxIndex.Load(), "allTxCount", txCount) + log.Debug("busy waiting for pending result", "mergeWorkerIdx", index, "mergedIndex", p.mergedTxIndex.Load(), "allTxCount", txCount) nextTxIndex := int(p.mergedTxIndex.Load()) + 1 if OutOfOrderMerge || parallelMergeEnabled { // skip those already merged. for { - log.Debug("OOOMerge check loop", "nextTxIndex", nextTxIndex, "txCount", txCount) + log.Debug("OOOMerge check loop", "mergeWorkerIdx", index, "nextTxIndex", nextTxIndex, "txCount", txCount) if nextTxIndex == txCount { // reach the last, update the mergedTxIndex if needed if p.mergedTxIndex.Load() != int32(nextTxIndex-1) { @@ -1318,7 +1319,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall if p.receipts[nextTxIndex-1] == nil { // no receipts, there is an error at applyMessage. log.Error("handlePendingResultLoop", "unexpected receipt", nil, - "mergeWorker", "index", "txIndex", nextTxIndex-1, "mergedIndex", p.mergedTxIndex.Load()) + "mergeWorker", index, "txIndex", nextTxIndex-1, "mergedIndex", p.mergedTxIndex.Load()) break } if p.receipts[nextTxIndex-1].CumulativeGasUsed == 0 { @@ -1332,7 +1333,7 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall } p.stateDBContextMutex.Unlock() CumulativeGasUsed += receipt.GasUsed - log.Debug("update CumulativeGasUse txCount", "TxIndex", nextTxIndex-1, "CumulativeGasUsed", CumulativeGasUsed) + log.Debug("update CumulativeGasUse txCount", "mergeWorkerIdx", index, "TxIndex", nextTxIndex-1, "CumulativeGasUsed", CumulativeGasUsed) p.receipts[nextTxIndex-1].CumulativeGasUsed = CumulativeGasUsed } @@ -1397,10 +1398,10 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall } } - log.Debug("handlePendingResult break from OOO Loop to do result merge", "nextTxIndex", nextTxIndex, "txCount", txCount) + log.Debug("handlePendingResult break from OOO Loop to do result merge", "mergeWorkerIdx", index, "nextTxIndex", nextTxIndex, "txCount", txCount) // all merged, clear the resultAppendChan, and send result. if nextTxIndex == txCount { - log.Debug("handlePendingResult merged all in wait loop - send complete Result", "error", p.error, + log.Debug("handlePendingResult merged all in wait loop - send complete Result", "mergeWorkerIdx", index, "error", p.error, "nextTxIndex", nextTxIndex, "mergedIndex", p.mergedTxIndex.Load(), "CumulativeGasUsed", CumulativeGasUsed) p.txResultChan <- &ParallelTxResult{txReq: nil, result: &ExecutionResult{UsedGas: CumulativeGasUsed}} // clear the pending chan. @@ -1413,14 +1414,14 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall nextToMergeIndex := nextTxIndex nextToMergeResult, ok := p.pendingConfirmResults.Load(nextTxIndex) if !ok { - log.Debug("handlePendingResult - can not load next form pendingConfirmResult", "txIndex", nextTxIndex) + log.Debug("handlePendingResult - can not load next form pendingConfirmResult", "mergeWorkerIdx", index, "txIndex", nextTxIndex) if !OutOfOrderMerge { break } // we trust DAG. so try to see whether we can merge one. nextToMergeResult, ok = p.pendingConfirmResults.Load(receivedTxIdx) if !ok { - log.Debug("handlePendingResult - can not load form pendingConfirmResult", "txIndex", receivedTxIdx) + log.Debug("handlePendingResult - can not load form pendingConfirmResult", "mergeWorkerIdx", index, "txIndex", receivedTxIdx) break } // try to see whether the dependency is already merged @@ -1436,8 +1437,8 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall } p.pendingConfirmResults.Delete(nextToMergeIndex) - log.Debug("Start to check result", "TxIndex", int(nextToMergeIndex), "stateDBTx", stateDB.TxIndex(), - "merger", index, "gp", gp.String()) + log.Debug("Start to check result", "mergeWorkerIdx", index, "TxIndex", int(nextToMergeIndex), "stateDBTx", stateDB.TxIndex(), + "gp", gp.String()) result := p.confirmTxResults(stateDB, index, gp, nextToMergeIndex, nextToMergeResult.(*ParallelTxResult), &CumulativeGasUsed) if result == nil { @@ -1447,15 +1448,16 @@ func (p *ParallelStateProcessor) handlePendingResultLoop(index int, EnableParall "merger", index, "mergedIndex", p.mergedTxIndex.Load(), "confirmedIndex", result.txReq.txIndex, - "result.err", result.err) + "result.err", result.err, + "CumulativeGasUsed", CumulativeGasUsed) } p.resultMutex.Lock() // update tx result if result.err != nil { - log.Debug("handlePendingResultLoop result has error", "txIndex", result.txReq.txIndex) + log.Debug("handlePendingResultLoop result has error", "mergeWorkerIdx", index, "txIndex", result.txReq.txIndex) if result.slotDB.BaseTxIndex() >= int(p.mergedTxIndex.Load()) || OutOfOrderMerge { // should skip the merge phase - log.Error("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex, + log.Error("ProcessParallel a failed tx", "mergeWorkerIdx", index, "resultSlotIndex", result.slotIndex, "resultTxIndex", result.txReq.txIndex, "result.err", result.err) pos := result.txReq.txIndex p.commonTxs[pos] = result.txReq.tx