From 433d674ddf1a4233b514ade89467d5a31f566578 Mon Sep 17 00:00:00 2001 From: I know Date: Thu, 21 Dec 2023 17:11:37 +0800 Subject: [PATCH 1/8] request block concurrently --- l2geth/rollup/sync_service.go | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 0f58ed32d..17915a3a5 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1532,15 +1532,41 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { // start to end (inclusive) from a specific Backend func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) + rangeTxs := &sync.Map{} + var wg sync.WaitGroup + wg.Add(int(end - start + 1)) for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i, backend) - if err != nil { - return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + go func() { + defer wg.Done() + tx, _ := s.client.GetTransaction(i, backend) + rangeTxs.Store(i, tx) + }() + } + wg.Wait() + + for i := start; i <= end; i++ { + tx, ok := rangeTxs.Load(i) + var err error + if !ok { + tx, err = s.client.GetTransaction(i, backend) + if err != nil { + return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + } } - if err := s.applyTransaction(tx); err != nil { + if err = s.applyTransaction(tx.(*types.Transaction)); err != nil { return fmt.Errorf("Cannot apply transaction: %w", err) } } + + //for i := start; i <= end; i++ { + // tx, err := s.client.GetTransaction(i, backend) + // if err != nil { + // return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + // } + // if err := s.applyTransaction(tx); err != nil { + // return fmt.Errorf("Cannot apply transaction: %w", err) + // } + //} return nil } From 7893bd096cfae850a3072822d1d513b150700129 Mon Sep 17 00:00:00 2001 From: I know Date: Thu, 21 Dec 2023 18:22:32 +0800 Subject: [PATCH 2/8] request block concurrently --- l2geth/rollup/sync_service.go | 45 +++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 17915a3a5..082416bd6 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1528,24 +1528,45 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { return index, nil } -// syncTransactionRange will sync a range of transactions from -// start to end (inclusive) from a specific Backend -func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { - log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) - rangeTxs := &sync.Map{} +func (s *SyncService) getTransactions(start, end uint64, txs map[uint64]*types.Transaction, backend Backend) error { var wg sync.WaitGroup + var returnErr error wg.Add(int(end - start + 1)) - for i := start; i <= end; i++ { - go func() { + + for i := start; i < end; i++ { + go func(index uint64) { defer wg.Done() - tx, _ := s.client.GetTransaction(i, backend) - rangeTxs.Store(i, tx) - }() + tx, err := s.client.GetTransaction(index, backend) + returnErr = err + txs[index] = tx + }(i) } wg.Wait() + return returnErr +} + +// syncTransactionRange will sync a range of transactions from +// start to end (inclusive) from a specific Backend +func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { + log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) + rangeTxs := make(map[uint64]*types.Transaction) + concurrency := 50 + segment := int(end-start) / concurrency + for i := 0; i <= segment; i++ { + subStart := start + uint64(i*concurrency) + subEnd := subStart + uint64(concurrency) + if subEnd > end { + subEnd = end + 1 + } + + err := s.getTransactions(subStart, subEnd, rangeTxs, backend) + if err != nil { + return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + } + } for i := start; i <= end; i++ { - tx, ok := rangeTxs.Load(i) + tx, ok := rangeTxs[i] var err error if !ok { tx, err = s.client.GetTransaction(i, backend) @@ -1553,7 +1574,7 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e return fmt.Errorf("cannot fetch transaction %d: %w", i, err) } } - if err = s.applyTransaction(tx.(*types.Transaction)); err != nil { + if err = s.applyTransaction(tx); err != nil { return fmt.Errorf("Cannot apply transaction: %w", err) } } From 216c06aa86f2b6e4423dd008861b4c2bbccb61e2 Mon Sep 17 00:00:00 2001 From: I know Date: Fri, 22 Dec 2023 10:17:43 +0800 Subject: [PATCH 3/8] request block concurrently --- l2geth/rollup/sync_service.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 082416bd6..12065bff4 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1528,17 +1528,22 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { return index, nil } -func (s *SyncService) getTransactions(start, end uint64, txs map[uint64]*types.Transaction, backend Backend) error { +func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Transaction, backend Backend) error { var wg sync.WaitGroup var returnErr error - wg.Add(int(end - start + 1)) + if start-offset < 0 { + return fmt.Errorf("offset %d is small than start %d", offset, start) + } + wg.Add(int(end - start)) for i := start; i < end; i++ { go func(index uint64) { defer wg.Done() tx, err := s.client.GetTransaction(index, backend) - returnErr = err - txs[index] = tx + if err != nil { + returnErr = err + } + txs[index-offset] = tx }(i) } wg.Wait() @@ -1549,7 +1554,7 @@ func (s *SyncService) getTransactions(start, end uint64, txs map[uint64]*types.T // start to end (inclusive) from a specific Backend func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) - rangeTxs := make(map[uint64]*types.Transaction) + rangeTxs := make([]*types.Transaction, (end-start)+1) concurrency := 50 segment := int(end-start) / concurrency for i := 0; i <= segment; i++ { @@ -1559,23 +1564,23 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e subEnd = end + 1 } - err := s.getTransactions(subStart, subEnd, rangeTxs, backend) + err := s.getTransactions(subStart, subEnd, start, rangeTxs, backend) if err != nil { return fmt.Errorf("cannot fetch transaction %d: %w", i, err) } } for i := start; i <= end; i++ { - tx, ok := rangeTxs[i] + tx := rangeTxs[i-start] var err error - if !ok { + if tx == nil { tx, err = s.client.GetTransaction(i, backend) if err != nil { return fmt.Errorf("cannot fetch transaction %d: %w", i, err) } } if err = s.applyTransaction(tx); err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) + return fmt.Errorf("cannot apply transaction: %w", err) } } From 237c9554c0466eb77725a36c14ec7e3e9b6e4c8e Mon Sep 17 00:00:00 2001 From: I know Date: Fri, 22 Dec 2023 15:37:32 +0800 Subject: [PATCH 4/8] request block concurrently --- l2geth/rollup/sync_service.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 12065bff4..11dc95fa2 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1382,6 +1382,11 @@ func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer ran if nextIndex == *latestIndex+1 { return latestIndex, nil } + //TODO + if *latestIndex-nextIndex > 10000 { + *latestIndex = nextIndex + 10000 + } + if err := syncer(nextIndex, *latestIndex); err != nil { return nil, err } From 3352a046417c7a0fd238753d144de083af0eddfa Mon Sep 17 00:00:00 2001 From: I know Date: Thu, 4 Jan 2024 15:10:02 +0800 Subject: [PATCH 5/8] request block concurrently --- l2geth/rollup/sync_service.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 11dc95fa2..55ca7b272 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1382,7 +1382,7 @@ func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer ran if nextIndex == *latestIndex+1 { return latestIndex, nil } - //TODO + //TODO synchronize up to 10000 blocks at a time if *latestIndex-nextIndex > 10000 { *latestIndex = nextIndex + 10000 } @@ -1536,6 +1536,7 @@ func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) { func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Transaction, backend Backend) error { var wg sync.WaitGroup var returnErr error + var mu sync.Mutex if start-offset < 0 { return fmt.Errorf("offset %d is small than start %d", offset, start) } @@ -1546,7 +1547,22 @@ func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Tr defer wg.Done() tx, err := s.client.GetTransaction(index, backend) if err != nil { - returnErr = err + log.Warn("cannot fetch transaction by index,will retry 5 times", "index", index) + //retry 5 times + retry := 1 + for retry <= 5 { + log.Warn("cannot fetch transaction by index", "index", index, "retry time", retry) + tx, err = s.client.GetTransaction(index, backend) + if err == nil || retry == 5 { + break + } + retry++ + } + if err != nil && returnErr == nil { + mu.Lock() + returnErr = err + mu.Unlock() + } } txs[index-offset] = tx }(i) @@ -1568,17 +1584,21 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e if subEnd > end { subEnd = end + 1 } - err := s.getTransactions(subStart, subEnd, start, rangeTxs, backend) if err != nil { - return fmt.Errorf("cannot fetch transaction %d: %w", i, err) + log.Error("fetch transaction err", "subStart", subStart, "subEnd", subEnd, "err", err) } } for i := start; i <= end; i++ { tx := rangeTxs[i-start] var err error + var previousTxIsNil bool + if tx == nil && previousTxIsNil { + return fmt.Errorf("tx index is not contiguous and needs to be re-fetched index %d", i) + } if tx == nil { + previousTxIsNil = true tx, err = s.client.GetTransaction(i, backend) if err != nil { return fmt.Errorf("cannot fetch transaction %d: %w", i, err) From 195f86475f6a8dc96e21dec782027984ddccb902 Mon Sep 17 00:00:00 2001 From: I know Date: Fri, 5 Jan 2024 14:23:06 +0800 Subject: [PATCH 6/8] add sleep time during retry to get block --- l2geth/rollup/sync_service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 55ca7b272..9ae673f5e 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1557,6 +1557,7 @@ func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Tr break } retry++ + time.Sleep(10 * time.Millisecond) } if err != nil && returnErr == nil { mu.Lock() From c0334b581f631a8b1ee9ef817e6bbdb0cf7b904f Mon Sep 17 00:00:00 2001 From: I know Date: Mon, 8 Jan 2024 17:10:45 +0800 Subject: [PATCH 7/8] debug 100 concurrency --- l2geth/rollup/sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 9ae673f5e..7748808ab 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1577,7 +1577,7 @@ func (s *SyncService) getTransactions(start, end, offset uint64, txs []*types.Tr func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error { log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String()) rangeTxs := make([]*types.Transaction, (end-start)+1) - concurrency := 50 + concurrency := 100 segment := int(end-start) / concurrency for i := 0; i <= segment; i++ { subStart := start + uint64(i*concurrency) From 6c8e92025fce1f41eb0fdbd7048a035600af6d91 Mon Sep 17 00:00:00 2001 From: I know Date: Tue, 9 Jan 2024 11:40:42 +0800 Subject: [PATCH 8/8] break out of the for loop and stop subsequent request calls --- l2geth/rollup/sync_service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 7748808ab..9dd1cd319 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -1588,6 +1588,7 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e err := s.getTransactions(subStart, subEnd, start, rangeTxs, backend) if err != nil { log.Error("fetch transaction err", "subStart", subStart, "subEnd", subEnd, "err", err) + break } }