Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]request block concurrently #1386

Closed
wants to merge 9 commits into from
88 changes: 83 additions & 5 deletions l2geth/rollup/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,11 @@ func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer ran
if nextIndex == *latestIndex+1 {
return latestIndex, nil
}
//TODO synchronize up to 10000 blocks at a time
if *latestIndex-nextIndex > 10000 {
*latestIndex = nextIndex + 10000
}

if err := syncer(nextIndex, *latestIndex); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1528,19 +1533,92 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) {
return index, nil
}

func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Transaction, backend Backend) error {
var wg sync.WaitGroup
var returnErr error
byteflyfunny marked this conversation as resolved.
Show resolved Hide resolved
var mu sync.Mutex
if start-offset < 0 {
return fmt.Errorf("offset %d is small than start %d", offset, start)
}
wg.Add(int(end - start))

for i := start; i < end; i++ {
go func(index uint64) {
defer wg.Done()
tx, err := s.client.GetTransaction(index, backend)
if err != nil {
log.Warn("cannot fetch transaction by index,will retry 5 times", "index", index)
//retry 5 times
retry := 1
for retry <= 5 {
log.Warn("cannot fetch transaction by index", "index", index, "retry time", retry)
tx, err = s.client.GetTransaction(index, backend)
if err == nil || retry == 5 {
break
}
retry++
time.Sleep(10 * time.Millisecond)
}
if err != nil && returnErr == nil {
mu.Lock()
returnErr = err
mu.Unlock()
}
}
txs[index-offset] = tx
}(i)
}
wg.Wait()
return returnErr
}

// syncTransactionRange will sync a range of transactions from
// start to end (inclusive) from a specific Backend
func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error {
log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String())
for i := start; i <= end; i++ {
tx, err := s.client.GetTransaction(i, backend)
rangeTxs := make([]*types.Transaction, (end-start)+1)
concurrency := 100
segment := int(end-start) / concurrency
for i := 0; i <= segment; i++ {
subStart := start + uint64(i*concurrency)
subEnd := subStart + uint64(concurrency)
if subEnd > end {
subEnd = end + 1
}
err := s.getTransactions(subStart, subEnd, start, rangeTxs, backend)
if err != nil {
return fmt.Errorf("cannot fetch transaction %d: %w", i, err)
log.Error("fetch transaction err", "subStart", subStart, "subEnd", subEnd, "err", err)
}
if err := s.applyTransaction(tx); err != nil {
return fmt.Errorf("Cannot apply transaction: %w", err)
}

for i := start; i <= end; i++ {
tx := rangeTxs[i-start]
var err error
var previousTxIsNil bool
if tx == nil && previousTxIsNil {
return fmt.Errorf("tx index is not contiguous and needs to be re-fetched index %d", i)
}
if tx == nil {
previousTxIsNil = true
tx, err = s.client.GetTransaction(i, backend)
if err != nil {
return fmt.Errorf("cannot fetch transaction %d: %w", i, err)
}
}
if err = s.applyTransaction(tx); err != nil {
return fmt.Errorf("cannot apply transaction: %w", err)
}
}

//for i := start; i <= end; i++ {
// tx, err := s.client.GetTransaction(i, backend)
// if err != nil {
// return fmt.Errorf("cannot fetch transaction %d: %w", i, err)
// }
// if err := s.applyTransaction(tx); err != nil {
// return fmt.Errorf("Cannot apply transaction: %w", err)
// }
//}
return nil
}

Expand Down
Loading