diff --git a/.gitignore b/.gitignore index 00ca469..4bf041a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ local .run ./config.yaml +omni-balance.db +start.sh diff --git a/cmd/configs.go b/cmd/configs.go index b0759eb..47e4fc0 100644 --- a/cmd/configs.go +++ b/cmd/configs.go @@ -193,26 +193,26 @@ func CreateExampleConfig(exampleConfigPath string) error { Chains: []string{"etnereum", "arbitrum"}, }, }, - LiquidityProviders: []configs.LiquidityProvider{ + Providers: []configs.Provider{ { - Type: configs.CEX, - LiquidityName: "gate.io", + Type: configs.CEX, + Name: "gate.io", Config: map[string]interface{}{ "key": "", "secret": "", }, }, { - Type: configs.DEX, - LiquidityName: "uniswap", + Type: configs.DEX, + Name: "uniswap", }, { - Type: configs.Bridge, - LiquidityName: "helixbridge", + Type: configs.Bridge, + Name: "helixbridge", }, { - Type: configs.Bridge, - LiquidityName: "darwinia-bridge", + Type: configs.Bridge, + Name: "darwinia-bridge", }, }, Wallets: []configs.Wallet{ diff --git a/cmd/main.go b/cmd/main.go index 908a7b2..4fc8ddf 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,9 +13,9 @@ import ( "net/http" "net/url" "omni-balance/internal/daemons" + _ "omni-balance/internal/daemons/bot" _ "omni-balance/internal/daemons/cross_chain" - _ "omni-balance/internal/daemons/monitor" - _ "omni-balance/internal/daemons/rebalance" + _ "omni-balance/internal/daemons/market" _ "omni-balance/internal/daemons/token_price" "omni-balance/internal/db" "omni-balance/internal/models" @@ -66,6 +66,7 @@ func Usage(_ *cli.Context) error { } func Action(cli *cli.Context) error { + fmt.Println(cli.String("conf")) if err := initConfig(ctx, cli.Bool("placeholder"), cli.String("conf"), cli.String("port")); err != nil { return errors.Wrap(err, "init config") } @@ -104,6 +105,7 @@ func Action(cli *cli.Context) error { if err := daemons.Run(ctx, *config); err != nil { return errors.Wrap(err, "run daemons") } + utils.FinishInit() quit := make(chan os.Signal, 1) diff --git a/go.mod b/go.mod index 983932a..2b97426 100644 --- a/go.mod +++ b/go.mod @@ -115,6 +115,7 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/satori/go.uuid v1.2.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/status-im/keycard-go v0.2.0 // indirect github.com/supranational/blst v0.3.11 // indirect diff --git a/go.sum b/go.sum index 7707ca0..323aef5 100644 --- a/go.sum +++ b/go.sum @@ -273,6 +273,8 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= diff --git a/internal/daemons/bot/bot.go b/internal/daemons/bot/bot.go new file mode 100644 index 0000000..69d4631 --- /dev/null +++ b/internal/daemons/bot/bot.go @@ -0,0 +1,93 @@ +package bot + +import ( + "context" + "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "omni-balance/internal/daemons/market" + "omni-balance/utils/bot" + "omni-balance/utils/bot/balance_on_chain" + "omni-balance/utils/chains" + "omni-balance/utils/configs" + "sync" +) + +func Run(ctx context.Context, conf configs.Config) error { + existBuyTokens, err := getExistingBuyTokens() + if err != nil { + return errors.Wrap(err, "find buy tokens error") + } + var ( + clients = make(map[string]simulated.Client) + ) + for _, chain := range conf.Chains { + client, err := chains.NewTryClient(ctx, chain.RpcEndpoints) + if err != nil { + return errors.Wrap(err, "create client error") + } + clients[chain.Name] = client + } + defer func() { + for _, client := range clients { + client.(*chains.Client).Close() + } + }() + + var ( + ignoreTokens = createIgnoreTokens(existBuyTokens) + w sync.WaitGroup + ) + + for _, wallet := range conf.Wallets { + for _, token := range wallet.Tokens { + for _, chainName := range token.Chains { + if ignoreTokens.Contains(token.Name, chainName, wallet.Address) { + logrus.Debugf("ignore token %s on chain %s", token.Name, chainName) + continue + } + monitorType := token.MonitorTypes[chainName] + if monitorType == "" { + monitorType = balance_on_chain.BalanceOnChain{}.Name() + } + fn := process(ctx, conf, wallet.Address, token.Name, chainName, monitorType, clients[chainName]) + w.Add(1) + go func() { + defer w.Done() + tasks, processType, err := fn() + if err != nil { + logrus.Errorf("bot error: %s", err) + return + } + _, taskId, err := createOrder(tasks, processType) + if err != nil { + logrus.Errorf("create order error: %s", err) + return + } + market.PushTask(market.Task{ + Id: taskId, + ProcessType: processType, + }) + }() + } + } + } + w.Wait() + return nil +} + +func process(ctx context.Context, conf configs.Config, walletAddress, tokenName, chainName, monitorType string, + client simulated.Client) func() ([]bot.Task, bot.ProcessType, error) { + m := bot.GetMonitor(monitorType) + return func() ([]bot.Task, bot.ProcessType, error) { + return m.Check(ctx, bot.Params{ + Conf: conf, + Info: bot.Config{ + Wallet: conf.GetWallet(walletAddress), + TokenName: tokenName, + Chain: chainName, + }, + Client: client, + }) + } +} diff --git a/internal/daemons/bot/init.go b/internal/daemons/bot/init.go new file mode 100644 index 0000000..a016fca --- /dev/null +++ b/internal/daemons/bot/init.go @@ -0,0 +1,15 @@ +package bot + +import ( + "omni-balance/internal/daemons" + "time" +) + +func init() { + daemons.RegisterIntervalTask(daemons.Task{ + Name: "bot", + Description: "Check the balance based on the specific bot and create an order", + TaskFunc: Run, + DefaultInterval: time.Minute * 3, + }) +} diff --git a/internal/daemons/bot/util.go b/internal/daemons/bot/util.go new file mode 100644 index 0000000..e55c26d --- /dev/null +++ b/internal/daemons/bot/util.go @@ -0,0 +1,71 @@ +package bot + +import ( + uuid "github.com/satori/go.uuid" + "omni-balance/internal/db" + "omni-balance/internal/models" + "omni-balance/utils" + "omni-balance/utils/bot" + "omni-balance/utils/provider" +) + +func getExistingBuyTokens() ([]*models.Order, error) { + var existBuyTokens []*models.Order + err := db.DB().Where("status != ? ", provider.TxStatusSuccess).Find(&existBuyTokens).Error + if err != nil { + return nil, err + } + return existBuyTokens, nil +} + +func createIgnoreTokens(existBuyTokens []*models.Order) IgnoreTokens { + var ignoreTokens []IgnoreToken + for _, v := range existBuyTokens { + ignoreTokens = append(ignoreTokens, IgnoreToken{ + Name: v.TokenOutName, + Chain: v.TargetChainName, + Address: v.Wallet, + }) + } + return ignoreTokens +} + +func createOrder(tasks []bot.Task, processType bot.ProcessType) (orders []*models.Order, taskId string, err error) { + if len(tasks) == 0 { + return + } + var ( + txn = db.DB().Begin() + ) + + taskId = uuid.NewV4().String() + + for _, v := range tasks { + o := &models.Order{ + Wallet: v.Wallet, + TokenInName: v.TokenInName, + TokenOutName: v.TokenOutName, + SourceChainName: v.TokenInChainName, + TargetChainName: v.TokenOutChainName, + CurrentChainName: v.CurrentChainName, + Amount: v.Amount, + Status: v.Status, + ProviderType: v.ProviderType, + ProviderName: v.ProviderName, + Order: utils.Object2JsonRawMessage(v.Order), + TaskId: taskId, + ProcessType: string(processType), + } + + if err = txn.Create(o).Error; err != nil { + txn.Rollback() + return + } + orders = append(orders, o) + } + if err = txn.Commit().Error; err != nil { + txn.Rollback() + return + } + return +} diff --git a/internal/daemons/bot/vars.go b/internal/daemons/bot/vars.go new file mode 100644 index 0000000..ece8ed3 --- /dev/null +++ b/internal/daemons/bot/vars.go @@ -0,0 +1,25 @@ +package bot + +import ( + "fmt" + "strings" +) + +type IgnoreTokens []IgnoreToken + +type IgnoreToken struct { + Name string `json:"name"` + Chain string `json:"chain"` + Address string `json:"wallet"` +} + +func (i IgnoreTokens) Contains(name, chain, address string) bool { + key := fmt.Sprintf("%s_%s_%s", name, chain, address) + for _, token := range i { + if !strings.EqualFold(key, fmt.Sprintf("%s_%s_%s", token.Name, token.Chain, token.Address)) { + continue + } + return true + } + return false +} diff --git a/internal/daemons/market/init.go b/internal/daemons/market/init.go new file mode 100644 index 0000000..d4d97c6 --- /dev/null +++ b/internal/daemons/market/init.go @@ -0,0 +1,16 @@ +package market + +import ( + "omni-balance/internal/daemons" + "time" +) + +func init() { + daemons.RegisterIntervalTask(daemons.Task{ + Name: "market", + Description: "Look for tasks in the database that are not being processed and process them.", + TaskFunc: Run, + DefaultInterval: time.Minute * 3, + RunOnStart: true, + }) +} diff --git a/internal/daemons/market/market.go b/internal/daemons/market/market.go new file mode 100644 index 0000000..231db58 --- /dev/null +++ b/internal/daemons/market/market.go @@ -0,0 +1,271 @@ +package market + +import ( + "context" + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "omni-balance/internal/daemons" + "omni-balance/internal/db" + "omni-balance/internal/models" + "omni-balance/utils" + "omni-balance/utils/bot" + "omni-balance/utils/chains" + "omni-balance/utils/configs" + "omni-balance/utils/notice" + "omni-balance/utils/provider" + "sync" + "sync/atomic" + "time" +) + +var ( + hasRunQueue = atomic.Bool{} + processTasks sync.Map +) + +func Run(ctx context.Context, conf configs.Config) error { + if !hasRunQueue.Load() { + go runFromQueue(ctx, conf) + } + hasRunQueue.Store(true) + tasks, err := models.ListNotSuccessTasks(ctx, db.DB(), func(order models.Order) bool { + if order.HasLocked() { + return false + } + return !utils.InArrayFold(order.Status.String(), []string{ + models.OrderStatusWaitTransferFromOperator.String(), + models.OrderStatusWaitCrossChain.String(), + }) + }) + if err != nil { + return errors.Wrap(err, "list tasks error") + } + for index := range tasks { + if _, ok := processTasks.Load(tasks[index].Id); ok { + continue + } + PushTask(Task{ + Id: tasks[index].Id, + ProcessType: bot.ProcessType(tasks[index].ProviderType), + }) + } + return nil +} + +func runFromQueue(ctx context.Context, conf configs.Config) { + defer func() { + if err := recover(); err != nil { + logrus.Errorf("market job error: %v, restart after 5 second", err) + time.Sleep(time.Second * 5) + runFromQueue(ctx, conf) + } + }() + for { + select { + case <-ctx.Done(): + return + + case task := <-taskQueue: + if _, ok := processTasks.Load(task.Id); ok { + return + } + orders, err := models.ListOrdersByTaskId(ctx, db.DB(), task.Id) + if err != nil { + logrus.Errorf("get orders by task id error: %s", err.Error()) + continue + } + if len(orders) == 0 { + continue + } + logrus.Debugf("Start task %s. There are %d orders", task.Id, len(orders)) + fn := func(orders []models.Order, task Task) func() { + processTasks.Store(task.Id, struct{}{}) + return func() { + defer func() { processTasks.Delete(task.Id) }() + + var taskWait sync.WaitGroup + for index := range orders { + if orders[index].HasLocked() { + continue + } + if task.ProcessType == bot.Parallel { + taskWait.Add(1) + go func(order models.Order) { + taskWait.Done() + do(ctx, order, conf) + }(orders[index]) + continue + } + do(ctx, orders[index], conf) + } + taskWait.Wait() + logrus.Infof("Finish all orders in task %s", task.Id) + } + } + utils.Go(fn(orders, task)) + } + } +} + +func do(ctx context.Context, order models.Order, conf configs.Config) { + defer utils.Recover() + log := order.GetLogs() + subCtx, cancel := context.WithCancel(utils.SetLogToCtx(ctx, log)) + defer cancel() + utils.Go(func() { + defer cancel() + var t = time.NewTicker(time.Second * 5) + defer t.Stop() + for { + select { + case <-subCtx.Done(): + return + case <-t.C: + var count int64 + _ = db.DB().Model(&models.Order{}).Where("id = ?", order.ID).Count(&count) + if count == 0 { + log.Infof("order #%d not found, exit this order rebalance", order.ID) + return + } + } + } + }) + + err := processOrder(subCtx, order, conf) + if errors.Is(err, context.Canceled) || errors.Is(ctx.Err(), context.Canceled) || errors.Is(subCtx.Err(), context.Canceled) { + return + } + if err != nil { + if addOrderError(order.ID) >= 10 { + _ = notice.Send( + provider.WithNotify(ctx, provider.WithNotifyParams{ + TaskId: order.TaskId, + OrderId: order.ID, + Receiver: common.HexToAddress(order.Wallet), + TokenOut: order.TokenOutName, + TokenOutChain: order.TargetChainName, + }), + "process order error", + fmt.Sprintf("order #%d error: %s, please check the provider configuration then restart the application.", order.ID, err), + logrus.ErrorLevel, + ) + return + } + log.Errorf(" order #%d error: %s, retry after 10 second", order.ID, err) + time.Sleep(time.Second * 10) + return + } + removeOrderError(order.ID) + order = models.GetOrder(ctx, db.DB(), order.ID) + err = notice.Send( + provider.WithNotify(ctx, provider.WithNotifyParams{ + TaskId: order.TaskId, + OrderId: order.ID, + Receiver: common.HexToAddress(order.Wallet), + CurrentBalance: order.CurrentBalance, + }), + fmt.Sprintf("rebalance %s on %s success", order.TokenOutName, order.TargetChainName), + fmt.Sprintf("rebalance %s %s from %s to %s use %s %s", + order.TokenOutName, order.Amount, order.SourceChainName, order.TargetChainName, + order.ProviderName, order.ProviderType), + logrus.InfoLevel, + ) + if err != nil { + log.Debugf("notice error: %s", err) + } + log.Infof(" order #%d success", order.ID) +} + +func processOrder(ctx context.Context, order models.Order, conf configs.Config) error { + log := utils.GetLogFromCtx(ctx) + if order.Lock(db.DB()) { + return errors.Errorf("order #%d locked, unlock time is %s", order.ID, time.Unix(order.LockTime+60*60*1, 0)) + } + defer order.UnLock(db.DB()) + var ( + orderProcess = models.GetLastOrderProcess(ctx, db.DB(), order.ID) + args = daemons.CreateSwapParams(order, orderProcess, log, conf.GetWallet(order.Wallet)) + wallet = conf.GetWallet(order.Wallet) + token = conf.GetTokenInfoOnChain(order.TokenOutName, order.TargetChainName) + chain = conf.GetChainConfig(order.TargetChainName) + client, err = chains.NewTryClient(ctx, chain.RpcEndpoints) + ) + + if err != nil { + return errors.Wrap(err, "new evm client error") + } + defer client.Close() + if wallet.IsDifferentAddress() || order.Status == models.OrderStatusWaitTransferFromOperator { + ok, err := transfer(ctx, order, args, conf, client) + if err != nil && ok { + return errors.Wrap(err, "transfer error") + } + if ok { + return nil + } + log.Infof("cannot use transfer, try other providers.") + } + + balance, err := wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client) + if err != nil { + return errors.Wrap(err, "check balance error") + } + + for _, v := range conf.GetWalletConfig(order.Wallet).Tokens { + if !utils.InArray(order.TargetChainName, v.Chains) { + continue + } + if order.TokenOutName != v.Name { + continue + } + if !balance.GreaterThan(balance) { + break + } + log.Infof("%s balance on %s is enough, skip", v.Name, order.TargetChainName) + if err := order.Success(db.DB(), "", nil, balance); err != nil { + return errors.Wrap(err, "update order success error") + } + return nil + } + + providerObj, err := getBestProvider(ctx, order, conf) + if err != nil { + return errors.Wrap(err, "get provider error") + } + + if err := order.SaveProvider(db.DB(), providerObj.Type(), providerObj.Name()); err != nil { + return errors.Wrap(err, "save provider error") + } + + log.Infof("start #%d %s on %s use %s provider", order.ID, order.TokenOutName, + order.TargetChainName, providerObj.Name()) + result, providerErr := providerObj.Swap(ctx, args) + if errors.Is(providerErr, context.Canceled) { + return nil + } + if result.Status == "" { + return errors.New("the result status is empty") + } + if result.CurrentChain != args.TargetChain && providerErr == nil { + result.Status = models.OrderStatusWaitCrossChain + } + if err := createUpdateLog(ctx, order, result, conf, client); err != nil { + return errors.Wrap(err, "create update log error") + } + if providerErr != nil { + return errors.Wrap(providerErr, "provider error") + } + if args.Receiver != result.Receiver && result.Receiver != "" { + order = models.GetOrder(ctx, db.DB(), order.ID) + if order.ID == 0 { + return errors.New("order not found") + } + _, err = transfer(ctx, order, daemons.CreateSwapParams(order, orderProcess, log, conf.GetWallet(order.Wallet)), conf, client) + if err != nil { + return errors.Wrap(err, "transfer error") + } + } + return nil +} diff --git a/internal/daemons/market/transfer.go b/internal/daemons/market/transfer.go new file mode 100644 index 0000000..f9c037f --- /dev/null +++ b/internal/daemons/market/transfer.go @@ -0,0 +1,34 @@ +package market + +import ( + "context" + "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/pkg/errors" + "omni-balance/internal/models" + "omni-balance/utils/configs" + "omni-balance/utils/constant" + "omni-balance/utils/error_types" + "omni-balance/utils/provider" +) + +func transfer(ctx context.Context, order models.Order, args provider.SwapParams, + conf configs.Config, client simulated.Client) (bool, error) { + ctx = context.WithValue(ctx, constant.ChainNameKeyInCtx, order.TargetChainName) + //if order.Status != models.OrderStatusWaitTransferFromOperator { + // return false, errors.Errorf("order #%d status is %s, not wait transfer from operator", order.ID, order.Status) + //} + result, err := provider.Transfer(ctx, conf, args, client) + if errors.Is(err, error_types.ErrNativeTokenInsufficient) || + errors.Is(err, error_types.ErrWalletLocked) || + errors.Is(err, context.Canceled) { + return true, errors.Wrap(err, "transfer error") + } + if err == nil { + return true, createUpdateLog(ctx, order, result, conf, client) + } + if !errors.Is(errors.Unwrap(err), error_types.ErrInsufficientBalance) && + !errors.Is(errors.Unwrap(err), error_types.ErrInsufficientLiquidity) && err != nil { + return false, errors.Wrap(err, "transfer not is insufficient balance") + } + return false, nil +} diff --git a/internal/daemons/market/util.go b/internal/daemons/market/util.go new file mode 100644 index 0000000..efc7d6f --- /dev/null +++ b/internal/daemons/market/util.go @@ -0,0 +1,194 @@ +package market + +import ( + "context" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/pkg/errors" + "github.com/shopspring/decimal" + "github.com/sirupsen/logrus" + "omni-balance/internal/db" + "omni-balance/internal/models" + "omni-balance/utils" + "omni-balance/utils/configs" + "omni-balance/utils/error_types" + "omni-balance/utils/provider" + "omni-balance/utils/wallets" + "sync" +) + +var ( + orderError = make(map[uint]int) + m sync.Mutex +) + +func createUpdateLog(ctx context.Context, order models.Order, result provider.SwapResult, conf configs.Config, + client simulated.Client) error { + + wallet := conf.GetWallet(order.Wallet) + walletBalance := getWalletTokenBalance(ctx, wallet, order.TokenOutName, order.TargetChainName, conf, client) + + updateOrder := &models.Order{ + TokenInName: result.TokenInName, + SourceChainName: result.TokenInChainName, + CurrentChainName: result.CurrentChain, + CurrentBalance: walletBalance, + ProviderOrderId: result.OrderId, + Tx: result.Tx, + Order: result.MarshalOrder(), + Error: result.Error, + Status: result.Status, + } + log := utils.GetLogFromCtx(ctx).WithFields(logrus.Fields{ + "order_id": order.ID, + "result": utils.ToMap(result), + }) + if result.Status == provider.TxStatusSuccess && + wallet.IsDifferentAddress() && + result.Receiver != order.Wallet && + result.Receiver != "" { + updateOrder.Status = provider.TxStatus(models.OrderStatusWaitTransferFromOperator) + } + log.Debugf("order status is %v", updateOrder.Status) + return db.DB().Model(&models.Order{}).Where("id = ?", order.ID).Limit(1).Updates(updateOrder).Error +} + +func getWalletTokenBalance(ctx context.Context, wallet wallets.Wallets, tokenName, chainName string, + conf configs.Config, client simulated.Client) decimal.Decimal { + + chainConfig := conf.GetChainConfig(chainName) + if len(chainConfig.RpcEndpoints) == 0 { + return decimal.Zero + } + token := conf.GetTokenInfoOnChain(tokenName, chainName) + + balance, err := wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client) + if err != nil { + return decimal.Zero + } + return balance +} + +func getBestProvider(ctx context.Context, order models.Order, conf configs.Config) (provider.Provider, error) { + log := order.GetLogs() + if order.ProviderType != "" && order.ProviderName != "" { + log.Debugf("provider type is %s, provider name is %s", order.ProviderType, order.ProviderName) + fn, err := provider.GetProvider(order.ProviderType, order.ProviderName) + if err != nil { + return nil, errors.Wrap(err, "get provider error") + } + return fn(conf) + } + type canUseProvider struct { + provider provider.Provider + tokenInCosts provider.TokenInCosts + } + var canUseProviders []canUseProvider + providers := provider.ListProvidersByConfig(conf) + for _, providerFns := range providers { + for _, providerFn := range providerFns { + p, err := provider.InitializeBridge(providerFn, conf) + if err != nil { + log.Debugf("init provider error: %s", err.Error()) + continue + } + log = log.WithFields(logrus.Fields{ + "provider_type": p.Type(), + "provider_name": p.Name(), + }) + tokenInCosts, ok := providerSupportsOrder(ctx, p, order, conf, log) + if !ok || len(tokenInCosts) == 0 { + continue + } + log.Debugf("provider %s can use %s on %s. The tokenInCosts is %+v", + p.Name(), order.TokenOutName, order.TargetChainName, tokenInCosts) + canUseProviders = append(canUseProviders, canUseProvider{ + provider: p, + tokenInCosts: tokenInCosts, + }) + } + } + + if len(canUseProviders) <= 0 { + return nil, error_types.ErrNoProvider + } + if len(canUseProviders) == 1 { + log.Debugf("can use %s provider, the tokenIn is %+v", canUseProviders[0].provider.Name(), canUseProviders[0].tokenInCosts) + return canUseProviders[0].provider, nil + } + var ( + minPrice decimal.Decimal + providerObj provider.Provider + ) + for _, canUseProvider := range canUseProviders { + var ( + tokenNames []string + tokenInCostsMap = make(map[string]decimal.Decimal) + ) + for _, tokenIn := range canUseProvider.tokenInCosts { + if tokenIn.TokenName == order.TokenInName { + return canUseProvider.provider, nil + } + tokenInCostsMap[tokenIn.TokenName] = tokenIn.CostAmount + tokenNames = append(tokenNames, tokenIn.TokenName) + + } + tokenName2Price, err := models.FindTokenPrice(db.DB(), tokenNames) + if err != nil { + log.Warnf("find token price error: %s", err.Error()) + continue + } + + for name, v := range tokenName2Price { + log.Debugf("token %s price %s on %s", name, v.String(), canUseProvider.provider.Name()) + price := v.Mul(tokenInCostsMap[name]) + if price.IsZero() { + continue + } + if price.LessThan(minPrice) { + minPrice = price + providerObj = canUseProvider.provider + continue + } + if minPrice.IsZero() { + minPrice = price + providerObj = canUseProvider.provider + } + } + } + if providerObj == nil { + return nil, errors.New("no provider can use") + } + log.Debugf("min price %s, provider %s", minPrice, providerObj.Name()) + return providerObj, nil +} + +func providerSupportsOrder(ctx context.Context, p provider.Provider, order models.Order, + conf configs.Config, log *logrus.Entry) (provider.TokenInCosts, bool) { + tokenInCosts, err := p.GetCost(ctx, provider.SwapParams{ + SourceToken: order.TokenInName, + Sender: conf.GetWallet(order.Wallet), + TargetToken: order.TokenOutName, + Receiver: order.Wallet, + TargetChain: order.TargetChainName, + Amount: order.Amount, + }) + if err != nil { + log.Debugf("check token %s on %s use %s error: %s", order.TokenOutName, order.TargetChainName, p.Name(), err.Error()) + return nil, false + } + return tokenInCosts, true +} + +func addOrderError(orderId uint) int { + m.Lock() + defer m.Unlock() + orderError[orderId] = orderError[orderId] + 1 + return orderError[orderId] +} + +func removeOrderError(orderId uint) { + m.Lock() + defer m.Unlock() + delete(orderError, orderId) +} diff --git a/internal/daemons/market/vars.go b/internal/daemons/market/vars.go new file mode 100644 index 0000000..0d3f7a7 --- /dev/null +++ b/internal/daemons/market/vars.go @@ -0,0 +1,16 @@ +package market + +import "omni-balance/utils/bot" + +var ( + taskQueue = make(chan Task, 100) +) + +type Task struct { + Id string + ProcessType bot.ProcessType +} + +func PushTask(task Task) { + taskQueue <- task +} diff --git a/internal/daemons/monitor/monitor.go b/internal/daemons/monitor/monitor.go deleted file mode 100644 index 1cb9d15..0000000 --- a/internal/daemons/monitor/monitor.go +++ /dev/null @@ -1,100 +0,0 @@ -package monitor - -import ( - "context" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "omni-balance/internal/daemons" - "omni-balance/internal/db" - "omni-balance/internal/models" - "omni-balance/utils/configs" - "omni-balance/utils/provider" - "omni-balance/utils/wallet_monitor" - "time" -) - -func init() { - daemons.RegisterIntervalTask(daemons.Task{ - Name: "monitor_wallet_balance", - Description: "Responsible for monitoring the balance of specified tokens in the wallet.", - TaskFunc: Run, - DefaultInterval: time.Minute * 3, - }) -} - -func Run(ctx context.Context, conf configs.Config) error { - existBuyTokens, err := getExistingBuyTokens() - if err != nil { - return errors.Wrap(err, "find buy tokens error") - } - - ignoreTokens := createIgnoreTokens(existBuyTokens) - result, err := wallet_monitor.NewMonitor(conf).Check(ctx, ignoreTokens...) - if err != nil { - return errors.Wrap(err, "check wallet error") - } - - if err := createOrder(result, conf); err != nil { - return errors.Wrap(err, "create buy tokens error") - } - return nil -} - -func getExistingBuyTokens() ([]*models.Order, error) { - var existBuyTokens []*models.Order - err := db.DB().Where("status != ? ", provider.TxStatusSuccess).Find(&existBuyTokens).Error - if err != nil { - return nil, err - } - return existBuyTokens, nil -} - -func createIgnoreTokens(existBuyTokens []*models.Order) []wallet_monitor.IgnoreToken { - var ignoreTokens []wallet_monitor.IgnoreToken - for _, v := range existBuyTokens { - ignoreTokens = append(ignoreTokens, wallet_monitor.IgnoreToken{ - Name: v.TokenOutName, - Chain: v.TargetChainName, - Address: v.Wallet, - }) - } - return ignoreTokens -} - -func createOrder(result []wallet_monitor.Result, conf configs.Config) error { - if len(result) == 0 { - return nil - } - var orders []*models.Order - for _, r := range result { - for _, token := range r.Tokens { - for _, v := range token.Chains { - threshold := conf.GetTokenThreshold(r.Wallet, token.Name, v.ChainName) - if v.TokenBalance.Add(v.Amount).LessThanOrEqual(threshold) { - v.Amount = threshold.Add(v.Amount).Sub(v.TokenBalance) - logrus.WithFields(logrus.Fields{ - "wallet": r.Wallet, - "token": token.Name, - "chain": v.ChainName, - "threshold": threshold, - "token_balance": v.TokenBalance, - }).Infof("The amount was set too small, "+ - "and another rebalance would still be required after this one. "+ - "Therefore, the amount for this rebalance is set to %s.", v.Amount) - } - orders = append(orders, &models.Order{ - Wallet: r.Wallet, - TokenOutName: token.Name, - TargetChainName: v.ChainName, - CurrentBalance: v.TokenBalance, - Amount: v.Amount, - Status: provider.TxStatusPending, - }) - } - } - } - if err := db.DB().CreateInBatches(orders, 100).Error; err != nil { - return err - } - return nil -} diff --git a/internal/daemons/rebalance/rebalance.go b/internal/daemons/rebalance/rebalance.go deleted file mode 100644 index 4cbdf51..0000000 --- a/internal/daemons/rebalance/rebalance.go +++ /dev/null @@ -1,416 +0,0 @@ -package rebalance - -import ( - "context" - "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient/simulated" - "github.com/pkg/errors" - "github.com/shopspring/decimal" - "github.com/sirupsen/logrus" - "omni-balance/internal/daemons" - "omni-balance/internal/db" - "omni-balance/internal/models" - "omni-balance/utils" - "omni-balance/utils/chains" - "omni-balance/utils/configs" - "omni-balance/utils/constant" - "omni-balance/utils/error_types" - "omni-balance/utils/notice" - "omni-balance/utils/provider" - "omni-balance/utils/wallets" - "sync" - "time" -) - -func init() { - daemons.RegisterIntervalTask(daemons.Task{ - Name: "rebalance", - Description: "Responsible for injecting specified assets into monitored wallets in the most efficient way when the balance of specified tokens is insufficient.", - TaskFunc: Run, - DefaultInterval: time.Minute * 10, - }) -} - -var ( - orderError = make(map[uint]int) - m sync.Mutex -) - -func addOrderError(orderId uint) int { - m.Lock() - defer m.Unlock() - orderError[orderId] = orderError[orderId] + 1 - return orderError[orderId] -} - -func removeOrderError(orderId uint) { - m.Lock() - defer m.Unlock() - delete(orderError, orderId) -} - -func Run(ctx context.Context, conf configs.Config) error { - orders, err := listOrders(ctx) - if err != nil { - return errors.Wrap(err, "find orders error") - } - if len(orders) == 0 { - return nil - } - var w sync.WaitGroup - for index := range orders { - if utils.InArray(orders[index].Status.String(), []string{models.OrderStatusWaitCrossChain.String()}) { - continue - } - if orders[index].HasLocked() { - logrus.Debugf("order #%d has locked, skip", orders[index].ID) - continue - } - w.Add(1) - go func(order *models.Order) { - defer utils.Recover() - defer w.Done() - log := order.GetLogs() - subCtx, cancel := context.WithCancel(utils.SetLogToCtx(ctx, log)) - defer cancel() - utils.Go(func() { - defer cancel() - var t = time.NewTicker(time.Second * 5) - defer t.Stop() - - for { - select { - case <-subCtx.Done(): - return - case <-t.C: - var count int64 - _ = db.DB().Model(&models.Order{}).Where("id = ?", order.ID).Count(&count) - if count == 0 { - log.Infof("order #%d not found, exit this order rebalance", order.ID) - return - } - } - } - }) - - err := reBalance(subCtx, order, conf) - if errors.Is(err, error_types.ErrNoProvider) { - if addOrderError(order.ID) > 3 { - log.Errorf("order #%d not found provider, exit this order rebalance. send notice", order.ID) - _ = notice.Send( - provider.WithNotify(ctx, provider.WithNotifyParams{ - OrderId: order.ID, - Receiver: common.HexToAddress(order.Wallet), - TokenOut: order.TokenOutName, - TokenOutChain: order.TargetChainName, - }), - "Find provider error", - fmt.Sprintf("order #%d not found provider, please check the provider configuration.", order.ID), - logrus.ErrorLevel, - ) - return - } - } - if err != nil { - log.Errorf("reBalance order #%d error: %s", order.ID, err) - return - } - removeOrderError(order.ID) - order = models.GetOrder(ctx, db.DB(), order.ID) - - err = notice.Send( - provider.WithNotify(ctx, provider.WithNotifyParams{ - OrderId: order.ID, - Receiver: common.HexToAddress(order.Wallet), - CurrentBalance: order.CurrentBalance, - }), - fmt.Sprintf("rebalance %s on %s success", order.TokenOutName, order.TargetChainName), - fmt.Sprintf("rebalance %s %s from %s to %s use %s %s", - order.TokenOutName, order.Amount, order.SourceChainName, order.TargetChainName, - order.ProviderName, order.ProviderType), - logrus.InfoLevel, - ) - if err != nil { - log.Debugf("notice error: %s", err) - } - log.Infof("reBalance order #%d success", order.ID) - }(orders[index]) - } - w.Wait() - return nil -} - -func transfer(ctx context.Context, order *models.Order, args provider.SwapParams, - conf configs.Config, client simulated.Client) (bool, error) { - ctx = context.WithValue(ctx, constant.ChainNameKeyInCtx, order.TargetChainName) - if order.Status != models.OrderStatusWaitTransferFromOperator { - return false, errors.Errorf("order #%d status is %s, not wait transfer from operator", order.ID, order.Status) - } - result, err := provider.Transfer(ctx, conf, args, client) - if errors.Is(err, error_types.ErrNativeTokenInsufficient) || - errors.Is(err, error_types.ErrWalletLocked) || - errors.Is(err, context.Canceled) { - return true, errors.Wrap(err, "transfer error") - } - if err == nil { - return true, createUpdateLog(ctx, order, result, conf, client) - } - if !errors.Is(errors.Unwrap(err), error_types.ErrInsufficientBalance) && - !errors.Is(errors.Unwrap(err), error_types.ErrInsufficientLiquidity) && err != nil { - return true, errors.Wrap(err, "transfer not is insufficient balance") - } - return false, nil -} - -func reBalance(ctx context.Context, order *models.Order, conf configs.Config) error { - log := utils.GetLogFromCtx(ctx) - if order.Lock(db.DB()) { - return errors.Errorf("order #%d locked, unlock time is %s", order.ID, time.Unix(order.LockTime+60*60*1, 0)) - } - defer order.UnLock(db.DB()) - var ( - orderProcess = models.GetLastOrderProcess(ctx, db.DB(), order.ID) - args = daemons.CreateSwapParams(*order, orderProcess, log, conf.GetWallet(order.Wallet)) - wallet = conf.GetWallet(order.Wallet) - token = conf.GetTokenInfoOnChain(order.TokenOutName, order.TargetChainName) - chain = conf.GetChainConfig(order.TargetChainName) - client, err = chains.NewTryClient(ctx, chain.RpcEndpoints) - ) - - if err != nil { - return errors.Wrap(err, "new evm client error") - } - defer client.Close() - if wallet.IsDifferentAddress() || order.Status == models.OrderStatusWaitTransferFromOperator { - ok, err := transfer(ctx, order, args, conf, client) - if err != nil && ok { - return errors.Wrap(err, "transfer error") - } - if ok { - return nil - } - log.Infof("cannot use transfer, try other providers.") - } - - balance, err := wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client) - if err != nil { - return errors.Wrap(err, "check balance error") - } - - for _, v := range conf.GetWalletConfig(order.Wallet).Tokens { - if !utils.InArray(order.TargetChainName, v.Chains) { - continue - } - if order.TokenOutName != v.Name { - continue - } - if !balance.GreaterThan(balance) { - break - } - log.Infof("%s balance on %s is enough, skip", v.Name, order.TargetChainName) - if err := order.Success(db.DB(), "", nil, balance); err != nil { - return errors.Wrap(err, "update order success error") - } - return nil - } - - providerObj, err := getReBalanceProvider(ctx, *order, conf) - if err != nil { - return errors.Wrap(err, "get reBalance provider error") - } - - if err := order.SaveProvider(db.DB(), providerObj.Type(), providerObj.Name()); err != nil { - return errors.Wrap(err, "save provider error") - } - - log.Infof("start reBalance #%d %s on %s use %s provider", order.ID, order.TokenOutName, - order.TargetChainName, providerObj.Name()) - result, providerErr := providerObj.Swap(ctx, args) - if result.Status == "" { - return errors.New("the result status is empty") - } - if result.CurrentChain != args.TargetChain { - result.Status = models.OrderStatusWaitCrossChain - } - if err := createUpdateLog(ctx, order, result, conf, client); err != nil { - return errors.Wrap(err, "create update log error") - } - if providerErr != nil { - return errors.Wrap(providerErr, "provider error") - } - if args.Receiver != result.Receiver && result.Receiver != "" { - order = models.GetOrder(ctx, db.DB(), order.ID) - if order == nil { - return errors.New("order not found") - } - _, err = transfer(ctx, order, daemons.CreateSwapParams(*order, orderProcess, log, conf.GetWallet(order.Wallet)), conf, client) - if err != nil { - return errors.Wrap(err, "transfer error") - } - } - return nil -} - -func listOrders(_ context.Context) ([]*models.Order, error) { - var orders []*models.Order - err := db.DB().Where("status != ?", provider.TxStatusSuccess).Find(&orders).Error - if err != nil { - return nil, errors.Wrap(err, "find buy tokens error") - } - return orders, nil -} - -func createUpdateLog(ctx context.Context, order *models.Order, result provider.SwapResult, conf configs.Config, - client simulated.Client) error { - - wallet := conf.GetWallet(order.Wallet) - walletBalance := getWalletTokenBalance(ctx, wallet, order.TokenOutName, order.TargetChainName, conf, client) - - updateOrder := &models.Order{ - TokenInName: result.TokenInName, - SourceChainName: result.TokenInChainName, - CurrentChainName: result.CurrentChain, - CurrentBalance: walletBalance, - ProviderOrderId: result.OrderId, - Tx: result.Tx, - Order: result.MarshalOrder(), - Error: result.Error, - Status: result.Status, - } - log := utils.GetLogFromCtx(ctx).WithFields(logrus.Fields{ - "order_id": order.ID, - "result": utils.ToMap(result), - }) - if result.Status == provider.TxStatusSuccess && - wallet.IsDifferentAddress() && - result.Receiver != order.Wallet && - result.Receiver != "" { - updateOrder.Status = provider.TxStatus(models.OrderStatusWaitTransferFromOperator) - } - log.Debugf("order status is %v", updateOrder.Status) - return db.DB().Model(&models.Order{}).Where("id = ?", order.ID).Limit(1).Updates(updateOrder).Error -} - -func getWalletTokenBalance(ctx context.Context, wallet wallets.Wallets, tokenName, chainName string, - conf configs.Config, client simulated.Client) decimal.Decimal { - - chainConfig := conf.GetChainConfig(chainName) - if len(chainConfig.RpcEndpoints) == 0 { - return decimal.Zero - } - token := conf.GetTokenInfoOnChain(tokenName, chainName) - - balance, err := wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client) - if err != nil { - return decimal.Zero - } - return balance -} - -func getReBalanceProvider(ctx context.Context, order models.Order, conf configs.Config) (provider.Provider, error) { - log := order.GetLogs() - if order.ProviderType != "" && order.ProviderName != "" { - log.Debugf("provider type is %s, provider name is %s", order.ProviderType, order.ProviderName) - fn, err := provider.GetProvider(order.ProviderType, order.ProviderName) - if err != nil { - return nil, errors.Wrap(err, "get provider error") - } - return fn(conf) - } - type canUseProvider struct { - provider provider.Provider - tokenInCosts provider.TokenInCosts - } - var canUseProviders []canUseProvider - providers := provider.ListProvidersByConfig(conf) - for _, providerFns := range providers { - for _, providerFn := range providerFns { - p, err := provider.InitializeBridge(providerFn, conf) - if err != nil { - log.Debugf("init provider error: %s", err.Error()) - continue - } - log = log.WithFields(logrus.Fields{ - "provider_type": p.Type(), - "provider_name": p.Name(), - }) - tokenInCosts, ok := providerSupportsOrder(ctx, p, order, conf, log) - if !ok || len(tokenInCosts) == 0 { - continue - } - log.Debugf("provider %s can use %s on %s. The tokenInCosts is %+v", - p.Name(), order.TokenOutName, order.TargetChainName, tokenInCosts) - canUseProviders = append(canUseProviders, canUseProvider{ - provider: p, - tokenInCosts: tokenInCosts, - }) - } - } - - if len(canUseProviders) <= 0 { - return nil, error_types.ErrNoProvider - } - if len(canUseProviders) == 1 { - log.Debugf("can use %s provider, the tokenIn is %+v", canUseProviders[0].provider.Name(), canUseProviders[0].tokenInCosts) - return canUseProviders[0].provider, nil - } - var ( - minPrice decimal.Decimal - providerObj provider.Provider - ) - for _, canUseProvider := range canUseProviders { - var ( - tokenNames []string - tokenInCostsMap = make(map[string]decimal.Decimal) - ) - for _, tokenIn := range canUseProvider.tokenInCosts { - if tokenIn.TokenName == order.TokenInName { - return canUseProvider.provider, nil - } - tokenInCostsMap[tokenIn.TokenName] = tokenIn.CostAmount - tokenNames = append(tokenNames, tokenIn.TokenName) - - } - tokenName2Price, err := models.FindTokenPrice(db.DB(), tokenNames) - if err != nil { - log.Warnf("find token price error: %s", err.Error()) - continue - } - - for name, v := range tokenName2Price { - log.Debugf("token %s price %s on %s", name, v.String(), canUseProvider.provider.Name()) - price := v.Mul(tokenInCostsMap[name]) - if price.LessThan(minPrice) { - minPrice = price - providerObj = canUseProvider.provider - continue - } - if minPrice.IsZero() { - minPrice = price - providerObj = canUseProvider.provider - } - } - } - if providerObj == nil { - return nil, errors.New("no provider can use") - } - log.Debugf("min price %s, provider %s", minPrice, providerObj.Name()) - return providerObj, nil -} - -func providerSupportsOrder(ctx context.Context, p provider.Provider, order models.Order, conf configs.Config, log *logrus.Entry) (provider.TokenInCosts, bool) { - tokenInCosts, err := p.GetCost(ctx, provider.SwapParams{ - SourceToken: order.TokenInName, - Sender: conf.GetWallet(order.Wallet), - TargetToken: order.TokenOutName, - Receiver: order.Wallet, - TargetChain: order.TargetChainName, - Amount: order.Amount, - }) - if err != nil { - log.Debugf("check token %s on %s use %s error: %s", order.TokenOutName, order.TargetChainName, p.Name(), err.Error()) - return nil, false - } - return tokenInCosts, true -} diff --git a/internal/daemons/utils.go b/internal/daemons/utils.go index 6893066..8a69c8b 100644 --- a/internal/daemons/utils.go +++ b/internal/daemons/utils.go @@ -55,7 +55,7 @@ func createOrderProcess(order models.Order, s provider.SwapHistory) *models.Orde providerName = order.ProviderName ) if s.ProviderType != "" { - providerType = configs.LiquidityProviderType(s.ProviderType) + providerType = configs.ProviderType(s.ProviderType) } if s.ProviderName != "" { providerName = s.ProviderName diff --git a/internal/models/order.go b/internal/models/order.go index 667abf2..b796511 100644 --- a/internal/models/order.go +++ b/internal/models/order.go @@ -3,77 +3,87 @@ package models import ( "context" "encoding/json" + "github.com/pkg/errors" "github.com/shopspring/decimal" "github.com/sirupsen/logrus" "gorm.io/gorm" "omni-balance/utils/configs" "omni-balance/utils/provider" - "time" + "sync" ) -type OrderStatus string - const ( OrderStatusWaitTransferFromOperator provider.TxStatus = "wait_transfer_from_operator" OrderStatusWaitCrossChain provider.TxStatus = "wait_cross_chain" ) +var ( + orderLocker sync.Map +) + type Order struct { gorm.Model // 唯一索引 - Wallet string `json:"wallet" gorm:"type:varchar(64)"` - TokenInName string `json:"token_in_name"` - TokenOutName string `json:"token_out_name" gorm:"type:varchar(64)"` - SourceChainName string `json:"source_chain_name"` - TargetChainName string `json:"target_chain_name"` - CurrentChainName string `json:"current_chain_name" gorm:"type:varchar(64)"` - CurrentBalance decimal.Decimal `json:"current_balance" gorm:"type:decimal(32,16); default:0"` - Amount decimal.Decimal `json:"amount" gorm:"type:decimal(32,16); default:0"` - IsLock bool `json:"is_lock" gore:"type:boolean; default:false"` - LockTime int64 `json:"lock_time" gorm:"default:0"` - Status provider.TxStatus `json:"status" gorm:"type:int; default:0;index"` - ProviderType configs.LiquidityProviderType `json:"provider_type" gorm:"type:varchar(64)"` - ProviderName string `json:"provider_name" gorm:"type:varchar(64)"` - ProviderOrderId string `json:"order_id" gorm:"type:varchar(64)"` - Tx string `json:"tx" gorm:"type:varchar(64)"` - Order *json.RawMessage `json:"order" gorm:"type:json;default:null"` - Error string `json:"error" gorm:"type:varchar(255)"` + Wallet string `json:"wallet" gorm:"type:varchar(64)"` + TokenInName string `json:"token_in_name"` + TokenOutName string `json:"token_out_name" gorm:"type:varchar(64)"` + SourceChainName string `json:"source_chain_name"` + TargetChainName string `json:"target_chain_name"` + CurrentChainName string `json:"current_chain_name" gorm:"type:varchar(64)"` + CurrentBalance decimal.Decimal `json:"current_balance" gorm:"type:decimal(32,16); default:0"` + Amount decimal.Decimal `json:"amount" gorm:"type:decimal(32,16); default:0"` + IsLock bool `json:"is_lock" gore:"type:boolean; default:false"` + LockTime int64 `json:"lock_time" gorm:"default:0"` + Status provider.TxStatus `json:"status" gorm:"type:varchar(32); default:'pending';index"` + ProviderType configs.ProviderType `json:"provider_type" gorm:"type:varchar(64)"` + ProviderName string `json:"provider_name" gorm:"type:varchar(64)"` + ProviderOrderId string `json:"order_id" gorm:"type:varchar(64)"` + Tx string `json:"tx" gorm:"type:varchar(64)"` + Order *json.RawMessage `json:"order" gorm:"type:json;default:null"` + Error string `json:"error" gorm:"type:varchar(255)"` + TaskId string `json:"task_id" gorm:"type:varchar(64)"` + ProcessType string `json:"process_type"` +} + +type Tasks struct { + Id string + ProviderType string `json:"type"` + Orders []Order `json:"orders"` } type OrderProcess struct { gorm.Model // Order.ID 一对多关联 - OrderId uint `json:"order_id" gorm:"type:int;index"` - Error string `json:"error"` - ProviderType configs.LiquidityProviderType `json:"type"` - ProviderName string `json:"provider_name"` - CurrentChainName string `json:"current_chain_name"` - Status string `json:"status"` - Action string `json:"action"` - Amount decimal.Decimal `json:"amount"` + OrderId uint `json:"order_id" gorm:"type:int;index"` + Error string `json:"error"` + ProviderType configs.ProviderType `json:"type"` + ProviderName string `json:"provider_name"` + CurrentChainName string `json:"current_chain_name"` + Status string `json:"status"` + Action string `json:"action"` + Amount decimal.Decimal `json:"amount"` // Tx is the transaction hash Tx string `json:"tx"` } func (o *Order) UnLock(db *gorm.DB) { - db.Model(&Order{}).Where("id = ?", o.ID).Updates(map[string]interface{}{"is_lock": false, "lock_time": 0}) + orderLocker.Delete(o.ID) } func (o *Order) HasLocked() bool { - return o.IsLock && time.Unix(o.LockTime, 0).Add(time.Hour).Unix() > time.Now().Unix() + _, ok := orderLocker.Load(o.ID) + return ok } func (o *Order) Lock(db *gorm.DB) bool { - var order Order - db.Where("id = ?", o.ID).First(&order) - if order.HasLocked() { + if o.HasLocked() { return true } - db.Model(&Order{}).Where("id = ?", o.ID).Updates(map[string]interface{}{"is_lock": 1, "lock_time": time.Now().Unix()}) + orderLocker.Store(o.ID, struct{}{}) return false } -func (o *Order) SaveProvider(db *gorm.DB, provider configs.LiquidityProviderType, providerName string) error { +func (o *Order) SaveProvider(db *gorm.DB, provider configs.ProviderType, providerName string) error { return db.Model(&Order{}).Where("id = ?", o.ID).Updates(map[string]interface{}{"provider_type": provider, "provider_name": providerName}).Error } @@ -94,13 +104,49 @@ func GetLastOrderProcess(ctx context.Context, db *gorm.DB, orderId uint) OrderPr return result } -func GetOrder(ctx context.Context, db *gorm.DB, orderId uint) *Order { +func GetOrder(ctx context.Context, db *gorm.DB, orderId uint) Order { var result = new(Order) _ = db.WithContext(ctx).Where("id = ?", orderId).First(&result) if result.ID == 0 { - return nil + return Order{} } - return result + return *result +} + +func ListOrdersByTaskId(ctx context.Context, db *gorm.DB, taskId string) ([]Order, error) { + var result []Order + err := db.WithContext(ctx).Where("task_id = ?", taskId).Find(&result).Error + if err != nil { + return nil, errors.Wrap(err, "find buy tokens error") + } + return result, nil +} + +func ListNotSuccessTasks(ctx context.Context, db *gorm.DB, isInclude func(order Order) bool) ([]Tasks, error) { + var ( + tasks = make(map[string][]Order) + result []Tasks + orders []Order + ) + err := db.WithContext(ctx).Where("status != ?", provider.TxStatusSuccess).Find(&orders).Error + if err != nil { + return nil, errors.Wrap(err, "find buy tokens error") + } + for index, v := range orders { + if isInclude != nil && !isInclude(v) { + continue + } + tasks[v.TaskId] = append(tasks[v.TaskId], orders[index]) + } + + for k := range tasks { + result = append(result, Tasks{ + Id: k, + ProviderType: tasks[k][0].ProcessType, + Orders: tasks[k], + }) + } + return result, nil } func (o *Order) GetLogs() *logrus.Entry { @@ -108,6 +154,20 @@ func (o *Order) GetLogs() *logrus.Entry { "orderId": o.ID, "TargetChainName": o.TargetChainName, "TokenOutName": o.TokenOutName, + "TaskId": o.TaskId, + "Amout": o.Amount, + } + if o.ProviderType != "" { + fields["ProviderType"] = o.ProviderType + } + if o.ProviderName != "" { + fields["ProviderName"] = o.ProviderName + } + if o.TokenInName != "" { + fields["TokenInName"] = o.TokenInName + } + if o.SourceChainName != "" { + fields["SourceChainName"] = o.SourceChainName } return logrus.WithFields(logrus.Fields{ "order": fields, diff --git a/utils/bot/balance_on_chain/monitor.go b/utils/bot/balance_on_chain/monitor.go new file mode 100644 index 0000000..eb41451 --- /dev/null +++ b/utils/bot/balance_on_chain/monitor.go @@ -0,0 +1,61 @@ +package balance_on_chain + +import ( + "context" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/shopspring/decimal" + "github.com/sirupsen/logrus" + "omni-balance/utils/bot" +) + +func init() { + boc := BalanceOnChain{} + bot.Register(boc.Name(), boc) +} + +type BalanceOnChain struct { +} + +func (b BalanceOnChain) Name() string { + return "balance_on_chain" +} + +func (b BalanceOnChain) Check(ctx context.Context, args bot.Params) ([]bot.Task, bot.ProcessType, error) { + var ( + config = args.Conf + info = args.Info + tasks []bot.Task + log = logrus.WithFields(logrus.Fields{ + "wallet": args.Info.Wallet.GetAddress(), + "chain": args.Info.Chain, + "tokenName": args.Info.TokenName, + "job": b.Name(), + }) + ) + token := config.GetTokenInfoOnChain(info.TokenName, args.Info.Chain) + balance, err := info.Wallet.GetBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, args.Client) + if err != nil { + return nil, "", errors.Wrap(err, "get balance error") + } + threshold := config.GetTokenThreshold(info.Wallet.GetAddress().Hex(), info.TokenName, info.Chain) + if balance.GreaterThan(threshold) { + log.Debugf("balance on chain is enough, balance: %s;threshold: %s", balance.String(), threshold.String()) + return nil, "", nil + } + amount := config.GetTokenPurchaseAmount(info.Wallet.GetAddress().Hex(), info.TokenName, info.Chain) + if balance.Add(amount).LessThanOrEqual(threshold) { + newAmount := threshold.Add(threshold.Mul(decimal.RequireFromString("0.3"))) + log.Infof("The %s current balance is %s, amount in config is %s, balance(%s) + amount(%s) <= threshold(%s), so set amount to %s", + info.Wallet.GetAddress(), balance, amount, balance, amount, threshold, newAmount) + amount = newAmount + } + log.Debugf("balance on chain is not enough, balance: %s;threshold: %s", balance.String(), threshold.String()) + tasks = append(tasks, bot.Task{ + TokenOutChainName: args.Info.Chain, + Wallet: info.Wallet.GetAddress().Hex(), + TokenOutName: info.TokenName, + Amount: amount, + }) + return tasks, bot.Queue, nil +} diff --git a/utils/bot/bot.go b/utils/bot/bot.go new file mode 100644 index 0000000..4f9081f --- /dev/null +++ b/utils/bot/bot.go @@ -0,0 +1,52 @@ +package bot + +import ( + "context" + "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/shopspring/decimal" + "omni-balance/utils/configs" + "omni-balance/utils/provider" + "omni-balance/utils/wallets" +) + +type ProcessType string + +const ( + Parallel ProcessType = "parallel" + Queue ProcessType = "queue" +) + +func (p ProcessType) String() string { + return string(p) +} + +type Params struct { + Conf configs.Config + Info Config + Client simulated.Client +} + +type Bot interface { + Check(ctx context.Context, args Params) ([]Task, ProcessType, error) + Name() string +} + +type Config struct { + Wallet wallets.Wallets + TokenName string `json:"token_name"` + Chain string `json:"chains"` +} + +type Task struct { + Wallet string `json:"wallet" gorm:"type:varchar(64)"` + TokenInName string `json:"token_in_name"` + TokenOutName string `json:"token_out_name" gorm:"type:varchar(64)"` + TokenInChainName string `json:"source_chain_name"` + TokenOutChainName string `json:"target_chain_name"` + CurrentChainName string `json:"current_chain_name" gorm:"type:varchar(64)"` + Amount decimal.Decimal `json:"amount" gorm:"type:decimal(32,16); default:0"` + Status provider.TxStatus `json:"status" gorm:"type:int; default:0;index"` + ProviderType configs.ProviderType `json:"provider_type" gorm:"type:varchar(64)"` + ProviderName string `json:"provider_name" gorm:"type:varchar(64)"` + Order interface{} `json:"order"` +} diff --git a/utils/bot/init.go b/utils/bot/init.go new file mode 100644 index 0000000..d6ef38f --- /dev/null +++ b/utils/bot/init.go @@ -0,0 +1,20 @@ +package bot + +var ( + monitors map[string]Bot +) + +func Register(name string, monitor Bot) { + if monitors == nil { + monitors = make(map[string]Bot) + } + monitors[name] = monitor +} + +func ListMonitors() map[string]Bot { + return monitors +} + +func GetMonitor(name string) Bot { + return monitors[name] +} diff --git a/utils/configs/config.go b/utils/configs/config.go index d040afb..8d862a7 100644 --- a/utils/configs/config.go +++ b/utils/configs/config.go @@ -15,17 +15,17 @@ import ( "time" ) -// LiquidityProviderType liquidity provider type -type LiquidityProviderType string +// ProviderType liquidity providersMap type +type ProviderType string type DbType string const ( // CEX centralized exchange - CEX LiquidityProviderType = "CEX" + CEX ProviderType = "CEX" // DEX decentralized exchange - DEX LiquidityProviderType = "DEX" + DEX ProviderType = "DEX" // Bridge cross-chain bridge - Bridge LiquidityProviderType = "Bridge" + Bridge ProviderType = "Bridge" ) const ( @@ -41,15 +41,15 @@ type Config struct { Debug bool `json:"debug" yaml:"debug" comment:"Debug mode"` // Chains need to be monitored - Chains []Chain `json:"chains" yaml:"chains" comment:"Chains"` - chains map[string]Chain + Chains []Chain `json:"chains" yaml:"chains" comment:"Chains"` + chainsMap map[string]Chain // SourceToken used to buy other tokens - SourceToken []SourceToken `json:"source_token" yaml:"source_token" comment:"Source token used to buy other tokens"` - sourceToken map[string]SourceToken + SourceToken []SourceToken `json:"source_token" yaml:"source_token" comment:"Source token used to buy other tokens"` + sourceTokenMap map[string]SourceToken - LiquidityProviders []LiquidityProvider `json:"liquidity_providers" yaml:"liquidity_providers" comment:"Liquidity providers"` - liquidityProvider map[LiquidityProviderType][]LiquidityProvider + Providers []Provider `json:"providers" yaml:"providers" comment:"providers"` + providersMap map[ProviderType][]Provider Wallets []Wallet `json:"wallets" yaml:"wallets" comment:"Wallets need to rebalance"` wallets map[string]Wallet @@ -92,10 +92,11 @@ type Operator struct { } type WalletToken struct { - Name string `json:"name" yaml:"name" comment:"Token name"` - Amount decimal.Decimal `json:"amount" yaml:"amount" comment:"The number of each rebalance"` - Threshold decimal.Decimal `json:"threshold" yaml:"threshold" comment:"Threshold when the token balance is less than the threshold, the rebalance will be triggered"` - Chains []string `json:"chains" yaml:"chains" comment:"The chains need to be monitored"` + Name string `json:"name" yaml:"name" comment:"Token name"` + Amount decimal.Decimal `json:"amount" yaml:"amount" comment:"The number of each rebalance"` + Threshold decimal.Decimal `json:"threshold" yaml:"threshold" comment:"Threshold when the token balance is less than the threshold, the rebalance will be triggered"` + Chains []string `json:"chains" yaml:"chains" comment:"The chains need to be monitored"` + MonitorTypes map[string]string `json:"monitorTypes" yaml:"monitorTypes" comment:"Which monitoring type should this token use on the specified chain. Default: 'balance_on_chain'. Support types: balance_on_chain='The real balance of the address on the chain''"` } type CrossChain struct { @@ -123,14 +124,14 @@ type TargetToken struct { Threshold decimal.Decimal `json:"threshold" yaml:"threshold" help:"Threshold when the token balance is less than the threshold, the rebalance will be triggered"` } -// LiquidityProvider liquidity provider -type LiquidityProvider struct { - // Type liquidity provider type - Type LiquidityProviderType `json:"type" yaml:"type" comment:"Type liquidity provider type"` - // LiquidityName liquidity provider name - LiquidityName string `json:"liquidity_name" yaml:"liquidity_name" comment:"LiquidityName liquidity provider name"` - // Config liquidity provider config, depend on the type - Config map[string]interface{} `json:"config" yaml:"config" comment:"Config liquidity provider config, depend on the type"` +// Provider liquidity providersMap +type Provider struct { + // Type liquidity providersMap type + Type ProviderType `json:"type" yaml:"type" comment:"Type liquidity providersMap type"` + // Name liquidity providersMap name + Name string `json:"name" yaml:"name" comment:"providersMap name"` + // Config liquidity providersMap config, depend on the type + Config map[string]interface{} `json:"config" yaml:"config" comment:"Config liquidity providersMap config, depend on the type"` } type DbConfig struct { @@ -173,7 +174,7 @@ func (c Chain) GetToken(tokenName string) Token { } func (c *Config) Init() *Config { - c.chains = make(map[string]Chain) + c.chainsMap = make(map[string]Chain) oldName2NewName := make(map[string]string) for index, v := range c.Chains { @@ -181,13 +182,12 @@ func (c *Config) Init() *Config { if newName == "" { panic(fmt.Sprintf("chain id %d not found", v.Id)) } - // 规范所有链的名字 oldName2NewName[v.Name] = newName c.Chains[index].Name = newName - c.chains[newName] = c.Chains[index] + c.chainsMap[newName] = c.Chains[index] } - c.sourceToken = make(map[string]SourceToken) + c.sourceTokenMap = make(map[string]SourceToken) for index, v := range c.SourceToken { var chains []string for _, v := range v.Chains { @@ -198,18 +198,21 @@ func (c *Config) Init() *Config { panic(fmt.Sprintf("chain %s not found", v)) } c.SourceToken[index].Chains = chains - c.sourceToken[v.Name] = c.SourceToken[index] + c.sourceTokenMap[v.Name] = c.SourceToken[index] } - c.liquidityProvider = make(map[LiquidityProviderType][]LiquidityProvider) - for _, v := range c.LiquidityProviders { - c.liquidityProvider[v.Type] = append(c.liquidityProvider[v.Type], v) + c.providersMap = make(map[ProviderType][]Provider) + for _, v := range c.Providers { + c.providersMap[v.Type] = append(c.providersMap[v.Type], v) } c.wallets = make(map[string]Wallet) for walletIndex, v := range c.Wallets { for index, t := range v.Tokens { - var chains []string + var ( + chains []string + monitorTypes = make(map[string]string) + ) for _, v := range t.Chains { if newName, ok := oldName2NewName[v]; ok { chains = append(chains, newName) @@ -217,7 +220,15 @@ func (c *Config) Init() *Config { } panic(fmt.Sprintf("chain %s not found", v)) } + for name, v := range t.MonitorTypes { + if newName, ok := oldName2NewName[v]; ok { + monitorTypes[name] = newName + continue + } + panic(fmt.Sprintf("chain %s not found", v)) + } c.Wallets[walletIndex].Tokens[index].Chains = chains + c.Wallets[walletIndex].Tokens[index].MonitorTypes = monitorTypes } c.wallets[v.Address] = c.Wallets[walletIndex] } @@ -237,13 +248,13 @@ func (c *Config) Check() error { } for index, v := range v.Tokens { if v.ContractAddress == "" { - return errors.Errorf("chains[%d]tokens[%d]contract_address must be set", chainIndex, index) + return errors.Errorf("chainsMap[%d]tokens[%d]contract_address must be set", chainIndex, index) } if v.Name == "" { - return errors.Errorf("chains[%d]tokens[%d]name must be set", chainIndex, index) + return errors.Errorf("chainsMap[%d]tokens[%d]name must be set", chainIndex, index) } if v.Decimals == 0 { - return errors.Errorf("chains[%d]tokens[%d]decimals must be set", chainIndex, index) + return errors.Errorf("chainsMap[%d]tokens[%d]decimals must be set", chainIndex, index) } } } @@ -253,30 +264,30 @@ func (c *Config) Check() error { } for index, v := range c.SourceToken { for chainIndex, chain := range v.Chains { - if _, ok := c.chains[chain]; !ok { - return errors.Errorf("source_token[%d]chains[%d] not in chains", index, chainIndex) + if _, ok := c.chainsMap[chain]; !ok { + return errors.Errorf("source_token[%d]chainsMap[%d] not in chainsMap", index, chainIndex) } var ok bool - for _, token := range c.chains[chain].Tokens { + for _, token := range c.chainsMap[chain].Tokens { if strings.EqualFold(token.Name, v.Name) { ok = true } } if !ok { - return errors.Errorf("source_token[%d] token name not in chains", index) + return errors.Errorf("source_token[%d] token name not in chainsMap", index) } } } - if len(c.LiquidityProviders) == 0 { + if len(c.Providers) == 0 { return errors.New("liquidity_providers must be set") } - for index, v := range c.LiquidityProviders { + for index, v := range c.Providers { if v.Type == "" { return errors.Errorf("liquidity_providers[%d]type must be set", index) } - if v.LiquidityName == "" { + if v.Name == "" { return errors.Errorf("liquidity_providers[%d]liquidity_name must be set", index) } } @@ -303,20 +314,20 @@ func (c *Config) Check() error { } if len(token.Chains) == 0 { - return errors.Errorf("wallets[%d]tokens[%d]chains must be set", index, tokenIndex) + return errors.Errorf("wallets[%d]tokens[%d]chainsMap must be set", index, tokenIndex) } for chainIndex, chain := range token.Chains { - if _, ok := c.chains[chain]; !ok { - return errors.Errorf("wallets[%d]tokens[%d]chains[%d] not in chains", index, tokenIndex, chainIndex) + if _, ok := c.chainsMap[chain]; !ok { + return errors.Errorf("wallets[%d]tokens[%d]chainsMap[%d] not in chainsMap", index, tokenIndex, chainIndex) } var ok bool - for _, chainToken := range c.chains[chain].Tokens { + for _, chainToken := range c.chainsMap[chain].Tokens { if strings.EqualFold(chainToken.Name, chainToken.Name) { ok = true } } if !ok { - return errors.Errorf("wallets[%d]tokens[%d] token name not in chains", index, tokenIndex) + return errors.Errorf("wallets[%d]tokens[%d] token name not in chainsMap", index, tokenIndex) } } } @@ -337,9 +348,9 @@ func (c *Config) Check() error { return nil } -func (c *Config) GetProvidersConfig(name string, providerType LiquidityProviderType, dest interface{}) error { - for _, provider := range c.liquidityProvider[providerType] { - if !strings.EqualFold(provider.LiquidityName, name) { +func (c *Config) GetProvidersConfig(name string, providerType ProviderType, dest interface{}) error { + for _, provider := range c.providersMap[providerType] { + if !strings.EqualFold(provider.Name, name) { continue } @@ -349,11 +360,11 @@ func (c *Config) GetProvidersConfig(name string, providerType LiquidityProviderT } return json.Unmarshal(conf, dest) } - return errors.Errorf("provider %s not found", name) + return errors.Errorf("providersMap %s not found", name) } func (c *Config) GetChainConfig(chainName string) Chain { - chain := c.chains[chainName] + chain := c.chainsMap[chainName] if chain.Name == "" { logrus.Panicf("chain %s not found", chainName) } @@ -392,7 +403,7 @@ func (c *Config) GetWalletConfig(wallet string) Wallet { } func (c *Config) GetTokenInfoOnChain(tokenName, chainName string) Token { - for _, token := range c.chains[chainName].Tokens { + for _, token := range c.chainsMap[chainName].Tokens { if !strings.EqualFold(token.Name, tokenName) { continue } @@ -462,10 +473,10 @@ func (c *Config) GetTaskInterval(name string, defaultInterval time.Duration) tim } func (c *Config) IsNativeToken(chainName, tokenName string) bool { - if strings.EqualFold(c.chains[chainName].NativeToken, tokenName) { + if strings.EqualFold(c.chainsMap[chainName].NativeToken, tokenName) { return true } - for _, token := range c.chains[chainName].Tokens { + for _, token := range c.chainsMap[chainName].Tokens { if strings.EqualFold(token.Name, tokenName) { return strings.EqualFold(token.ContractAddress, constant.ZeroAddress.Hex()) } diff --git a/utils/provider/bridge/darwinia/darwinia.go b/utils/provider/bridge/darwinia/darwinia.go index 1435f5a..061b308 100644 --- a/utils/provider/bridge/darwinia/darwinia.go +++ b/utils/provider/bridge/darwinia/darwinia.go @@ -310,6 +310,6 @@ func (b *Bridge) Name() string { return "darwinia-bridge" } -func (b *Bridge) Type() configs.LiquidityProviderType { +func (b *Bridge) Type() configs.ProviderType { return configs.Bridge } diff --git a/utils/provider/bridge/darwinia/darwinia2ethereum_test.go b/utils/provider/bridge/darwinia/darwinia2ethereum_test.go index 1e38a95..7601760 100644 --- a/utils/provider/bridge/darwinia/darwinia2ethereum_test.go +++ b/utils/provider/bridge/darwinia/darwinia2ethereum_test.go @@ -22,7 +22,7 @@ func TestDarwinia2ethereum(t *testing.T) { Client: args.client, }) assert.NoError(t, err) - utils.AssertEqualFold(t, common.Bytes2Hex(tx.Data), "7104aad500000000000000000000000000000000000000000000000000000000000000010000000000000000000000004ca75992d2750bec270731a72dfdede6b9e71cc700000000000000000000000043ef13e84d9992d1461a1f90cac4653658cea4fd00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000018fc32b2d7900000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000001800000000000000000000000000000000000000000000000000000000000000080000000000000000000000000c29dcb1f12a1618262ef9fba673b77140adc02d60000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000001443ef13e84d9992d1461a1f90cac4653658cea4fd00000000000000000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000016fe400000000000000000000000043ef13e84d9992d1461a1f90cac4653658cea4fd00000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000") + utils.AssertEqualFold(t, common.Bytes2Hex(tx.Data), "7104AAD500000000000000000000000000000000000000000000000000000000000000010000000000000000000000004CA75992D2750BEC270731A72DFDEDE6B9E71CC700000000000000000000000043EF13E84D9992D1461A1F90CAC4653658CEA4FD00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000018FC32B2D7900000000000000000000000000000000000000000000000000000000000000E000000000000000000000000000000000000000000000000000000000000001800000000000000000000000000000000000000000000000000000000000000080000000000000000000000000C29DCB1F12A1618262EF9FBA673B77140ADC02D60000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000001443EF13E84D9992D1461A1F90CAC4653658CEA4FD00000000000000000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000016FE400000000000000000000000043EF13E84D9992D1461A1F90CAC4653658CEA4FD00000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000") assert.Equal(t, tx.To.Hex(), "0x092e19C46C9daAb7824393f1CD9c22f5BEA13560") tx, err = Darwinia2ethereum(args.ctx, SwapParams{ diff --git a/utils/provider/bridge/helix/helix.go b/utils/provider/bridge/helix/helix.go index a385ad4..93fcb8e 100644 --- a/utils/provider/bridge/helix/helix.go +++ b/utils/provider/bridge/helix/helix.go @@ -243,6 +243,6 @@ func (b *Bridge) Name() string { return "helixbridge" } -func (b *Bridge) Type() configs.LiquidityProviderType { +func (b *Bridge) Type() configs.ProviderType { return configs.Bridge } diff --git a/utils/provider/bridge/li/li.go b/utils/provider/bridge/li/li.go index 9658f51..b42c7e3 100644 --- a/utils/provider/bridge/li/li.go +++ b/utils/provider/bridge/li/li.go @@ -288,6 +288,6 @@ func (l Li) Name() string { return "li" } -func (l Li) Type() configs.LiquidityProviderType { +func (l Li) Type() configs.ProviderType { return configs.Bridge } diff --git a/utils/provider/bridge/okx/okx.go b/utils/provider/bridge/okx/okx.go index 6887b61..9338279 100644 --- a/utils/provider/bridge/okx/okx.go +++ b/utils/provider/bridge/okx/okx.go @@ -248,7 +248,6 @@ func (o *OKX) Swap(ctx context.Context, args provider.SwapParams) (provider.Swap }, client) if err != nil { args.RecordFn(sh.SetActions(SourceChainSendingAction).SetStatus(provider.TxStatusFailed).Out(), err) - args.RecordFn(sh.SetStatus(provider.TxStatusFailed).Out(), err) return sr.SetError(err).SetStatus(provider.TxStatusFailed).Out(), errors.Wrap(err, "send tx error") } log = log.WithField("tx", txHash) @@ -293,6 +292,6 @@ func (o *OKX) Name() string { return "okx" } -func (o *OKX) Type() configs.LiquidityProviderType { +func (o *OKX) Type() configs.ProviderType { return configs.Bridge } diff --git a/utils/provider/bridge/routernitro/routernitro.go b/utils/provider/bridge/routernitro/routernitro.go index f8ba47f..0a5d411 100644 --- a/utils/provider/bridge/routernitro/routernitro.go +++ b/utils/provider/bridge/routernitro/routernitro.go @@ -264,7 +264,7 @@ func (r Routernitro) Swap(ctx context.Context, args provider.SwapParams) (provid tx = txHash.Hex() } - args.RecordFn(sh.SetStatus(provider.TxStatusPending).Out()) + args.RecordFn(sh.SetActions(WaitForTxAction).SetStatus(provider.TxStatusPending).Out()) log.Debugf("waiting for tx on chain") if err := args.Sender.WaitTransaction(ctx, common.HexToHash(tx), client); err != nil { args.RecordFn(sh.SetActions(WaitForTxAction).SetStatus(provider.TxStatusFailed).Out(), err) @@ -295,6 +295,6 @@ func (r Routernitro) Name() string { return "router_nitro" } -func (r Routernitro) Type() configs.LiquidityProviderType { +func (r Routernitro) Type() configs.ProviderType { return configs.Bridge } diff --git a/utils/provider/bridge/routernitro/utils.go b/utils/provider/bridge/routernitro/utils.go index 7385b34..81072d0 100644 --- a/utils/provider/bridge/routernitro/utils.go +++ b/utils/provider/bridge/routernitro/utils.go @@ -115,7 +115,6 @@ func (r Routernitro) GetBestQuote(ctx context.Context, args provider.SwapParams) return errors.Wrap(err, "get quote") } currentLog = currentLog.WithField("quote", utils.ToMap(quoteData)) - currentLog.Debug("get quote success") minimumReceived := chains.WeiToEth(quoteData.Destination.TokenAmount.BigInt(), tokenOut.Decimals) needBalance := tokenInTestBalance.Div(minimumReceived).Mul(args.Amount) diff --git a/utils/provider/cex/gate/gate.io.go b/utils/provider/cex/gate/gate.io.go index a29dc63..8305f1e 100644 --- a/utils/provider/cex/gate/gate.io.go +++ b/utils/provider/cex/gate/gate.io.go @@ -404,6 +404,6 @@ func (g *Gate) Name() string { return "gate.io" } -func (g *Gate) Type() configs.LiquidityProviderType { +func (g *Gate) Type() configs.ProviderType { return configs.CEX } diff --git a/utils/provider/cex/gate/utils.go b/utils/provider/cex/gate/utils.go index 732b971..85f950a 100644 --- a/utils/provider/cex/gate/utils.go +++ b/utils/provider/cex/gate/utils.go @@ -74,6 +74,15 @@ func ChainName2GateChainName(chainName string) string { return chainName } +func GateChainName2StandardName(chainName string) string { + for k, v := range gateChain2StandardName { + if strings.EqualFold(chainName, k) { + return v + } + } + return chainName +} + func (g *Gate) ticker(pairs string) (TickerResult, error) { req, err := http.NewRequest("GET", fmt.Sprintf("https://data.gateapi.io/api/1/ticker/%s", pairs), nil) if err != nil { @@ -201,3 +210,42 @@ func (g *Gate) IsVerifiedAddress(ctx context.Context, address, tokenName, chainN } return isSenderInVerifiedAddress, nil } + +// GetDepositAddress retrieves the deposit address for a token. +// This method calls the GateAPI's GetDepositAddress interface, using tokenName and chainName to filter for valid deposit addresses. +// Parameters: +// +// ctx: Context object for canceling requests or other operations. +// tokenName: The name of the token. +// chainName: The name of the blockchain. If chainName is empty, all deposit addresses for the specified token are returned. +// +// Returns: +// +// []gateapi.MultiChainAddressItem: A list of deposit addresses that meet the specified criteria. +// error: If an error occurs while fetching the address, an error message is returned. +func (g *Gate) GetDepositAddress(ctx context.Context, tokenName, chainName string) ([]gateapi.MultiChainAddressItem, error) { + depositInfo, _, err := g.client.WalletApi.GetDepositAddress(ctx, TokenName2GateTokenName(tokenName)) + if err != nil { + return nil, errors.Wrap(err, "get deposit address") + } + var result []gateapi.MultiChainAddressItem + for _, v := range depositInfo.MultichainAddresses { + if v.ObtainFailed != 0 { + continue + } + if chainName != "" && !strings.EqualFold(GateChainName2StandardName(v.Chain), chainName) { + continue + } + result = append(result, gateapi.MultiChainAddressItem{ + Chain: GateChainName2StandardName(v.Chain), + Address: v.Address, + PaymentId: v.PaymentId, + PaymentName: v.PaymentName, + ObtainFailed: v.ObtainFailed, + }) + } + if len(result) == 0 { + return nil, errors.Errorf("no deposit address for %s", tokenName) + } + return result, nil +} diff --git a/utils/provider/dex/snowswap/snowswap.go b/utils/provider/dex/snowswap/snowswap.go index 1d3df8f..5a6d74d 100644 --- a/utils/provider/dex/snowswap/snowswap.go +++ b/utils/provider/dex/snowswap/snowswap.go @@ -35,7 +35,7 @@ func (s Snowswap) Name() string { panic("implement me") } -func (s Snowswap) Type() configs.LiquidityProviderType { +func (s Snowswap) Type() configs.ProviderType { //TODO implement me panic("implement me") } diff --git a/utils/provider/dex/uniswap/uniswap.go b/utils/provider/dex/uniswap/uniswap.go index f146097..0f9210f 100644 --- a/utils/provider/dex/uniswap/uniswap.go +++ b/utils/provider/dex/uniswap/uniswap.go @@ -244,7 +244,7 @@ func (u *Uniswap) Name() string { return "uniswap" } -func (u *Uniswap) Type() configs.LiquidityProviderType { +func (u *Uniswap) Type() configs.ProviderType { return configs.DEX } diff --git a/utils/provider/init.go b/utils/provider/init.go index 89ecba4..30b3d9a 100644 --- a/utils/provider/init.go +++ b/utils/provider/init.go @@ -9,33 +9,33 @@ import ( type InitFunc func(conf configs.Config, noInit ...bool) (Provider, error) var ( - providers = make(map[configs.LiquidityProviderType][]InitFunc) + providers = make(map[configs.ProviderType][]InitFunc) m sync.Mutex ) -func Register(providerType configs.LiquidityProviderType, provider InitFunc) { +func Register(providerType configs.ProviderType, provider InitFunc) { m.Lock() defer m.Unlock() providers[providerType] = append(providers[providerType], provider) } -func ListProviders() map[configs.LiquidityProviderType][]InitFunc { - var result = make(map[configs.LiquidityProviderType][]InitFunc) +func ListProviders() map[configs.ProviderType][]InitFunc { + var result = make(map[configs.ProviderType][]InitFunc) for k, v := range providers { result[k] = v } return result } -func ListProvidersByConfig(conf configs.Config) map[configs.LiquidityProviderType][]InitFunc { +func ListProvidersByConfig(conf configs.Config) map[configs.ProviderType][]InitFunc { m.Lock() defer m.Unlock() var ( providerNames = make(map[string]struct{}) - result = make(map[configs.LiquidityProviderType][]InitFunc) + result = make(map[configs.ProviderType][]InitFunc) ) - for _, v := range conf.LiquidityProviders { - providerNames[v.LiquidityName] = struct{}{} + for _, v := range conf.Providers { + providerNames[v.Name] = struct{}{} } for providerType, providerInitFuncs := range providers { for index, fn := range providerInitFuncs { @@ -50,11 +50,11 @@ func ListProvidersByConfig(conf configs.Config) map[configs.LiquidityProviderTyp return result } -func LiquidityProviderTypeAndConf(providerType configs.LiquidityProviderType, conf configs.Config) []InitFunc { +func LiquidityProviderTypeAndConf(providerType configs.ProviderType, conf configs.Config) []InitFunc { return ListProvidersByConfig(conf)[providerType] } -func GetProvider(providerType configs.LiquidityProviderType, name string) (InitFunc, error) { +func GetProvider(providerType configs.ProviderType, name string) (InitFunc, error) { for _, fn := range providers[providerType] { p, err := fn(configs.Config{}, true) if err != nil { diff --git a/utils/provider/provider.go b/utils/provider/provider.go index d3d22cb..2d18e60 100644 --- a/utils/provider/provider.go +++ b/utils/provider/provider.go @@ -79,7 +79,7 @@ type Provider interface { // Name get provider name Name() string - Type() configs.LiquidityProviderType + Type() configs.ProviderType } type CheckParams struct { @@ -118,16 +118,16 @@ type SwapParams struct { } type SwapResult struct { - Error string `json:"error,omitempty"` - TokenInChainName string `json:"source_chain_name,omitempty"` - TokenInName string `json:"token_in_name,omitempty"` - ProviderType configs.LiquidityProviderType `json:"type,omitempty"` - ProviderName string `json:"provider_name,omitempty"` - OrderId string `json:"provider_order_id,omitempty"` - Order interface{} `json:"order,omitempty"` - Status TxStatus `json:"status,omitempty"` - CurrentChain string `json:"current_chain_name,omitempty"` - Receiver string `json:"receiver,omitempty"` + Error string `json:"error,omitempty"` + TokenInChainName string `json:"source_chain_name,omitempty"` + TokenInName string `json:"token_in_name,omitempty"` + ProviderType configs.ProviderType `json:"type,omitempty"` + ProviderName string `json:"provider_name,omitempty"` + OrderId string `json:"provider_order_id,omitempty"` + Order interface{} `json:"order,omitempty"` + Status TxStatus `json:"status,omitempty"` + CurrentChain string `json:"current_chain_name,omitempty"` + Receiver string `json:"receiver,omitempty"` // Tx is the transaction hash Tx string `json:"tx,omitempty"` } @@ -151,7 +151,7 @@ func (s *SwapResult) SetTokenInName(name string) *SwapResult { return s } -func (s *SwapResult) SetProviderType(tp configs.LiquidityProviderType) *SwapResult { +func (s *SwapResult) SetProviderType(tp configs.ProviderType) *SwapResult { s.ProviderType = tp return s } @@ -246,7 +246,7 @@ func (s *SwapHistory) SetProviderName(providerName string) *SwapHistory { return s } -func (s *SwapHistory) SetProviderType(providerType configs.LiquidityProviderType) *SwapHistory { +func (s *SwapHistory) SetProviderType(providerType configs.ProviderType) *SwapHistory { s.ProviderType = string(providerType) return s } diff --git a/utils/provider/utils.go b/utils/provider/utils.go index f3097b4..6cd0d29 100644 --- a/utils/provider/utils.go +++ b/utils/provider/utils.go @@ -40,6 +40,8 @@ func action2Int(action string) int { } func Transfer(ctx context.Context, conf configs.Config, args SwapParams, client simulated.Client) (SwapResult, error) { + args.SourceToken = args.TargetToken + args.SourceChain = args.TargetChain var ( log = utils.GetLogFromCtx(ctx) @@ -108,10 +110,11 @@ func Transfer(ctx context.Context, conf configs.Config, args SwapParams, client ToAddress: common.HexToAddress(args.Receiver), AmountWei: decimal.NewFromBigInt(chains.EthToWei(args.Amount, token.Decimals), 0), }) - sr = sr.SetOrder(common.Bytes2Hex(transaction.Data)) + if err != nil { return sr.SetStatus(TxStatusFailed).SetError(err).Out(), errors.Wrap(err, "send token error") } + sr = sr.SetOrder(common.Bytes2Hex(transaction.Data)) log.Debugf("%s transfer %s %s to %s", args.Sender.GetAddress(true), args.TargetToken, args.Amount, args.Sender.GetAddress()) recordFn(sh.SetStatus(TxStatusPending).SetActions(transferSent).Out()) @@ -157,8 +160,8 @@ func GetTokenCrossChainProviders(ctx context.Context, args GetTokenCrossChainPro continue } var hasConfig bool - for _, v := range args.Conf.LiquidityProviders { - if v.Type != configs.Bridge || v.LiquidityName == bridge.Name() { + for _, v := range args.Conf.Providers { + if v.Type != configs.Bridge || v.Name == bridge.Name() { continue } hasConfig = true @@ -181,6 +184,7 @@ func GetTokenCrossChainProviders(ctx context.Context, args GetTokenCrossChainPro } type WithNotifyParams struct { + TaskId string OrderId uint Receiver common.Address TokenIn, TokenOut, TokenInChain, TokenOutChain, ProviderName string @@ -203,6 +207,9 @@ func WithNotify(ctx context.Context, args WithNotifyParams) context.Context { fields["tokenIn"] = fmt.Sprintf("%s on %s", fields["tokenIn"], args.TokenInChain) } } + if args.TaskId != "" { + fields["taskId"] = args.TaskId + } if args.OrderId != 0 { fields["orderId"] = fmt.Sprintf("%d", args.OrderId) diff --git a/utils/util.go b/utils/util.go index a2639d3..29fdca6 100644 --- a/utils/util.go +++ b/utils/util.go @@ -15,6 +15,7 @@ import ( "net/http" "omni-balance/utils/constant" "reflect" + "runtime/debug" "strings" "sync/atomic" "testing" @@ -146,6 +147,9 @@ func Request(ctx context.Context, method string, url string, body io.Reader, des logrus.Debugf("request: %s %s, try %d", req.Method, req.URL, count) count++ resp, err := new(http.Client).Do(req) + if errors.Is(err, context.Canceled) { + return context.Canceled + } if err != nil { logrus.Debugf("request failed: %s", err) time.Sleep(time.Second) @@ -219,6 +223,18 @@ func Go(f func()) { func Recover() { if err := recover(); err != nil { + debug.PrintStack() logrus.Errorf("panic: %s", err) } } + +func Object2JsonRawMessage(v interface{}) *json.RawMessage { + if v == nil { + return nil + } + data, err := json.Marshal(v) + if err != nil { + return nil + } + return (*json.RawMessage)(&data) +} diff --git a/utils/wallet_monitor/monitor.go b/utils/wallet_monitor/monitor.go deleted file mode 100644 index 43337d4..0000000 --- a/utils/wallet_monitor/monitor.go +++ /dev/null @@ -1,148 +0,0 @@ -package wallet_monitor - -import ( - "context" - "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - "github.com/shopspring/decimal" - "github.com/sirupsen/logrus" - "omni-balance/utils" - "omni-balance/utils/chains" - "omni-balance/utils/configs" - "omni-balance/utils/wallets" - "sync" -) - -type Monitor struct { - config configs.Config -} - -type ReBalanceTokenChain struct { - BuyTokenInfo - ChainName string `json:"chain_name"` -} - -type BuyTokenInfo struct { - TokenBalance decimal.Decimal `json:"balance"` - Amount decimal.Decimal `json:"amount"` -} - -type Result struct { - // need rebalance wallet - Wallet string `json:"wallet"` - // need rebalance tokens - Tokens []ReBalanceToken `json:"tokens"` -} - -type ReBalanceToken struct { - Name string `json:"name"` - Chains []ReBalanceTokenChain `json:"chains"` -} - -type IgnoreToken struct { - Name string `json:"name"` - Chain string `json:"chain"` - Address string `json:"wallet"` -} - -func NewMonitor(conf configs.Config) *Monitor { - return &Monitor{config: conf} -} - -func (m *Monitor) GetBalance(ctx context.Context, wallet wallets.Wallets, tokenName, chainName string) (balance decimal.Decimal, err error) { - var ( - chain = m.config.GetChainConfig(chainName) - token = chain.GetToken(tokenName) - ) - - if len(chain.RpcEndpoints) == 0 { - return decimal.Zero, errors.New("rpc endpoints is empty") - } - client, err := chains.NewTryClient(ctx, chain.RpcEndpoints) - if err != nil { - return decimal.Zero, errors.Wrap(err, "dial rpc") - } - defer client.Close() - return wallet.GetExternalBalance(ctx, common.HexToAddress(token.ContractAddress), token.Decimals, client) -} - -func (m *Monitor) Check(ctx context.Context, ignoreToken ...IgnoreToken) (result []Result, err error) { - var ( - // balances {"wallet": {"token_name": "chain_name": BuyTokenInfo}} - balances = map[string]map[string][]ReBalanceTokenChain{} - w sync.WaitGroup - mutex sync.Mutex - ignoreTokens = make(map[string]map[string]map[string]struct{}) - ) - for _, token := range ignoreToken { - if _, ok := ignoreTokens[token.Address]; !ok { - ignoreTokens[token.Address] = make(map[string]map[string]struct{}) - } - if _, ok := ignoreTokens[token.Address][token.Name]; !ok { - ignoreTokens[token.Address][token.Name] = make(map[string]struct{}) - } - ignoreTokens[token.Address][token.Name][token.Chain] = struct{}{} - } - - appendBalances := func(wallet, tokenName string, rbtc ReBalanceTokenChain) { - mutex.Lock() - defer mutex.Unlock() - if _, ok := balances[wallet]; !ok { - balances[wallet] = make(map[string][]ReBalanceTokenChain) - } - balances[wallet][tokenName] = append(balances[wallet][tokenName], rbtc) - } - for _, wallet := range m.config.Wallets { - w.Add(1) - go func(wallet configs.Wallet) { - defer utils.Recover() - defer w.Done() - for _, token := range wallet.Tokens { - for _, chainName := range token.Chains { - if _, ok := ignoreTokens[wallet.Address][token.Name][chainName]; ok { - continue - } - balance, err := m.GetBalance(ctx, m.config.GetWallet(wallet.Address), token.Name, chainName) - if err != nil { - logrus.Errorf("get %s on %s balance error: %s", token.Name, chainName, err.Error()) - continue - } - - threshold := m.config.GetTokenThreshold(wallet.Address, token.Name, chainName) - logrus.Debugf("wallet %s token %s on %s balance is %s, the threshold is %s", - wallet.Address, token.Name, chainName, balance.String(), threshold.String()) - if balance.GreaterThan(threshold) { - continue - } - - amount := m.config.GetTokenPurchaseAmount(wallet.Address, token.Name, chainName) - if amount.LessThanOrEqual(decimal.Zero) { - logrus.Errorf("token %s config amount is invalid", token.Name) - continue - } - logrus.Infof("wallet %s token %s on %s need rebalance %s", - wallet.Address, token.Name, chainName, amount.String()) - appendBalances(wallet.Address, token.Name, ReBalanceTokenChain{ - BuyTokenInfo: BuyTokenInfo{ - TokenBalance: balance, - Amount: amount, - }, - ChainName: chainName, - }) - } - } - }(wallet) - } - w.Wait() - for walletAddress, item := range balances { - r := &Result{Wallet: walletAddress} - for tokenName, chains := range item { - r.Tokens = append(r.Tokens, ReBalanceToken{ - Name: tokenName, - Chains: chains, - }) - } - result = append(result, *r) - } - return -} diff --git a/utils/wallets/safe/safe.go b/utils/wallets/safe/safe.go index 41e7bb0..db8579f 100644 --- a/utils/wallets/safe/safe.go +++ b/utils/wallets/safe/safe.go @@ -120,11 +120,11 @@ func (s *Safe) SendTransaction(ctx context.Context, tx *types.LegacyTx, client s func (s *Safe) WaitTransaction(ctx context.Context, txHash common.Hash, _ simulated.Client) error { var ( log = utils.GetLogFromCtx(ctx).WithFields(utils.ToMap(s)) - t = time.NewTicker(time.Second * 2) + t = time.NewTicker(time.Second * 10) count = 0 ) defer t.Stop() - for count < 60 { // 180s + for count < 60 { select { case <-ctx.Done(): return context.Canceled