Skip to content

Commit

Permalink
Merge pull request #33 from KyberNetwork/TRD-212-add-backfill-by-dune…
Browse files Browse the repository at this point in the history
…-feature

add backfill by dune
  • Loading branch information
ngocthanh1389 authored Mar 25, 2024
2 parents 6a6a005 + fea3889 commit 6884c5c
Show file tree
Hide file tree
Showing 22 changed files with 488 additions and 77 deletions.
40 changes: 13 additions & 27 deletions cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"os"
"time"

"github.com/KyberNetwork/tradelogs/pkg/dune"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/parser/oneinch"
"github.com/KyberNetwork/tradelogs/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/pkg/tracecall"

libapp "github.com/KyberNetwork/tradelogs/internal/app"
"github.com/KyberNetwork/tradelogs/internal/bigquery"
"github.com/KyberNetwork/tradelogs/internal/dbutil"
backfill "github.com/KyberNetwork/tradelogs/internal/server/backfill"
tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs"
Expand Down Expand Up @@ -92,8 +92,7 @@ func run(c *cli.Context) error {
}
traceCalls := tracecall.NewCache(rpcClient)

w, err := worker.New(l, s, listener,
kyberswap.MustNewParser(),
parsers := []parser.Parser{kyberswap.MustNewParser(),
zxotc.MustNewParser(),
zxrfq.MustNewParser(),
tokenlon.MustNewParser(),
Expand All @@ -103,35 +102,22 @@ func run(c *cli.Context) error {
kyberswaprfq.MustNewParser(),
hashflowv3.MustNewParser(),
oneinch.MustNewParser(traceCalls),
)
}

w, err := worker.New(l, s, listener, parsers...)
if err != nil {
l.Errorw("Error while init worker")
return err
}
parserMap := map[string]parser.Parser{
"kyberswap": kyberswap.MustNewParser(),
"zxotc": zxotc.MustNewParser(),
"zxrfq": zxrfq.MustNewParser(),
"tokenlon": tokenlon.MustNewParser(),
"paraswap": paraswap.MustNewParser(),
"hashflow": hashflow.MustNewParser(),
"native": native.MustNewParser(),
"kyberswaprfq": kyberswaprfq.MustNewParser(),
"hashflowv3": hashflowv3.MustNewParser(),
"1inch": oneinch.MustNewParser(traceCalls),
}

backfillWorker, err := bigquery.NewWorker(libapp.BigqueryProjectIDFFromCli(c), s, parserMap)
if err != nil {
l.Errorw("Error while init backfillWorker")
} else {
httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), backfillWorker)
go func() {
if err := httpBackfill.Run(); err != nil {
panic(err)
}
}()
}
httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name),
backfill.NewDuneWoker(dune.NewClient(c.String(libapp.DuneURLFlag.Name), c.String(libapp.DuneKeyFlag.Name), httpClient), s),
parsers)
go func() {
if err := httpBackfill.Run(); err != nil {
panic(err)
}
}()

httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name))
go func() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
github.com/rs/xid v1.5.0
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.8.4
github.com/urfave/cli v1.22.14
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down
13 changes: 13 additions & 0 deletions internal/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,22 @@ var HTTPBackfillServerFlag = cli.StringFlag{
Value: "localhost:8081",
}

var DuneURLFlag = cli.StringFlag{
Name: "dune-url",
EnvVar: "DUNE_URL",
Value: "https://api.dune.com/api",
}

var DuneKeyFlag = cli.StringFlag{
Name: "dune-key",
EnvVar: "DUNE_KEY",
}

func HTTPServerFlags() []cli.Flag {
return []cli.Flag{
HTTPServerFlag,
HTTPBackfillServerFlag,
DuneURLFlag,
DuneKeyFlag,
}
}
130 changes: 130 additions & 0 deletions internal/server/backfill/dune_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package server

import (
"time"

"github.com/KyberNetwork/tradelogs/pkg/dune"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"
)

const (
limit = 100
retry = 5
)

type DuneWorker struct {
client *dune.Client
s *storage.Storage
}

func NewDuneWoker(c *dune.Client, s *storage.Storage) *DuneWorker {
return &DuneWorker{
client: c,
s: s,
}
}

func (d *DuneWorker) backfill(l *zap.SugaredLogger,
queryID int64, from, to uint64, ps parser.Parser, topic string) error {
l.Infow("start backfill data", "query", queryID, "topic", topic, "from", from, "to", to)
queryRes, err := d.client.ExecuteQuery(queryID, topic, from, to)
if err != nil {
return err
}

l.Infow("executing query", "data", queryRes)
errCount := 0
for {
time.Sleep(5 * time.Second)
state, err := d.client.ExecuteState(queryRes.ExecutionID)
if err != nil {
l.Errorw("error when get state", "error", err)
errCount++
if errCount > retry {
return err
}
continue
}
l.Infow("execute query", "state", state)
if state.IsExecutionFinished {
break
}
}
l.Infow("executed query")

var (
progress uint64 = 0
data []storage.TradeLog
)
errCount = 0

for {
time.Sleep(5 * time.Second)
logs, rowCount, err := d.client.GetLastestExecuteResult(queryID, limit, progress)
if err != nil {
l.Errorw("error when collect data", "error", err)
errCount++
if errCount > retry {
return err
}
continue
}

l.Infow("collect data", "progress", progress, "len", len(logs), "total", rowCount)
for _, l := range logs {
ts, err := time.Parse("2006-01-02 15:04:05.999 UTC", l.BlockTime)
if err != nil {
return err
}
parsedLog, err := ps.Parse(DuneLogToETHLog(l), uint64(ts.Unix()))
if err != nil {
return err
}
data = append(data, parsedLog)
}
l.Infow("parsed data", "len", len(data))
if err = d.s.Insert(data); err != nil {
return err
}
progress += limit + 1
if progress >= rowCount {
break
}
}
return nil
}

