Skip to content

Commit

Permalink
Merge pull request #34 from KyberNetwork/update_backfill_oneinch
Browse files Browse the repository at this point in the history
update backfill oneinch
  • Loading branch information
ngocthanh1389 authored Apr 2, 2024
2 parents 6884c5c + dd39077 commit 94c3a34
Show file tree
Hide file tree
Showing 20 changed files with 3,300 additions and 239 deletions.
7 changes: 6 additions & 1 deletion cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/KyberNetwork/tradelogs/pkg/tracecall"

libapp "github.com/KyberNetwork/tradelogs/internal/app"
"github.com/KyberNetwork/tradelogs/internal/bigquery"
"github.com/KyberNetwork/tradelogs/internal/dbutil"
backfill "github.com/KyberNetwork/tradelogs/internal/server/backfill"
tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs"
Expand Down Expand Up @@ -109,8 +110,12 @@ func run(c *cli.Context) error {
l.Errorw("Error while init worker")
return err
}
bigQueryWorker, err := bigquery.NewWorker(libapp.BigqueryProjectIDFFromCli(c), s, parsers)
if err != nil {
l.Errorw("error when init big query worker", "error", err)
}

httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name),
httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), bigQueryWorker,
backfill.NewDuneWoker(dune.NewClient(c.String(libapp.DuneURLFlag.Name), c.String(libapp.DuneKeyFlag.Name), httpClient), s),
parsers)
go func() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJR
github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE=
Expand Down
17 changes: 9 additions & 8 deletions internal/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -36,12 +37,12 @@ type Worker struct {
client *bigquery.Client
storage *storage.Storage
parserTopicMap map[string]parser.Parser
parserNameMap map[string]parser.Parser
parserNameMap []parser.Parser
state WorkerState
}

