diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go
index 212889222..9419bc4e4 100644
--- a/triedb/pathdb/nodebufferlist.go
+++ b/triedb/pathdb/nodebufferlist.go
@@ -489,7 +489,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 	nf.traverseReverse(commitFunc)
 
 	persistID := nf.persistID + nf.base.layers
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, false)
 	if err != nil {
 		log.Crit("failed to flush base node buffer to disk", "error", err)
 	}
@@ -742,13 +742,13 @@ func (nf *nodebufferlist) diffToBase() {
 }
 
 // backgroundFlush flush base node buffer to disk.
-func (nf *nodebufferlist) backgroundFlush() {
+func (nf *nodebufferlist) backgroundFlush(splitBatches bool) {
 	nf.flushMux.Lock()
 	defer nf.flushMux.Unlock()
 	nf.baseMux.RLock()
 	persistID := nf.persistID + nf.base.layers
 	nf.baseMux.RUnlock()
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, splitBatches)
 	if err != nil {
 		log.Error("failed to flush base node buffer to disk", "error", err)
 		return
@@ -817,7 +817,9 @@ func (nf *nodebufferlist) loop() {
 			}
 			nf.diffToBase()
 			if nf.base.size >= nf.base.limit {
-				nf.backgroundFlush()
+				// Split the flush into several batches once the base buffer
+				// exceeds 4GiB, so one oversized batch write cannot stall the database.
+				nf.backgroundFlush(nf.base.size >= 4*1024*1024*1024)
 			}
 			nf.isFlushing.Swap(false)
 		}
@@ -1084,7 +1086,9 @@ func (mf *multiDifflayer) empty() bool {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
-// memory threshold is reached. Note, all data must be written atomically.
-func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+// memory threshold is reached. Note, all data must be written atomically,
+// unless splitBatches is set, in which case the nodes are committed in
+// several independent batches.
+func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, splitBatches bool) error {
 	// Ensure the target state id is aligned with the internal counter.
 	head := rawdb.ReadPersistentStateID(db)
 	if head+mf.layers != id {
@@ -1093,7 +1097,42 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
-	var (
-		start = time.Now()
-		batch = db.NewBatchWithSize(int(float64(mf.size) * DefaultBatchRedundancyRate))
-	)
+	start := time.Now()
+
+	// When splitBatches is set, commit the dirty nodes in several smaller
+	// batches instead of one oversized batch, trading strict atomicity for
+	// bounded batch sizes. Fall through to the single-batch path when there
+	// is nothing to split, so the persistent state id is still advanced.
+	if splitBatches && len(mf.nodes) > 0 {
+		const numBatches = 3
+
+		// Ceiling division: spread the owner keys evenly across the batches.
+		totalKeys := len(mf.nodes)
+		batchKeyLimit := (totalKeys + numBatches - 1) / numBatches
+
+		totalNodes := 0
+		for len(mf.nodes) > 0 {
+			chunk := db.NewBatch()
+			writtenKeys, count := writeNodesInBatchByKeys(chunk, mf.nodes, clean, batchKeyLimit)
+			totalNodes += count
+
+			// Advance the persistent state id only with the final chunk, so a
+			// crash between batches never records a state id whose trie nodes
+			// are incomplete on disk; a retried flush simply rewrites the same
+			// nodes, which is idempotent.
+			if len(writtenKeys) == len(mf.nodes) {
+				rawdb.WritePersistentStateID(chunk, id)
+			}
+			if err := chunk.Write(); err != nil {
+				return err
+			}
+			removeWrittenKeys(mf.nodes, writtenKeys)
+		}
+		commitTimeTimer.UpdateSince(start)
+		log.Info("Persisted pathdb nodes in batches", "nodes", totalNodes, "owners", totalKeys, "state_id", id, "elapsed", common.PrettyDuration(time.Since(start)))
+		return nil
+	}
+
+	// Single-batch path: allocate the oversized write batch only here, so the
+	// split path above does not pre-allocate a buffer it never uses.
+	batch := db.NewBatchWithSize(int(float64(mf.size) * DefaultBatchRedundancyRate))
 	nodes := writeNodes(batch, mf.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
 
@@ -1160,3 +1199,30 @@ func (mf *multiDifflayer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]
 	mf.updateSize(delta)
 	return nil
 }
+
+// writeNodesInBatchByKeys stages the dirty nodes of up to batchKeyLimit trie
+// owners into the given batch, returning the staged owner keys and the total
+// number of nodes written. Splitting on owner boundaries keeps all nodes of a
+// single owner inside one batch.
+func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache, batchKeyLimit int) ([]common.Hash, int) {
+	var (
+		writtenKeys = make([]common.Hash, 0, batchKeyLimit)
+		total       int
+	)
+	for owner, subset := range nodes {
+		total += writeNodes(batch, map[common.Hash]map[string]*trienode.Node{owner: subset}, clean)
+		writtenKeys = append(writtenKeys, owner)
+		if len(writtenKeys) >= batchKeyLimit {
+			break
+		}
+	}
+	return writtenKeys, total
+}
+
+// removeWrittenKeys drops the already-persisted owners from the dirty node
+// set, so the next batch only sees the remaining owners.
+func removeWrittenKeys(nodes map[common.Hash]map[string]*trienode.Node, writtenKeys []common.Hash) {
+	for _, key := range writtenKeys {
+		delete(nodes, key)
+	}
+}
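
For reference, below is a minimal, self-contained sketch of the chunking scheme the split path implements: owner keys are spread across a fixed number of batches via ceiling division, each wave is committed separately, and persisted owners are removed before the next wave. The toy types and names (toyBatch, stageOwners) are illustrative only and not part of the pathdb API.

package main

import "fmt"

// toyBatch stands in for ethdb.Batch: writes are staged in memory, then
// committed as one unit.
type toyBatch struct{ staged map[string]int }

func (b *toyBatch) commit() {
	fmt.Printf("committed %d owners (%d nodes)\n", len(b.staged), sum(b.staged))
}

func sum(m map[string]int) int {
	total := 0
	for _, v := range m {
		total += v
	}
	return total
}

// stageOwners mirrors writeNodesInBatchByKeys: stage up to limit owners into
// the batch and report which owners were taken.
func stageOwners(b *toyBatch, nodes map[string]int, limit int) []string {
	taken := make([]string, 0, limit)
	for owner, count := range nodes {
		b.staged[owner] = count
		taken = append(taken, owner)
		if len(taken) >= limit {
			break
		}
	}
	return taken
}

func main() {
	// Dirty node counts keyed by owner, standing in for mf.nodes.
	nodes := map[string]int{"a": 3, "b": 1, "c": 4, "d": 2, "e": 5, "f": 1, "g": 2}

	const numBatches = 3
	// Ceiling division, as in the diff: ceil(7/3) = 3 owners per batch,
	// giving waves of 3, 3 and 1 owners.
	limit := (len(nodes) + numBatches - 1) / numBatches

	for len(nodes) > 0 {
		batch := &toyBatch{staged: map[string]int{}}
		taken := stageOwners(batch, nodes, limit)
		batch.commit()
		// Mirrors removeWrittenKeys: drop the persisted owners.
		for _, owner := range taken {
			delete(nodes, owner)
		}
	}
}

One caveat of this design: because chunking counts owner keys rather than bytes, a few very large owners can still yield an oversized batch; capping each chunk by batch.ValueSize() instead would bound memory more tightly.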