Skip to content

Commit

Permalink
feat: price-feeder: peggo integration (#649) (#664)
Browse files Browse the repository at this point in the history
(cherry picked from commit 58be905)

Co-authored-by: Rafael Tenfen <rafaeltenfen.rt@gmail.com>
  • Loading branch information
mergify[bot] and RafilxTenfen authored Mar 21, 2022
1 parent 7f14f3a commit e3cab7b
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 71 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.
- [#601](https://github.com/umee-network/umee/pull/601) Use TVWAP formula for determining prices when available.
- [#609](https://github.com/umee-network/umee/pull/609) TVWAP faulty provider detection.
- [#649](https://github.com/umee-network/umee/pull/649) Add "GetAvailablePairs" function to providers.

### Bug Fixes

Expand Down
66 changes: 28 additions & 38 deletions price-feeder/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,53 +340,43 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro

priceProvider, ok = o.priceProviders[providerName]
if !ok {
switch providerName {
case config.ProviderBinance:
binanceProvider, err := provider.NewBinanceProvider(ctx, o.logger, o.providerPairs[config.ProviderBinance]...)
if err != nil {
return nil, err
}
priceProvider = binanceProvider
newProvider, err := NewProvider(ctx, providerName, o.logger, o.providerPairs[providerName]...)
if err != nil {
return nil, err
}
priceProvider = newProvider

case config.ProviderKraken:
krakenProvider, err := provider.NewKrakenProvider(ctx, o.logger, o.providerPairs[config.ProviderKraken]...)
if err != nil {
return nil, err
}
priceProvider = krakenProvider
o.priceProviders[providerName] = priceProvider
}

return priceProvider, nil
}

case config.ProviderOsmosis:
priceProvider = provider.NewOsmosisProvider()
func NewProvider(ctx context.Context, providerName string, logger zerolog.Logger, providerPairs ...types.CurrencyPair) (provider.Provider, error) {
switch providerName {
case config.ProviderBinance:
return provider.NewBinanceProvider(ctx, logger, providerPairs...)

case config.ProviderHuobi:
huobiProvider, err := provider.NewHuobiProvider(ctx, o.logger, o.providerPairs[config.ProviderHuobi]...)
if err != nil {
return nil, err
}
priceProvider = huobiProvider
case config.ProviderKraken:
return provider.NewKrakenProvider(ctx, logger, providerPairs...)

case config.ProviderOkx:
okxProvider, err := provider.NewOkxProvider(ctx, o.logger, o.providerPairs[config.ProviderOkx]...)
if err != nil {
return nil, err
}
priceProvider = okxProvider
case config.ProviderOsmosis:
return provider.NewOsmosisProvider(), nil

case config.ProviderGate:
gateProvider, err := provider.NewGateProvider(ctx, o.logger, o.providerPairs[config.ProviderGate]...)
if err != nil {
return nil, err
}
priceProvider = gateProvider
case config.ProviderHuobi:
return provider.NewHuobiProvider(ctx, logger, providerPairs...)

case config.ProviderMock:
priceProvider = provider.NewMockProvider()
}
case config.ProviderOkx:
return provider.NewOkxProvider(ctx, logger, providerPairs...)

o.priceProviders[providerName] = priceProvider
case config.ProviderGate:
return provider.NewGateProvider(ctx, logger, providerPairs...)

case config.ProviderMock:
return provider.NewMockProvider(), nil
}

return priceProvider, nil
return nil, fmt.Errorf("provider %s not found", providerName)
}

// filterTickerDeviations finds the standard deviations of the prices of
Expand Down
8 changes: 8 additions & 0 deletions price-feeder/oracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (m mockProvider) SubscribeCurrencyPairs(_ ...types.CurrencyPair) error {
return nil
}

func (m mockProvider) GetAvailablePairs() (map[string]struct{}, error) {
return map[string]struct{}{}, nil
}

type failingProvider struct {
prices map[string]provider.TickerPrice
}
Expand All @@ -61,6 +65,10 @@ func (m failingProvider) SubscribeCurrencyPairs(_ ...types.CurrencyPair) error {
return nil
}

func (m failingProvider) GetAvailablePairs() (map[string]struct{}, error) {
return map[string]struct{}{}, nil
}

type OracleTestSuite struct {
suite.Suite

Expand Down
36 changes: 33 additions & 3 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
Expand All @@ -15,8 +16,9 @@ import (
)

const (
binanceHost = "stream.binance.com:9443"
binancePath = "/ws/umeestream"
binanceHost = "stream.binance.com:9443"
binancePath = "/ws/umeestream"
binancePairsEndpoint = "https://api1.binance.com/api/v3/ticker/price"
)

var _ Provider = (*BinanceProvider)(nil)
Expand Down Expand Up @@ -68,6 +70,12 @@ type (
Params []string `json:"params"` // streams to subscribe ex.: usdtatom@ticker
ID uint16 `json:"id"` // identify messages going back and forth
}

// BinancePairSummary defines the response structure for a Binance pair
// summary.
BinancePairSummary struct {
Symbol string `json:"symbol"`
}
)

func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.CurrencyPair) (*BinanceProvider, error) {
Expand Down Expand Up @@ -178,7 +186,7 @@ func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
return types.MapPairsToSlice(p.subscribedPairs)
}

func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
Expand Down Expand Up @@ -358,6 +366,28 @@ func (p *BinanceProvider) subscribePairs(pairs ...string) error {
return p.wsClient.WriteJSON(subsMsg)
}

// GetAvailablePairs returns all pairs to which the provider can subscribe.
// ex.: map["ATOMUSDT" => {}, "UMEEUSDC" => {}].
func (p *BinanceProvider) GetAvailablePairs() (map[string]struct{}, error) {
resp, err := http.Get(binancePairsEndpoint)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var pairsSummary []BinancePairSummary
if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil {
return nil, err
}

availablePairs := make(map[string]struct{}, len(pairsSummary))
for _, pairName := range pairsSummary {
availablePairs[strings.ToUpper(pairName.Symbol)] = struct{}{}
}

return availablePairs, nil
}

// currencyPairToBinanceTickerPair receives a currency pair and return binance
// ticker symbol atomusdt@ticker.
func currencyPairToBinanceTickerPair(cp types.CurrencyPair) string {
Expand Down
42 changes: 38 additions & 4 deletions price-feeder/oracle/provider/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"

Expand All @@ -15,9 +17,10 @@ import (
)

const (
gateHost = "ws.gate.io"
gatePath = "/v3"
gatePingCheck = time.Second * 28 // should be < 30
gateHost = "ws.gate.io"
gatePath = "/v3"
gatePingCheck = time.Second * 28 // should be < 30
gatePairsEndpoint = "https://api.gateio.ws/api/v4/spot/currency_pairs"
)

var _ Provider = (*GateProvider)(nil)
Expand Down Expand Up @@ -89,6 +92,12 @@ type (
GateEventResult struct {
Status string `json:"status"` // ex. "successful"
}

// GatePairSummary defines the response structure for a Gate pair summary.
GatePairSummary struct {
Base string `json:"base"`
Quote string `json:"quote"`
}
)

// NewGateProvider creates a new GateProvider.
Expand Down Expand Up @@ -234,7 +243,7 @@ func (p *GateProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
return types.MapPairsToSlice(p.subscribedPairs)
}

func (p *GateProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) {
Expand Down Expand Up @@ -493,6 +502,31 @@ func (p *GateProvider) pongHandler(appData string) error {
return nil
}

// GetAvailablePairs returns all pairs to which the provider can subscribe.
func (p *GateProvider) GetAvailablePairs() (map[string]struct{}, error) {
resp, err := http.Get(gatePairsEndpoint)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var pairsSummary []GatePairSummary
if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil {
return nil, err
}

availablePairs := make(map[string]struct{}, len(pairsSummary))
for _, pair := range pairsSummary {
cp := types.CurrencyPair{
Base: strings.ToUpper(pair.Base),
Quote: strings.ToUpper(pair.Quote),
}
availablePairs[cp.String()] = struct{}{}
}

return availablePairs, nil
}

func (ticker GateTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Gate", ticker.Symbol, ticker.Last, ticker.Vol)
}
Expand Down
36 changes: 35 additions & 1 deletion price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
Expand All @@ -22,6 +23,7 @@ const (
huobiHost = "api-aws.huobi.pro"
huobiPath = "/ws"
huobiReconnectTime = time.Minute * 2
huobiPairsEndpoint = "https://api.huobi.pro/market/tickers"
)

var _ Provider = (*HuobiProvider)(nil)
Expand Down Expand Up @@ -74,6 +76,17 @@ type (
HuobiSubscriptionMsg struct {
Sub string `json:"sub"` // channel to subscribe market.$symbol.ticker
}

// HuobiPairsSummary defines the response structure for an Huobi pairs
// summary.
HuobiPairsSummary struct {
Data []HuobiPairData `json:"data"`
}

// HuobiPairData defines the data response structure for an Huobi pair.
HuobiPairData struct {
Symbol string `json:"symbol"`
}
)

// NewHuobiProvider returns a new Huobi provider with the WS connection and msg handler.
Expand Down Expand Up @@ -183,7 +196,7 @@ func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
return types.MapPairsToSlice(p.subscribedPairs)
}

func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) {
Expand Down Expand Up @@ -384,6 +397,27 @@ func (p *HuobiProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
}
}

// GetAvailablePairs returns all pairs to which the provider can subscribe.
func (p *HuobiProvider) GetAvailablePairs() (map[string]struct{}, error) {
resp, err := http.Get(huobiPairsEndpoint)
if err != nil {
return nil, err
}
defer resp.Body.Close()

var pairsSummary HuobiPairsSummary
if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil {
return nil, err
}

availablePairs := make(map[string]struct{}, len(pairsSummary.Data))
for _, pair := range pairsSummary.Data {
availablePairs[strings.ToUpper(pair.Symbol)] = struct{}{}
}

return availablePairs, nil
}

// decompressGzip uncompress gzip compressed messages. All data returned from the
// websocket Market APIs is compressed with GZIP, so it needs to be unzipped.
func decompressGzip(bz []byte) ([]byte, error) {
Expand Down
Loading

0 comments on commit e3cab7b

Please sign in to comment.