Skip to content

Commit

Permalink
Optimize jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
perrornet committed Jul 2, 2024
1 parent 07122e9 commit a020d95
Show file tree
Hide file tree
Showing 39 changed files with 1,160 additions and 809 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
local
.run
./config.yaml
omni-balance.db
start.sh
18 changes: 9 additions & 9 deletions cmd/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,26 +193,26 @@ func CreateExampleConfig(exampleConfigPath string) error {
Chains: []string{"etnereum", "arbitrum"},
},
},
LiquidityProviders: []configs.LiquidityProvider{
Providers: []configs.Provider{
{
Type: configs.CEX,
LiquidityName: "gate.io",
Type: configs.CEX,
Name: "gate.io",
Config: map[string]interface{}{
"key": "<gate_api_key>",
"secret": "<gate_api_secret>",
},
},
{
Type: configs.DEX,
LiquidityName: "uniswap",
Type: configs.DEX,
Name: "uniswap",
},
{
Type: configs.Bridge,
LiquidityName: "helixbridge",
Type: configs.Bridge,
Name: "helixbridge",
},
{
Type: configs.Bridge,
LiquidityName: "darwinia-bridge",
Type: configs.Bridge,
Name: "darwinia-bridge",
},
},
Wallets: []configs.Wallet{
Expand Down
6 changes: 4 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"net/http"
"net/url"
"omni-balance/internal/daemons"
_ "omni-balance/internal/daemons/bot"
_ "omni-balance/internal/daemons/cross_chain"
_ "omni-balance/internal/daemons/monitor"
_ "omni-balance/internal/daemons/rebalance"
_ "omni-balance/internal/daemons/market"
_ "omni-balance/internal/daemons/token_price"
"omni-balance/internal/db"
"omni-balance/internal/models"
Expand Down Expand Up @@ -66,6 +66,7 @@ func Usage(_ *cli.Context) error {
}

func Action(cli *cli.Context) error {
fmt.Println(cli.String("conf"))
if err := initConfig(ctx, cli.Bool("placeholder"), cli.String("conf"), cli.String("port")); err != nil {
return errors.Wrap(err, "init config")
}
Expand Down Expand Up @@ -104,6 +105,7 @@ func Action(cli *cli.Context) error {
if err := daemons.Run(ctx, *config); err != nil {
return errors.Wrap(err, "run daemons")
}

utils.FinishInit()

quit := make(chan os.Signal, 1)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/status-im/keycard-go v0.2.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
Expand Down
93 changes: 93 additions & 0 deletions internal/daemons/bot/bot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package bot

import (
"context"
"github.com/ethereum/go-ethereum/ethclient/simulated"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"omni-balance/internal/daemons/market"
"omni-balance/utils/bot"
"omni-balance/utils/bot/balance_on_chain"
"omni-balance/utils/chains"
"omni-balance/utils/configs"
"sync"
)

func Run(ctx context.Context, conf configs.Config) error {
existBuyTokens, err := getExistingBuyTokens()
if err != nil {
return errors.Wrap(err, "find buy tokens error")
}
var (
clients = make(map[string]simulated.Client)
)
for _, chain := range conf.Chains {
client, err := chains.NewTryClient(ctx, chain.RpcEndpoints)
if err != nil {
return errors.Wrap(err, "create client error")
}
clients[chain.Name] = client
}
defer func() {
for _, client := range clients {
client.(*chains.Client).Close()
}
}()

var (
ignoreTokens = createIgnoreTokens(existBuyTokens)
w sync.WaitGroup
)

for _, wallet := range conf.Wallets {
for _, token := range wallet.Tokens {
for _, chainName := range token.Chains {
if ignoreTokens.Contains(token.Name, chainName, wallet.Address) {
logrus.Debugf("ignore token %s on chain %s", token.Name, chainName)
continue
}
monitorType := token.MonitorTypes[chainName]
if monitorType == "" {
monitorType = balance_on_chain.BalanceOnChain{}.Name()
}
fn := process(ctx, conf, wallet.Address, token.Name, chainName, monitorType, clients[chainName])
w.Add(1)
go func() {
defer w.Done()
tasks, processType, err := fn()
if err != nil {
logrus.Errorf("bot error: %s", err)
return
}
_, taskId, err := createOrder(tasks, processType)
if err != nil {
logrus.Errorf("create order error: %s", err)
return
}
market.PushTask(market.Task{
Id: taskId,
ProcessType: processType,
})
}()
}
}
}
w.Wait()
return nil
}

func process(ctx context.Context, conf configs.Config, walletAddress, tokenName, chainName, monitorType string,
client simulated.Client) func() ([]bot.Task, bot.ProcessType, error) {
m := bot.GetMonitor(monitorType)
return func() ([]bot.Task, bot.ProcessType, error) {
return m.Check(ctx, bot.Params{
Conf: conf,
Info: bot.Config{
Wallet: conf.GetWallet(walletAddress),
TokenName: tokenName,
Chain: chainName,
},
Client: client,
})
}
}
15 changes: 15 additions & 0 deletions internal/daemons/bot/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package bot

import (
"omni-balance/internal/daemons"
"time"
)

func init() {
daemons.RegisterIntervalTask(daemons.Task{
Name: "bot",
Description: "Check the balance based on the specific bot and create an order",
TaskFunc: Run,
DefaultInterval: time.Minute * 3,
})
}
71 changes: 71 additions & 0 deletions internal/daemons/bot/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package bot

import (
uuid "github.com/satori/go.uuid"
"omni-balance/internal/db"
"omni-balance/internal/models"
"omni-balance/utils"
"omni-balance/utils/bot"
"omni-balance/utils/provider"
)

func getExistingBuyTokens() ([]*models.Order, error) {
var existBuyTokens []*models.Order
err := db.DB().Where("status != ? ", provider.TxStatusSuccess).Find(&existBuyTokens).Error
if err != nil {
return nil, err
}
return existBuyTokens, nil
}

func createIgnoreTokens(existBuyTokens []*models.Order) IgnoreTokens {
var ignoreTokens []IgnoreToken
for _, v := range existBuyTokens {
ignoreTokens = append(ignoreTokens, IgnoreToken{
Name: v.TokenOutName,
Chain: v.TargetChainName,
Address: v.Wallet,
})
}
return ignoreTokens
}

func createOrder(tasks []bot.Task, processType bot.ProcessType) (orders []*models.Order, taskId string, err error) {
if len(tasks) == 0 {
return
}
var (
txn = db.DB().Begin()
)

taskId = uuid.NewV4().String()

for _, v := range tasks {
o := &models.Order{
Wallet: v.Wallet,
TokenInName: v.TokenInName,
TokenOutName: v.TokenOutName,
SourceChainName: v.TokenInChainName,
TargetChainName: v.TokenOutChainName,
CurrentChainName: v.CurrentChainName,
Amount: v.Amount,
Status: v.Status,
ProviderType: v.ProviderType,
ProviderName: v.ProviderName,
Order: utils.Object2JsonRawMessage(v.Order),
TaskId: taskId,
ProcessType: string(processType),
}

if err = txn.Create(o).Error; err != nil {
txn.Rollback()
return
}
orders = append(orders, o)
}
if err = txn.Commit().Error; err != nil {
txn.Rollback()
return
}
return
}
25 changes: 25 additions & 0 deletions internal/daemons/bot/vars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package bot

import (
"fmt"
"strings"
)

type IgnoreTokens []IgnoreToken

type IgnoreToken struct {
Name string `json:"name"`
Chain string `json:"chain"`
Address string `json:"wallet"`
}

func (i IgnoreTokens) Contains(name, chain, address string) bool {
key := fmt.Sprintf("%s_%s_%s", name, chain, address)
for _, token := range i {
if !strings.EqualFold(key, fmt.Sprintf("%s_%s_%s", token.Name, token.Chain, token.Address)) {
continue
}
return true
}
return false
}
16 changes: 16 additions & 0 deletions internal/daemons/market/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package market

import (
"omni-balance/internal/daemons"
"time"
)

func init() {
daemons.RegisterIntervalTask(daemons.Task{
Name: "market",
Description: "Look for tasks in the database that are not being processed and process them.",
TaskFunc: Run,
DefaultInterval: time.Minute * 3,
RunOnStart: true,
})
}
Loading

0 comments on commit a020d95

Please sign in to comment.