Skip to content

Commit

Permalink
Merge pull request #37 from KyberNetwork/TRD-257-add-message-queue-to…
Browse files Browse the repository at this point in the history
…-handle-synchronization-between-evmlistener-tradecall

add queue for error parse log to retry
  • Loading branch information
ngocthanh1389 authored Apr 9, 2024
2 parents 7b20cd7 + c88ea65 commit dac2d5a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 14 deletions.
81 changes: 69 additions & 12 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@ package worker

import (
"context"
"fmt"
"time"

etype "github.com/KyberNetwork/evmlistener/pkg/types"
"github.com/KyberNetwork/tradelogs/pkg/convert"
"github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/ethereum/go-ethereum/common/lru"
"go.uber.org/zap"
)

type EVMLog struct {
log etype.Log
ts uint64
}

type Worker struct {
listener *evmlistenerclient.Client
l *zap.SugaredLogger
s *storage.Storage
p map[string]parser.Parser
errLogs lru.BasicLRU[string, EVMLog]
}

func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, parsers ...parser.Parser) (*Worker, error) {
Expand All @@ -29,10 +39,12 @@ func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.C
l: l,
s: s,
p: p,
errLogs: lru.NewBasicLRU[string, EVMLog](1000),
}, nil
}

func (w *Worker) Run(ctx context.Context) error {
retryTimer := time.NewTicker(evmlistenerclient.BlockTime)
for {
m, err := w.listener.GConsume(ctx)
if err != nil {
Expand All @@ -51,16 +63,35 @@ func (w *Worker) Run(ctx context.Context) error {
w.l.Errorw("Error when ack msg", "error", err)
return err
}
select {
case <-retryTimer.C:
if err := w.retryParseLog(); err != nil {
w.l.Errorw("error when retry parse log", "err", err)
return err
}
default:
}
}
}
func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
var (
insertOrders []storage.TradeLog
deleteBlocks []uint64
)

for _, message := range m {
var (
insertOrders []storage.TradeLog
deleteBlocks []uint64
)
for _, block := range message.NewBlocks {
for _, block := range message.RevertedBlocks {
deleteBlocks = append(deleteBlocks, block.Number.Uint64())
for _, k := range w.errLogs.Keys() {
l, ok := w.errLogs.Peek(k)
if !ok {
continue
}
if l.log.BlockHash == block.Hash {
w.errLogs.Remove(k)
}
}
}
for _, log := range block.Logs {
if len(log.Topics) == 0 {
continue
Expand All @@ -72,22 +103,48 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
order, err := ps.Parse(convert.ToETHLog(log), block.Timestamp)
if err != nil {
w.l.Errorw("error when parse log", "log", log, "order", order, "err", err)
w.errLogs.Add(fmt.Sprintf("%d-%d", log.BlockNumber, log.Index), EVMLog{
log: log,
ts: block.Timestamp,
})
continue
}
insertOrders = append(insertOrders, order)
}
}
for _, block := range message.RevertedBlocks {
deleteBlocks = append(deleteBlocks, block.Number.Uint64())

if err := w.s.Delete(deleteBlocks); err != nil {
return err
}
if err := w.s.Insert(insertOrders); err != nil {
return err
}
}

err := w.s.Delete(deleteBlocks)
if err != nil {
return err
return nil
}

func (w *Worker) retryParseLog() error {
insertOrders := []storage.TradeLog{}
for _, k := range w.errLogs.Keys() {
l, ok := w.errLogs.Peek(k)
if !ok {
continue
}
ps := w.p[l.log.Topics[0]]
if ps == nil {
continue
}
order, err := ps.Parse(convert.ToETHLog(l.log), l.ts)
if err != nil {
continue
}
w.l.Infow("retry log successfully", "key", k, "parser", ps.Exchange())
w.errLogs.Remove(k)
insertOrders = append(insertOrders, order)
}
err = w.s.Insert(insertOrders)
if err != nil {

if err := w.s.Insert(insertOrders); err != nil {
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/evmlistenerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
)

var (
blockTime = 12 * time.Second
BlockTime = 12 * time.Second
)

type Config struct {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Client) Consume(
streams, err := c.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{c.config.Topic, id},
Count: 1,
Block: blockTime,
Block: BlockTime,
}).Result()

if err != nil {
Expand Down

0 comments on commit dac2d5a

Please sign in to comment.