func NewWorker(
projectID string, storage *storage.Storage, parserNameMap map[string]parser.Parser,
projectID string, storage *storage.Storage, parsers []parser.Parser,
) (*Worker, error) {
client, err := bigquery.NewClient(
context.Background(),
Expand All @@ -52,7 +53,7 @@ func NewWorker(
}

parserTopicMap := make(map[string]parser.Parser)
for _, ps := range parserNameMap {
for _, ps := range parsers {
for _, topic := range ps.Topics() {
parserTopicMap[topic] = ps
}
Expand All @@ -63,7 +64,7 @@ func NewWorker(
client: client,
storage: storage,
parserTopicMap: parserTopicMap,
parserNameMap: parserNameMap,
parserNameMap: parsers,
state: stateStopped,
}, nil
}
Expand Down Expand Up @@ -219,11 +220,11 @@ func (w *Worker) topicsFromExchanges(exchanges []string) ([]string, error) {
}

for _, ex := range exchanges {
parser, ok := w.parserNameMap[ex]
if !ok {
return nil, fmt.Errorf("%w: %s", ErrNoParser, ex)
for _, ps := range w.parserNameMap {
if strings.EqualFold(ps.Exchange(), ex) {
topics = append(topics, ps.Topics()...)
}
}
topics = append(topics, parser.Topics()...)
}
return topics, nil
}
Expand Down
228 changes: 160 additions & 68 deletions internal/server/backfill/dune_worker.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package server

import (
"encoding/json"
"time"

"github.com/KyberNetwork/tradelogs/pkg/dune"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -13,8 +13,10 @@ import (
)

const (
limit = 100
retry = 5
limit = 100
retry = 5
OneInchV5EventHash = "0xc3b639f02b125bfa160e50739b8c44eb2d1b6908e2b6d5925c6d770f2ca78127"
OneInchV6EventHash = "0xfec331350fce78ba658e082a71da20ac9f8d798a99b3c79681c8440cbfe77e07"
)

type DuneWorker struct {
Expand All @@ -29,75 +31,74 @@ func NewDuneWoker(c *dune.Client, s *storage.Storage) *DuneWorker {
}
}

func (d *DuneWorker) backfill(l *zap.SugaredLogger,
queryID int64, from, to uint64, ps parser.Parser, topic string) error {
l.Infow("start backfill data", "query", queryID, "topic", topic, "from", from, "to", to)
queryRes, err := d.client.ExecuteQuery(queryID, topic, from, to)
if err != nil {
return err
}
// func (d *DuneWorker) backfill(l *zap.SugaredLogger,
// queryID int64, from, to uint64, ps parser.Parser, topic string) error {
// l.Infow("start backfill data", "query", queryID, "topic", topic, "from", from, "to", to)
// queryRes, err := d.client.ExecuteQuery(queryID, topic, from, to)
// if err != nil {
// return err
// }

l.Infow("executing query", "data", queryRes)
errCount := 0
for {
time.Sleep(5 * time.Second)
state, err := d.client.ExecuteState(queryRes.ExecutionID)
if err != nil {
l.Errorw("error when get state", "error", err)
errCount++
if errCount > retry {
return err
}
continue
}
l.Infow("execute query", "state", state)
if state.IsExecutionFinished {
break
}
}
l.Infow("executed query")
// l.Infow("executing query", "data", queryRes)
// errCount := 0
// for {
// time.Sleep(5 * time.Second)
// state, err := d.client.ExecuteState(queryRes.ExecutionID)
// if err != nil {
// l.Errorw("error when get state", "error", err)
// errCount++
// if errCount > retry {
// return err
// }
// continue
// }
// l.Infow("execute query", "state", state)
// if state.IsExecutionFinished {
// break
// }
// }
// l.Infow("executed query")

var (
progress uint64 = 0
data []storage.TradeLog
)
errCount = 0
// var progress uint64 = 0
// errCount = 0

for {
time.Sleep(5 * time.Second)
logs, rowCount, err := d.client.GetLastestExecuteResult(queryID, limit, progress)
if err != nil {
l.Errorw("error when collect data", "error", err)
errCount++
if errCount > retry {
return err
}
continue
}
// for {
// data := []storage.TradeLog{}

l.Infow("collect data", "progress", progress, "len", len(logs), "total", rowCount)
for _, l := range logs {
ts, err := time.Parse("2006-01-02 15:04:05.999 UTC", l.BlockTime)
if err != nil {
return err
}
parsedLog, err := ps.Parse(DuneLogToETHLog(l), uint64(ts.Unix()))
if err != nil {
return err
}
data = append(data, parsedLog)
}
l.Infow("parsed data", "len", len(data))
if err = d.s.Insert(data); err != nil {
return err
}
progress += limit + 1
if progress >= rowCount {
break
}
}
return nil
}
// time.Sleep(5 * time.Second)
// logs, rowCount, err := d.client.GetLastestExecuteResult(queryID, limit, progress)
// if err != nil {
// l.Errorw("error when collect data", "error", err)
// errCount++
// if errCount > retry {
// return err
// }
// continue
// }

// l.Infow("collect data", "progress", progress, "len", len(logs), "total", rowCount)
// for _, l := range logs {
// ts, err := time.Parse("2006-01-02 15:04:05.999 UTC", l.BlockTime)
// if err != nil {
// return err
// }
// parsedLog, err := ps.Parse(DuneLogToETHLog(l), uint64(ts.Unix()))
// if err != nil {
// return err
// }
// data = append(data, parsedLog)
// }
// l.Infow("parsed data", "len", len(data))
// if err = d.s.Insert(data); err != nil {
// return err
// }
// progress += limit + 1
// if progress >= rowCount {
// break
// }
// }
// return nil
// }

func DuneLogToETHLog(log dune.DuneLog) types.Log {
data, err := hexutil.Decode(log.Data)
Expand Down Expand Up @@ -128,3 +129,94 @@ func DuneLogToETHLog(log dune.DuneLog) types.Log {
Index: log.Index,
}
}

// backfillOneInch pages through the latest cached result set of the given Dune
// query and inserts the parsed 1inch fills into storage.
//
// Parameters:
//   - l: request-scoped logger.
//   - queryID: Dune query whose latest execution result is read.
//   - version: 1inch protocol version selector ("v5" selects the v5 order
//     layout; anything else falls through to v6 — see OneInchDuneLogToTrade).
//
// Returns the first non-retryable error, or nil once all rows are consumed.
func (d *DuneWorker) backfillOneInch(l *zap.SugaredLogger, queryID int64, version string) error {
	// progress is the row offset into the Dune result set; errCount counts
	// fetch failures across the whole run (it is never reset on success) and
	// aborts the backfill once it exceeds `retry`.
	var progress uint64 = 0
	errCount := 0
	for {
		data := []storage.TradeLog{}
		logs := []dune.OneInchDuneLog{}
		// Throttle calls against the Dune API before every page fetch.
		time.Sleep(time.Second)
		rowCount, err := d.client.GetLastestExecuteResult(queryID, limit, progress, &logs)
		if err != nil {
			l.Errorw("error when collect data", "error", err)
			errCount++
			if errCount > retry {
				return err
			}
			// Retry the same offset on the next iteration.
			continue
		}

		l.Infow("collected data", "progress", progress, "len", len(logs), "total", rowCount)
		// Parse every row of this page; a single unparseable row aborts the
		// whole backfill (no partial insert of this page happens).
		for _, l := range logs {
			parse, err := OneInchDuneLogToTrade(l, version)
			if err != nil {
				return err
			}
			data = append(data, parse)
		}
		if err = d.s.Insert(data); err != nil {
			return err
		}
		// NOTE(review): advancing by limit+1 skips one row per page if
		// `progress` is a plain row offset and `limit` the page size —
		// confirm against GetLastestExecuteResult's offset semantics.
		progress += limit + 1
		if progress >= rowCount {
			break
		}
	}
	return nil
}

// OneInchOrderV6 mirrors the `order` JSON payload of a 1inch v6 fill as
// exported by Dune. Addresses are decoded through storage.BigInt and later
// rendered with .Hex() — presumably because v6 encodes them as numeric
// strings rather than 0x-prefixed hex; verify against the Dune query output.
type OneInchOrderV6 struct {
	Maker      storage.BigInt `json:"maker"`
	MakerAsset storage.BigInt `json:"makerAsset"`
	TakerAsset storage.BigInt `json:"takerAsset"`
}

// OneInchOrderV5 mirrors the `order` JSON payload of a 1inch v5 fill as
// exported by Dune. Unlike v6, the address fields arrive as plain strings
// and are stored verbatim.
type OneInchOrderV5 struct {
	Maker      string `json:"maker"`
	MakerAsset string `json:"makerAsset"`
	TakerAsset string `json:"takerAsset"`
}

// OneInchDuneLogToTrade converts one Dune-exported 1inch fill row into a
// storage.TradeLog.
//
// Parameters:
//   - l: raw row from the Dune result set; l.Order carries the JSON-encoded
//     order struct and l.BlockTime the block timestamp in Dune's text format.
//   - version: "v5" selects the v5 order layout and event hash; every other
//     value is treated as v6.
//
// Returns an error when the block time cannot be parsed or the embedded
// order JSON does not unmarshal.
func OneInchDuneLogToTrade(l dune.OneInchDuneLog, version string) (storage.TradeLog, error) {
	// Dune renders block times as e.g. "2024-04-02 12:34:56.789 UTC".
	blockTime, err := time.Parse("2006-01-02 15:04:05.999 UTC", l.BlockTime)
	if err != nil {
		return storage.TradeLog{}, err
	}

	var maker, makerAsset, takerAsset, eventHash string
	if version == "v5" {
		// v5 orders carry plain string addresses.
		var decoded OneInchOrderV5
		if err := json.Unmarshal([]byte(l.Order), &decoded); err != nil {
			return storage.TradeLog{}, err
		}
		maker = decoded.Maker
		makerAsset = decoded.MakerAsset
		takerAsset = decoded.TakerAsset
		eventHash = OneInchV5EventHash
	} else {
		// Anything other than "v5" is treated as a v6 order, whose address
		// fields need a BigInt -> hex conversion.
		var decoded OneInchOrderV6
		if err := json.Unmarshal([]byte(l.Order), &decoded); err != nil {
			return storage.TradeLog{}, err
		}
		maker = decoded.Maker.Hex()
		makerAsset = decoded.MakerAsset.Hex()
		takerAsset = decoded.TakerAsset.Hex()
		eventHash = OneInchV6EventHash
	}

	trade := storage.TradeLog{
		OrderHash:        l.Output2,
		Maker:            maker,
		MakerToken:       makerAsset,
		TakerToken:       takerAsset,
		MakerTokenAmount: l.Output0,
		TakerTokenAmount: l.Output1,
		ContractAddress:  l.ContractAddress,
		BlockNumber:      l.BlockNumber,
		TxHash:           l.TxHash,
		LogIndex:         l.EventIndex,
		Timestamp:        uint64(blockTime.UnixMilli()),
		EventHash:        eventHash,
	}
	return trade, nil
}
Loading

0 comments on commit 94c3a34

Please sign in to comment.