func DuneLogToETHLog(log dune.DuneLog) types.Log {
data, err := hexutil.Decode(log.Data)
if err != nil {
return types.Log{}
}
topics := []common.Hash{}
if log.Topic0 != "" {
topics = append(topics, common.HexToHash(log.Topic0))
}
if log.Topic1 != "" {
topics = append(topics, common.HexToHash(log.Topic1))
}
if log.Topic2 != "" {
topics = append(topics, common.HexToHash(log.Topic2))
}
if log.Topic3 != "" {
topics = append(topics, common.HexToHash(log.Topic3))
}
return types.Log{
Address: common.HexToAddress(log.ContractAddress),
Topics: topics,
Data: data,
BlockNumber: log.BlockNumber,
TxHash: common.HexToHash(log.TxHash),
TxIndex: log.TxIndex,
BlockHash: common.HexToHash(log.BlockHash),
Index: log.Index,
}
}
99 changes: 50 additions & 49 deletions internal/server/backfill/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,35 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/KyberNetwork/tradelogs/internal/bigquery"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/rs/xid"
"go.uber.org/zap"
)

// Server to serve the service.
type Server struct {
l *zap.SugaredLogger
r *gin.Engine
bq *bigquery.Worker
dune *DuneWorker
bindAddr string
parsers []parser.Parser
}

// New returns a new server.
func New(bindAddr string, bq *bigquery.Worker) *Server {
func New(bindAddr string, dune *DuneWorker, parsers []parser.Parser) *Server {
engine := gin.New()
engine.Use(gin.Recovery())

server := &Server{
l: zap.S(),
r: engine,
bq: bq,
dune: dune,
bindAddr: bindAddr,
parsers: parsers,
}

gin.SetMode(gin.ReleaseMode)
Expand All @@ -50,7 +53,6 @@ func (s *Server) Run() error {
func (s *Server) register() {
pprof.Register(s.r, "/debug")
s.r.POST("/backfill", s.backfill)
s.r.POST("/backfill-1inch", s.backfillOneinch)
}

func responseErr(c *gin.Context, err error) {
Expand All @@ -60,60 +62,59 @@ func responseErr(c *gin.Context, err error) {
})
}

func responseOK(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"success": true,
})
}

func (s *Server) backfill(c *gin.Context) {
var (
query struct {
FromTime int64 `form:"from_time" json:"from_time,omitempty"` // milliseconds
ToTime int64 `form:"to_time" json:"to_time,omitempty"` // milliseconds
Exchanges string `form:"exchanges" json:"exchanges,omitempty"`
}
err = c.ShouldBind(&query)
)
if err != nil {
var params storage.BackfillQuery
if err := c.BindJSON(&params); err != nil {
responseErr(c, err)
return
}

var exchanges []string
if query.Exchanges != "" {
exchanges = strings.Split(query.Exchanges, ",")
}
s.l.Infow("Request backfill", "query", query)
if query.FromTime == 0 || query.ToTime == 0 {
err = s.bq.BackFillAllData(exchanges)
} else {
err = s.bq.BackFillPartialData(query.FromTime/1000, query.ToTime/1000, exchanges)
}
if err != nil {
responseErr(c, err)
if params.FromBlock > params.ToBlock {
responseErr(c, fmt.Errorf("from block is greater than to block"))
return
}

c.JSON(http.StatusOK, gin.H{
"success": true,
"time": time.Now().UnixMilli(),
})
}

func (s *Server) backfillOneinch(c *gin.Context) {
var (
query BackFillOneInchRequest
)
if err := c.ShouldBind(&query); err != nil {
responseErr(c, err)
if params.EventHash == "" && params.Exchange == "" {
responseErr(c, fmt.Errorf("empty event hash or exchange"))
return
}

tradeLogs := query.ToTradeLogs()
l := s.l.With("reqID", xid.New().String())
l.Infow("receive backfill params", "params", params)
if params.EventHash != "" {
for _, p := range s.parsers {
for _, t := range p.Topics() {
if strings.EqualFold(params.EventHash, t) {
if err := s.dune.backfill(l, params.QueryID, params.FromBlock, params.ToBlock, p, t); err != nil {
l.Errorw("error when backfill", "error", err)
responseErr(c, err)
return
}
responseOK(c)
return
}
}
}
}

err := s.bq.BackfillOneInchRFQ(tradeLogs)
if err != nil {
responseErr(c, err)
return
for _, p := range s.parsers {
if strings.EqualFold(params.Exchange, p.Exchange()) {
for _, t := range p.Topics() {
if err := s.dune.backfill(l, params.QueryID, params.FromBlock, params.ToBlock, p, t); err != nil {
l.Errorw("error when backfill", "error", err)
responseErr(c, err)
return
}
}
responseOK(c)
return
}
}
c.JSON(http.StatusOK, gin.H{
"message": "Backfill 1inch rfq orders successfully",
"success": true,
"time": time.Now().UnixMilli(),
})

}
Loading

0 comments on commit 6884c5c

Please sign in to comment.