diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 2100bb6..69a55a3 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -2,19 +2,29 @@ package worker import ( "context" + "fmt" + "time" + etype "github.com/KyberNetwork/evmlistener/pkg/types" "github.com/KyberNetwork/tradelogs/pkg/convert" "github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" + "github.com/ethereum/go-ethereum/common/lru" "go.uber.org/zap" ) +type EVMLog struct { + log etype.Log + ts uint64 +} + type Worker struct { listener *evmlistenerclient.Client l *zap.SugaredLogger s *storage.Storage p map[string]parser.Parser + errLogs lru.BasicLRU[string, EVMLog] } func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, parsers ...parser.Parser) (*Worker, error) { @@ -29,10 +39,12 @@ func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.C l: l, s: s, p: p, + errLogs: lru.NewBasicLRU[string, EVMLog](1000), }, nil } func (w *Worker) Run(ctx context.Context) error { + retryTimer := time.NewTicker(evmlistenerclient.BlockTime) for { m, err := w.listener.GConsume(ctx) if err != nil { @@ -51,16 +63,35 @@ func (w *Worker) Run(ctx context.Context) error { w.l.Errorw("Error when ack msg", "error", err) return err } + select { + case <-retryTimer.C: + if err := w.retryParseLog(); err != nil { + w.l.Errorw("error when retry parse log", "err", err) + return err + } + default: + } } } func (w *Worker) processMessages(m []evmlistenerclient.Message) error { - var ( - insertOrders []storage.TradeLog - deleteBlocks []uint64 - ) - for _, message := range m { + var ( + insertOrders []storage.TradeLog + deleteBlocks []uint64 + ) for _, block := range message.NewBlocks { + for _, block := range message.RevertedBlocks { + deleteBlocks = append(deleteBlocks, block.Number.Uint64()) + for _, k := range w.errLogs.Keys() { + l, ok := w.errLogs.Peek(k) + if !ok { + continue + } + if l.log.BlockHash == block.Hash { + w.errLogs.Remove(k) + } + } + } for _, log := range block.Logs { if len(log.Topics) == 0 { continue @@ -72,22 +103,48 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error { order, err := ps.Parse(convert.ToETHLog(log), block.Timestamp) if err != nil { w.l.Errorw("error when parse log", "log", log, "order", order, "err", err) + w.errLogs.Add(fmt.Sprintf("%d-%d", log.BlockNumber, log.Index), EVMLog{ + log: log, + ts: block.Timestamp, + }) continue } insertOrders = append(insertOrders, order) } } - for _, block := range message.RevertedBlocks { - deleteBlocks = append(deleteBlocks, block.Number.Uint64()) + + if err := w.s.Delete(deleteBlocks); err != nil { + return err + } + if err := w.s.Insert(insertOrders); err != nil { + return err } } - err := w.s.Delete(deleteBlocks) - if err != nil { - return err + return nil +} + +func (w *Worker) retryParseLog() error { + insertOrders := []storage.TradeLog{} + for _, k := range w.errLogs.Keys() { + l, ok := w.errLogs.Peek(k) + if !ok { + continue + } + ps := w.p[l.log.Topics[0]] + if ps == nil { + continue + } + order, err := ps.Parse(convert.ToETHLog(l.log), l.ts) + if err != nil { + continue + } + w.l.Infow("retry log successfully", "key", k, "parser", ps.Exchange()) + w.errLogs.Remove(k) + insertOrders = append(insertOrders, order) } - err = w.s.Insert(insertOrders) - if err != nil { + + if err := w.s.Insert(insertOrders); err != nil { return err } return nil diff --git a/pkg/evmlistenerclient/client.go b/pkg/evmlistenerclient/client.go index 12291eb..b273100 100644 --- a/pkg/evmlistenerclient/client.go +++ b/pkg/evmlistenerclient/client.go @@ -16,7 +16,7 @@ const ( ) var ( - blockTime = 12 * time.Second + BlockTime = 12 * time.Second ) type Config struct { @@ -120,7 +120,7 @@ func (c *Client) Consume( streams, err := c.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{c.config.Topic, id}, Count: 1, - Block: blockTime, + Block: BlockTime, }).Result() if err != nil {