diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 0f58ed32d..9dd1cd319 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1382,6 +1382,11 @@ func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer ran if nextIndex == *latestIndex+1 { return latestIndex, nil } + //TODO synchronize up to 10000 blocks at a time + if *latestIndex-nextIndex > 10000 { + *latestIndex = nextIndex + 10000 + } + if err := syncer(nextIndex, *latestIndex); err != nil { return nil, err } @@ -1528,19 +1533,93 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { return index, nil } +func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Transaction, backend Backend) error { + var wg sync.WaitGroup + var returnErr error + var mu sync.Mutex + if start-offset < 0 { + return fmt.Errorf("offset %d is small than start %d", offset, start) + } + wg.Add(int(end - start)) + + for i := start; i < end; i++ { + go func(index uint64) { + defer wg.Done() + tx, err := s.client.GetTransaction(index, backend) + if err != nil { + log.Warn("cannot fetch transaction by index,will retry 5 times", "index", index) + //retry 5 times + retry := 1 + for retry <= 5 { + log.Warn("cannot fetch transaction by index", "index", index, "retry time", retry) + tx, err = s.client.GetTransaction(index, backend) + if err == nil || retry == 5 { + break + } + retry++ + time.Sleep(10 * time.Millisecond) + } + if err != nil && returnErr == nil { + mu.Lock() + returnErr = err + mu.Unlock() + } + } + txs[index-offset] = tx + }(i) + } + wg.Wait() + return returnErr +} + // syncTransactionRange will sync a range of transactions from // start to end (inclusive) from a specific Backend func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) - for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i, backend) + rangeTxs := make([]*types.Transaction, (end-start)+1) + concurrency := 100 + segment := int(end-start) / concurrency + for i := 0; i <= segment; i++ { + subStart := start + uint64(i*concurrency) + subEnd := subStart + uint64(concurrency) + if subEnd > end { + subEnd = end + 1 + } + err := s.getTransactions(subStart, subEnd, start, rangeTxs, backend) if err != nil { - return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + log.Error("fetch transaction err", "subStart", subStart, "subEnd", subEnd, "err", err) + break } - if err := s.applyTransaction(tx); err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) + } + + for i := start; i <= end; i++ { + tx := rangeTxs[i-start] + var err error + var previousTxIsNil bool + if tx == nil && previousTxIsNil { + return fmt.Errorf("tx index is not contiguous and needs to be re-fetched index %d", i) + } + if tx == nil { + previousTxIsNil = true + tx, err = s.client.GetTransaction(i, backend) + if err != nil { + return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + } + } + if err = s.applyTransaction(tx); err != nil { + return fmt.Errorf("cannot apply transaction: %w", err) } } + + //for i := start; i <= end; i++ { + // tx, err := s.client.GetTransaction(i, backend) + // if err != nil { + // return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + // } + // if err := s.applyTransaction(tx); err != nil { + // return fmt.Errorf("Cannot apply transaction: %w", err) + // } + //} return nil }