diff --git a/Dockerfile-v2 b/Dockerfile-v2 index 1fe1fa4..334d2a7 100644 --- a/Dockerfile-v2 +++ b/Dockerfile-v2 @@ -8,6 +8,8 @@ COPY . . RUN go build -o parse_log ./v2/cmd/parse_log RUN go build -o backfill ./v2/cmd/backfill RUN go build -o broadcast ./v2/cmd/broadcast +RUN go build -o price_filler ./v2/cmd/price_filler +RUN go build -o tradelogs ./v2/cmd/tradelogs ## DEPLOY @@ -23,6 +25,8 @@ WORKDIR /v2 COPY --from=builder /src/parse_log /v2/parse_log COPY --from=builder /src/backfill /v2/backfill COPY --from=builder /src/broadcast /v2/broadcast +COPY --from=builder /src/price_filler /v2/price_filler +COPY --from=builder /src/tradelogs /v2/tradelogs COPY v2/cmd/migrations /v2/migrations diff --git a/v2/cmd/migrations/00004_update_price_column.up.sql b/v2/cmd/migrations/00004_update_price_column.up.sql new file mode 100644 index 0000000..5f49339 --- /dev/null +++ b/v2/cmd/migrations/00004_update_price_column.up.sql @@ -0,0 +1,25 @@ +alter table tradelogs_zerox + alter column maker_token_price drop not null; + +alter table tradelogs_zerox + alter column maker_token_price drop default; + +alter table tradelogs_zerox + alter column taker_token_price drop not null; + +alter table tradelogs_zerox + alter column taker_token_price drop default; + +alter table tradelogs_zerox + alter column maker_usd_amount drop not null; + +alter table tradelogs_zerox + alter column maker_usd_amount drop default; + +alter table tradelogs_zerox + alter column taker_usd_amount drop not null; + +alter table tradelogs_zerox + alter column taker_usd_amount drop default; + +alter table tradelogs_zerox drop column state; diff --git a/v2/cmd/price_filler/main.go b/v2/cmd/price_filler/main.go new file mode 100644 index 0000000..729cd56 --- /dev/null +++ b/v2/cmd/price_filler/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/KyberNetwork/go-binance/v2" + libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app" + "github.com/KyberNetwork/tradelogs/v2/pkg/price_filler" + storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + zxotcStorage "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/zxotc" + "github.com/KyberNetwork/tradinglib/pkg/dbutil" + "github.com/jmoiron/sqlx" + "github.com/urfave/cli" + "go.uber.org/zap" +) + +func main() { + app := libapp.NewApp() + app.Name = "trade logs crawler service" + app.Action = run + app.Flags = append(app.Flags, libapp.PostgresSQLFlags("tradelogs_v2")...) + app.Flags = append(app.Flags, libapp.PriceFillerFlags()...) + + if err := app.Run(os.Args); err != nil { + log.Panic(err) + } +} + +func run(c *cli.Context) error { + logger, _, flush, err := libapp.NewLogger(c) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + defer flush() + + zap.ReplaceGlobals(logger) + l := logger.Sugar() + l.Infow("Starting price filler service") + + db, err := initDB(c) + l.Infow("init db successfully") + if err != nil { + return fmt.Errorf("cannot init DB: %w", err) + } + + // trade log storages + s := []storageTypes.Storage{ + zxotcStorage.New(l, db), + } + + binanceClient := binance.NewClient(c.String(libapp.BinanceAPIKeyFlag.Name), c.String(libapp.BinanceSecretKeyFlag.Name)) + priceFiller, err := pricefiller.NewPriceFiller(l, binanceClient, s) + if err != nil { + l.Errorw("Error while init price filler") + return err + } + priceFiller.Run() + return nil +} + +func initDB(c *cli.Context) (*sqlx.DB, error) { + db, err := libapp.NewDB(map[string]interface{}{ + "host": c.String(libapp.PostgresHost.Name), + "port": c.Int(libapp.PostgresPort.Name), + "user": c.String(libapp.PostgresUser.Name), + "password": c.String(libapp.PostgresPassword.Name), + "dbname": c.String(libapp.PostgresDatabase.Name), + "sslmode": "disable", + }) + if err != nil { + return nil, err + } + + _, err = dbutil.RunMigrationUp(db.DB, c.String(libapp.PostgresMigrationPath.Name), + c.String(libapp.PostgresDatabase.Name)) + if err != nil { + return nil, err + } + return db, nil +} diff --git a/v2/cmd/tradelogs/main.go b/v2/cmd/tradelogs/main.go new file mode 100644 index 0000000..5e99704 --- /dev/null +++ b/v2/cmd/tradelogs/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/KyberNetwork/tradelogs/v2/internal/server" + libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app" + storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + zxotcStorage "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/zxotc" + "github.com/KyberNetwork/tradinglib/pkg/dbutil" + "github.com/jmoiron/sqlx" + "github.com/urfave/cli" + "go.uber.org/zap" +) + +func main() { + app := libapp.NewApp() + app.Name = "trade logs crawler service" + app.Action = run + app.Flags = append(app.Flags, libapp.PostgresSQLFlags("tradelogs_v2")...) + app.Flags = append(app.Flags, libapp.HTTPServerFlags()...) + + if err := app.Run(os.Args); err != nil { + log.Panic(err) + } +} + +func run(c *cli.Context) error { + logger, _, flush, err := libapp.NewLogger(c) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + defer flush() + + zap.ReplaceGlobals(logger) + l := logger.Sugar() + l.Infow("Starting trade logs server") + + db, err := initDB(c) + l.Infow("init db successfully") + if err != nil { + return fmt.Errorf("cannot init DB: %w", err) + } + + // trade log storages + storage := []storageTypes.Storage{ + zxotcStorage.New(l, db), + } + + s := server.NewTradeLogs(l, storage, c.String(libapp.HTTPTradeLogsServerFlag.Name)) + return s.Run() +} + +func initDB(c *cli.Context) (*sqlx.DB, error) { + db, err := libapp.NewDB(map[string]interface{}{ + "host": c.String(libapp.PostgresHost.Name), + "port": c.Int(libapp.PostgresPort.Name), + "user": c.String(libapp.PostgresUser.Name), + "password": c.String(libapp.PostgresPassword.Name), + "dbname": c.String(libapp.PostgresDatabase.Name), + "sslmode": "disable", + }) + if err != nil { + return nil, err + } + + _, err = dbutil.RunMigrationUp(db.DB, c.String(libapp.PostgresMigrationPath.Name), + c.String(libapp.PostgresDatabase.Name)) + if err != nil { + return nil, err + } + return db, nil +} diff --git a/v2/internal/server/tradelogs.go b/v2/internal/server/tradelogs.go new file mode 100644 index 0000000..2294022 --- /dev/null +++ b/v2/internal/server/tradelogs.go @@ -0,0 +1,91 @@ +package server + +import ( + "fmt" + "net/http" + "time" + + storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +var ( + maxTimeRange = uint64(7 * 24 * time.Hour.Milliseconds()) +) + +type TradeLogs struct { + r *gin.Engine + bindAddr string + l *zap.SugaredLogger + storage []storageTypes.Storage +} + +func NewTradeLogs(l *zap.SugaredLogger, s []storageTypes.Storage, bindAddr string) *TradeLogs { + engine := gin.New() + engine.Use(gin.Recovery()) + + server := &TradeLogs{ + r: engine, + bindAddr: bindAddr, + l: l, + storage: s, + } + + gin.SetMode(gin.ReleaseMode) + server.register() + + return server +} + +// Run runs server. +func (s *TradeLogs) Run() error { + if err := s.r.Run(s.bindAddr); err != nil { + return fmt.Errorf("run server: %w", err) + } + + return nil +} + +func (s *TradeLogs) register() { + pprof.Register(s.r, "/debug") + s.r.GET("/tradelogs", s.getTradeLogs) +} + +func (s *TradeLogs) getTradeLogs(c *gin.Context) { + var ( + query storageTypes.TradeLogsQuery + err = c.ShouldBind(&query) + ) + + if err != nil { + responseErr(c, http.StatusBadRequest, err) + return + } + + if query.ToTime < query.FromTime { + responseErr(c, http.StatusBadRequest, fmt.Errorf("to_time cannot smaller than from_time")) + return + } + + if query.ToTime-query.FromTime > maxTimeRange { + responseErr(c, http.StatusBadRequest, fmt.Errorf("max time range: %v", maxTimeRange)) + return + } + + var data []storageTypes.TradeLog + for _, storage := range s.storage { + tradeLogs, err := storage.Get(query) + if err != nil { + responseErr(c, http.StatusBadRequest, err) + return + } + data = append(data, tradeLogs...) + } + + c.JSON(http.StatusOK, gin.H{ + "success": true, + "data": data, + }) +} diff --git a/v2/mocks/Storage.go b/v2/mocks/Storage.go index 3d0dbdc..9d88575 100644 --- a/v2/mocks/Storage.go +++ b/v2/mocks/Storage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.45.0. DO NOT EDIT. +// Code generated by mockery v2.46.2. DO NOT EDIT. package mocks @@ -30,6 +30,24 @@ func (_m *MockStorage) Delete(blocks []uint64) error { return r0 } +// Exchange provides a mock function with given fields: +func (_m *MockStorage) Exchange() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Exchange") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + // Get provides a mock function with given fields: query func (_m *MockStorage) Get(query types.TradeLogsQuery) ([]types.TradeLog, error) { ret := _m.Called(query) @@ -60,37 +78,49 @@ func (_m *MockStorage) Get(query types.TradeLogsQuery) ([]types.TradeLog, error) return r0, r1 } -// Insert provides a mock function with given fields: orders -func (_m *MockStorage) Insert(orders []types.TradeLog) error { - ret := _m.Called(orders) +// GetEmptyPrice provides a mock function with given fields: limit +func (_m *MockStorage) GetEmptyPrice(limit uint64) ([]types.TradeLog, error) { + ret := _m.Called(limit) if len(ret) == 0 { - panic("no return value specified for Insert") + panic("no return value specified for GetEmptyPrice") } - var r0 error - if rf, ok := ret.Get(0).(func([]types.TradeLog) error); ok { - r0 = rf(orders) + var r0 []types.TradeLog + var r1 error + if rf, ok := ret.Get(0).(func(uint64) ([]types.TradeLog, error)); ok { + return rf(limit) + } + if rf, ok := ret.Get(0).(func(uint64) []types.TradeLog); ok { + r0 = rf(limit) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.TradeLog) + } } - return r0 + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// Type provides a mock function with given fields: -func (_m *MockStorage) Exchange() string { - ret := _m.Called() +// Insert provides a mock function with given fields: orders +func (_m *MockStorage) Insert(orders []types.TradeLog) error { + ret := _m.Called(orders) if len(ret) == 0 { - panic("no return value specified for Exchange") + panic("no return value specified for Insert") } - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + var r0 error + if rf, ok := ret.Get(0).(func([]types.TradeLog) error); ok { + r0 = rf(orders) } else { - r0 = ret.Get(0).(string) + r0 = ret.Error(0) } return r0 diff --git a/v2/pkg/app/server.go b/v2/pkg/app/server.go index 8a537ab..0733cce 100644 --- a/v2/pkg/app/server.go +++ b/v2/pkg/app/server.go @@ -15,11 +15,18 @@ var ( EnvVar: "BROADCAST_SERVER_ADDRESS", Value: "localhost:8082", } + HTTPTradeLogsServerFlag = cli.StringFlag{ + Name: "tradelogs-server-address", + Usage: "Run the rest for tradelogs server", + EnvVar: "TRADELOGS_SERVER_ADDRESS", + Value: "localhost:8080", + } ) func HTTPServerFlags() []cli.Flag { return []cli.Flag{ HTTPBackfillServerFlag, HTTPBroadcastServerFlag, + HTTPTradeLogsServerFlag, } } diff --git a/v2/pkg/price_filler/ks_client.go b/v2/pkg/price_filler/ks_client.go new file mode 100644 index 0000000..38dba3c --- /dev/null +++ b/v2/pkg/price_filler/ks_client.go @@ -0,0 +1,146 @@ +package pricefiller + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +const ksSettingUrl = "https://ks-setting.kyberswap.com/api/v1" + +type KsClient struct { + client *http.Client + baseURL string +} + +func NewKsClient() *KsClient { + return &KsClient{ + client: &http.Client{}, + baseURL: ksSettingUrl, + } +} + +func (c *KsClient) DoRequest(ctx context.Context, method, path string, jsonData interface{}, out interface{}) error { + req, err := createRequest(ctx, method, path, jsonData) + if err != nil { + return err + } + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + bb, err := readResponse(resp.Body, out) + if err != nil { + return fmt.Errorf("readResponse error: %w, data: %s", err, string(bb)) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server return %d - %v", resp.StatusCode, string(bb)) + } + + return nil +} + +func createRequest(ctx context.Context, method, url string, jsonData interface{}) (*http.Request, error) { + var buf io.Reader + if jsonData != nil { + body, err := json.Marshal(jsonData) + if err != nil { + return nil, err + } + buf = bytes.NewBuffer(body) + } + req, err := http.NewRequestWithContext(ctx, method, url, buf) + if err != nil { + return nil, err + } + if jsonData != nil { + req.Header.Set("Content-Type", "application/json") + } + return req, nil +} + +func readResponse(data io.Reader, dataField interface{}) ([]byte, error) { + if dataField == nil { + return nil, fmt.Errorf("nil data") + } + bb, err := io.ReadAll(data) + if err != nil { + return nil, err + } + return bb, json.Unmarshal(bb, dataField) +} + +type TokenCatalogResp struct { + Code int64 `json:"code"` + Message string `json:"message"` + Data struct { + Tokens []TokenCatalog `json:"tokens"` + } +} + +type TokenCatalog struct { + Decimals int64 `json:"decimals"` +} + +func (c *KsClient) GetTokenCatalog(address string) (TokenCatalogResp, error) { + var resp TokenCatalogResp + err := c.DoRequest(context.Background(), http.MethodGet, + fmt.Sprintf("%s/tokens?chainIds=%d&query=%s", c.baseURL, NetworkETHChanID, address), + nil, &resp) + if err != nil { + return TokenCatalogResp{}, err + } + + if resp.Code != 0 { + return TokenCatalogResp{}, fmt.Errorf("invalid response code: %d", resp.Code) + } + + return resp, nil +} + +type ImportedToken struct { + ChainID string `json:"chainId"` + Address string `json:"address"` +} + +type ImportTokenParam struct { + Tokens []ImportedToken `json:"tokens"` +} + +type ImportTokenResp struct { + Code int64 `json:"code"` + Message string `json:"message"` + Data struct { + Tokens []struct { + Data TokenCatalog `json:"data"` + } `json:"tokens"` + } `json:"data"` +} + +func (c *KsClient) ImportToken(chainID, address string) (ImportTokenResp, error) { + param := ImportTokenParam{ + Tokens: []ImportedToken{ + { + ChainID: chainID, + Address: address, + }, + }, + } + var resp ImportTokenResp + err := c.DoRequest(context.Background(), http.MethodPost, c.baseURL+"/tokens/import", param, &resp) + if err != nil { + return ImportTokenResp{}, err + } + + if resp.Code != 0 { + return ImportTokenResp{}, fmt.Errorf("invalid response code: %d", resp.Code) + } + + return resp, nil +} diff --git a/v2/pkg/price_filler/price_fillter.go b/v2/pkg/price_filler/price_fillter.go new file mode 100644 index 0000000..4ed2593 --- /dev/null +++ b/v2/pkg/price_filler/price_fillter.go @@ -0,0 +1,281 @@ +package pricefiller + +import ( + "context" + "errors" + "strconv" + "strings" + "sync" + "time" + + "github.com/KyberNetwork/go-binance/v2" + storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + "go.uber.org/zap" +) + +const ( + NetworkETHChanID = 1 + NetworkETHChanIDString = "1" + NetworkETH = "ETH" + updateAllCoinInfoInterval = 12 * time.Hour + backfillTradeLogsPriceInterval = 10 * time.Minute + backfillTradeLogsLimit = 60 + addressETH1 = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" + addressETH2 = "0x0000000000000000000000000000000000000000" + coinUSDT = "USDT" + invalidSymbolErrString = " code=-1121, msg=Invalid symbol." +) + +var ( + ErrNoPrice = errors.New("no price from binance") + ErrWeirdTokenCatalogResp = errors.New("weird token catalog response") + + mappedMultiplier = map[string]float64{ + "1MBABYDOGE": 1e-6, + "1000SATS": 1e-3, + } +) + +type CoinInfo struct { + Coin string + Network string + ContractAddress string + Decimals int64 +} + +type PriceFiller struct { + l *zap.SugaredLogger + s []storageTypes.Storage + mu sync.Mutex + ksClient *KsClient + binanceClient *binance.Client + mappedCoinInfo map[string]CoinInfo // address - coinInfo +} + +func NewPriceFiller(l *zap.SugaredLogger, binanceClient *binance.Client, + s []storageTypes.Storage) (*PriceFiller, error) { + p := &PriceFiller{ + l: l, + s: s, + ksClient: NewKsClient(), + binanceClient: binanceClient, + mappedCoinInfo: map[string]CoinInfo{ + addressETH1: { + Coin: "ETH", + Network: NetworkETH, + ContractAddress: addressETH1, + Decimals: 18, + }, + addressETH2: { + Coin: "ETH", + Network: NetworkETH, + ContractAddress: addressETH2, + Decimals: 18, + }, + }, + } + + if err := p.updateAllCoinInfo(); err != nil { + return nil, err + } + return p, nil +} + +func (p *PriceFiller) Run() { + go p.runUpdateAllCoinInfoRoutine() + p.runBackFillTradelogPriceRoutine() +} + +func (p *PriceFiller) getPrice(token string, timestamp int64) (float64, error) { + candles, err := p.binanceClient.NewKlinesService().Symbol(withAlias(token) + "USDT"). + Interval("1s").StartTime(timestamp).EndTime(timestamp).Do(context.Background()) + if err != nil { + return 0, err + } + if len(candles) == 0 { + return 0, ErrNoPrice + } + low, err := strconv.ParseFloat(candles[0].Low, 64) + if err != nil { + return 0, err + } + high, err := strconv.ParseFloat(candles[0].High, 64) + if err != nil { + return 0, err + } + multiplier := 1.0 + if m, ok := mappedMultiplier[token]; ok { + multiplier = m + } + + return multiplier * (low + high) / 2, nil +} + +func (p *PriceFiller) updateAllCoinInfo() error { + resp, err := p.binanceClient.NewAllCoinService().Do(context.Background()) + if err != nil { + p.l.Errorw("Failed to get all coins info", "err", err) + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + for _, coinInfo := range resp { + for _, network := range coinInfo.NetworkList { + if network.Network == NetworkETH && network.ContractAddress != "" { + address := strings.ToLower(network.ContractAddress) + if _, ok := p.mappedCoinInfo[address]; !ok { + p.mappedCoinInfo[address] = CoinInfo{ + Coin: network.Coin, + Network: network.Network, + ContractAddress: address, + } + } + break + } + } + } + + p.l.Infow("New mapped coin info", "data", p.mappedCoinInfo) + return nil +} + +func (p *PriceFiller) runUpdateAllCoinInfoRoutine() { + ticker := time.NewTicker(updateAllCoinInfoInterval) + defer ticker.Stop() + + for range ticker.C { + if err := p.updateAllCoinInfo(); err != nil { + p.l.Errorw("Failed to updateAllCoinInfo", "err", err) + } + } +} + +func (p *PriceFiller) runBackFillTradelogPriceRoutine() { + ticker := time.NewTicker(backfillTradeLogsPriceInterval) + defer ticker.Stop() + + for ; ; <-ticker.C { + for _, s := range p.s { + tradeLogs, err := s.GetEmptyPrice(backfillTradeLogsLimit) + if err != nil { + p.l.Errorw("Failed to get tradeLogs", "exchange", s.Exchange(), "err", err) + continue + } + + p.FullFillTradeLogs(tradeLogs) + if err = s.Insert(tradeLogs); err != nil { + p.l.Errorw("Failed to insert tradeLogs", "exchange", s.Exchange(), "err", err) + continue + } + + p.l.Infow("backfill tradelog price successfully", "exchange", s.Exchange(), "number", len(tradeLogs)) + } + } +} + +func (p *PriceFiller) fullFillTradeLog(tradeLog storageTypes.TradeLog) (storageTypes.TradeLog, error) { + makerPrice, makerUsdAmount, err := p.getPriceAndAmountUsd(strings.ToLower(tradeLog.MakerToken), + tradeLog.MakerTokenAmount, int64(tradeLog.Timestamp)) + if err != nil { + if err.Error() != invalidSymbolErrString { + p.l.Errorw("Failed to getPriceAndAmountUsd for maker", "err", err) + return tradeLog, err + } + } + + tradeLog.MakerTokenPrice = &makerPrice + tradeLog.MakerUsdAmount = &makerUsdAmount + + takerPrice, takerUsdAmount, err := p.getPriceAndAmountUsd(strings.ToLower(tradeLog.TakerToken), + tradeLog.TakerTokenAmount, int64(tradeLog.Timestamp)) + if err != nil { + if err.Error() != invalidSymbolErrString { + p.l.Errorw("Failed to getPriceAndAmountUsd for taker", "err", err) + return tradeLog, err + } + } + + tradeLog.TakerTokenPrice = &takerPrice + tradeLog.TakerUsdAmount = &takerUsdAmount + + return tradeLog, nil +} + +func (p *PriceFiller) getPriceAndAmountUsd(address, rawAmt string, at int64) (float64, float64, error) { + p.mu.Lock() + coin, ok := p.mappedCoinInfo[address] + p.mu.Unlock() + if ok { + if coin.Decimals == 0 { + d, err := p.getDecimals(address) + if err != nil { + if errors.Is(err, ErrWeirdTokenCatalogResp) { + return 0, 0, nil + } + p.l.Errorw("Failed to getDecimals", "err", err, "address", address) + return 0, 0, err + } + coin.Decimals = d + p.mu.Lock() + p.mappedCoinInfo[address] = coin + p.mu.Unlock() + } + + if coin.Coin == coinUSDT { + return 1, calculateAmountUsd(rawAmt, coin.Decimals, 1), nil + } + price, err := p.getPrice(coin.Coin, int64(at)) + if err != nil { + if !errors.Is(err, ErrNoPrice) { + p.l.Errorw("Failed to getPrice", "err", err, "coin", coin.Coin, "at", at) + return 0, 0, err + } + } + + return price, calculateAmountUsd(rawAmt, coin.Decimals, price), nil + } + return 0, 0, nil +} + +func (p *PriceFiller) FullFillTradeLogs(tradeLogs []storageTypes.TradeLog) { + for idx, tradeLog := range tradeLogs { + // for the safety, sleep a bit to avoid Binance rate limit + time.Sleep(10 * time.Millisecond) + filledTradeLog, err := p.fullFillTradeLog(tradeLog) + if err != nil { + p.l.Errorw("Failed to fullFillTradeLog", "err", err, "tradeLog", tradeLog) + continue + } + tradeLogs[idx] = filledTradeLog + } +} + +func (p *PriceFiller) getDecimals(address string) (int64, error) { + resp, err := p.ksClient.GetTokenCatalog(address) + if err != nil { + p.l.Errorw("Failed to GetTokenCatalog", "err", err) + return 0, err + } + + if len(resp.Data.Tokens) == 1 { + return resp.Data.Tokens[0].Decimals, nil + } + if len(resp.Data.Tokens) > 1 { + p.l.Warnw("Weird token catalog response", "resp", resp) + return 0, ErrWeirdTokenCatalogResp + } + + // try to import token if token is not found. + newResp, err := p.ksClient.ImportToken(NetworkETHChanIDString, address) + if err != nil { + p.l.Errorw("Failed to ImportToken", "err", err) + return 0, err + } + if len(newResp.Data.Tokens) == 1 { + return newResp.Data.Tokens[0].Data.Decimals, nil + } + + p.l.Warnw("Weird ImportToken response", "resp", newResp) + return 0, ErrWeirdTokenCatalogResp +} diff --git a/v2/pkg/price_filler/utils.go b/v2/pkg/price_filler/utils.go new file mode 100644 index 0000000..a2311c4 --- /dev/null +++ b/v2/pkg/price_filler/utils.go @@ -0,0 +1,29 @@ +package pricefiller + +import ( + "math/big" + + "github.com/KyberNetwork/tradelogs/pkg/convert" +) + +var aliasCoinMap = map[string]string{ + "WETH": "ETH", + "STETH": "ETH", +} + +func withAlias(coin string) string { + if s, ok := aliasCoinMap[coin]; ok { + return s + } + return coin +} + +// calculateAmountUsd returns raw / (10**decimals) * price +func calculateAmountUsd(raw string, decimals int64, price float64) float64 { + rawAmt, ok := new(big.Int).SetString(raw, 10) + if !ok { + return 0 + } + + return convert.WeiToFloat(rawAmt, decimals) * price +} diff --git a/v2/pkg/price_filler/utils_test.go b/v2/pkg/price_filler/utils_test.go new file mode 100644 index 0000000..a25e768 --- /dev/null +++ b/v2/pkg/price_filler/utils_test.go @@ -0,0 +1,22 @@ +package pricefiller + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func floatEqual(f1, f2 float64) bool { + return math.Abs(f1-f2) < 1e-9 +} + +func TestCalculateAmountUsd(t *testing.T) { + // RSR + usdAmountRSR := calculateAmountUsd("336970435721651800000000", 18, 0.003863) + assert.True(t, floatEqual(usdAmountRSR, 1301.716793192741), usdAmountRSR) + + // WETH + usdtAmountWETH := calculateAmountUsd("503568522079108960", 18, 2600) + assert.True(t, floatEqual(usdtAmountWETH, 1309.2781574056833), usdtAmountWETH) +} diff --git a/v2/pkg/storage/tradelogs/types/storage.go b/v2/pkg/storage/tradelogs/types/storage.go index 1503e82..79043bc 100644 --- a/v2/pkg/storage/tradelogs/types/storage.go +++ b/v2/pkg/storage/tradelogs/types/storage.go @@ -3,6 +3,7 @@ package types type Storage interface { Insert(orders []TradeLog) error Get(query TradeLogsQuery) ([]TradeLog, error) + GetEmptyPrice(limit uint64) ([]TradeLog, error) Delete(blocks []uint64) error Exchange() string } diff --git a/v2/pkg/storage/tradelogs/types/trade_log.go b/v2/pkg/storage/tradelogs/types/trade_log.go index ccd4be0..c5119e0 100644 --- a/v2/pkg/storage/tradelogs/types/trade_log.go +++ b/v2/pkg/storage/tradelogs/types/trade_log.go @@ -20,28 +20,16 @@ type TradeLog struct { MessageSender string `db:"message_sender" json:"message_sender,omitempty"` InteractContract string `db:"interact_contract" json:"interact_contract,omitempty"` //MakerTraits string `db:"maker_traits" json:"maker_traits,omitempty"` - Expiry uint64 `db:"expiration_date" json:"expiration_date"` - MakerTokenPrice float64 `db:"maker_token_price" json:"maker_token_price"` - TakerTokenPrice float64 `db:"taker_token_price" json:"taker_token_price"` - MakerUsdAmount float64 `db:"maker_usd_amount" json:"maker_usd_amount"` - TakerUsdAmount float64 `db:"taker_usd_amount" json:"taker_usd_amount"` - State TradeLogState `db:"state" json:"state"` + Expiry uint64 `db:"expiration_date" json:"expiration_date"` + MakerTokenPrice *float64 `db:"maker_token_price" json:"maker_token_price"` + TakerTokenPrice *float64 `db:"taker_token_price" json:"taker_token_price"` + MakerUsdAmount *float64 `db:"maker_usd_amount" json:"maker_usd_amount"` + TakerUsdAmount *float64 `db:"taker_usd_amount" json:"taker_usd_amount"` } -type TradeLogState string - -const ( - TradeLogStateNew TradeLogState = "new" - TradeLogStateProcessed TradeLogState = "processed" -) - // CommonTradeLogSerialize used for exchanges only storing fields in common trade logs, // if these exchanges need to store extra fields, they have to use themselves serialize function func CommonTradeLogSerialize(o *TradeLog) []interface{} { - // set default state is new - if o.State == "" { - o.State = TradeLogStateNew - } return []interface{}{ o.OrderHash, strings.ToLower(o.Maker), @@ -62,7 +50,6 @@ func CommonTradeLogSerialize(o *TradeLog) []interface{} { o.TakerTokenPrice, o.MakerUsdAmount, o.TakerUsdAmount, - o.State, } } @@ -88,6 +75,5 @@ func CommonTradeLogColumns() []string { "taker_token_price", "maker_usd_amount", "taker_usd_amount", - "state", } } diff --git a/v2/pkg/storage/tradelogs/zxotc/storage.go b/v2/pkg/storage/tradelogs/zxotc/storage.go index a73c25d..0c067eb 100644 --- a/v2/pkg/storage/tradelogs/zxotc/storage.go +++ b/v2/pkg/storage/tradelogs/zxotc/storage.go @@ -62,8 +62,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { maker_token_price=excluded.maker_token_price, taker_token_price=excluded.taker_token_price, maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount, - state=excluded.state + taker_usd_amount=excluded.taker_usd_amount `).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) @@ -106,7 +105,25 @@ func (s *Storage) Get(query storageTypes.TradeLogsQuery) ([]storageTypes.TradeLo return nil, err } var results []storageTypes.TradeLog - if err := s.db.Select(&results, q, p...); err != nil { + if err = s.db.Select(&results, q, p...); err != nil { + return nil, err + } + for i := range results { + results[i].Exchange = s.Exchange() + } + return results, nil +} + +func (s *Storage) GetEmptyPrice(limit uint64) ([]storageTypes.TradeLog, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Select(storageTypes.CommonTradeLogColumns()...). + From(s.tableName()).Where(squirrel.Eq{"maker_token_price": nil}).Limit(limit) + q, p, err := builder.ToSql() + if err != nil { + return nil, err + } + var results []storageTypes.TradeLog + if err = s.db.Select(&results, q, p...); err != nil { return nil, err } return results, nil diff --git a/v2/pkg/storage/tradelogs/zxotc/storage_test.go b/v2/pkg/storage/tradelogs/zxotc/storage_test.go index a75f3dd..f182564 100644 --- a/v2/pkg/storage/tradelogs/zxotc/storage_test.go +++ b/v2/pkg/storage/tradelogs/zxotc/storage_test.go @@ -52,6 +52,7 @@ func TestSimple(t *testing.T) { }, Result: []types.TradeLog{ { + Exchange: "zerox", Taker: "0xb94d9c1d39c41fbd61d37cb4235993f4eb4aa160", MakerToken: "0xd4df22556e07148e591b4c7b4f555a17188cf5cf", TakerToken: "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", @@ -62,7 +63,6 @@ func TestSimple(t *testing.T) { TxHash: "0xb61c3c802df945e215d6894a2bc3765e1175ebecdd7148dfc7aa5c9f599b9c13", LogIndex: 198, Timestamp: 1671614015000, - State: "new", MessageSender: "0x50f77c5640e07c304432af44fb4034cd51e36f6f", InteractContract: "0xdef1c0ded9bec7f1a1670819833240f027b25eff", }, @@ -76,6 +76,7 @@ func TestSimple(t *testing.T) { }, Result: []types.TradeLog{ { + Exchange: "zerox", OrderHash: "0x199cdfca0dad729ec73c03b980964568ee18b7164b1ad42dcfeee05ca789555b", Maker: "0xaf0b0000f0210d0f421f0009c72406703b50506b", Taker: "0x22f9dcf4647084d6c31b2765f6910cd85c178c18", @@ -88,7 +89,6 @@ func TestSimple(t *testing.T) { TxHash: "0x0e0ec48f90f388a31c637a61ac769b9d0facebff207cb6dc8cf4fc2dacefa55f", LogIndex: 202, Timestamp: 1671614111000, - State: "new", }, }, Success: true,