From 5e3adeb4ad2cf0c1de1b7dd9ca27fa895ce5be65 Mon Sep 17 00:00:00 2001 From: ftocal <46001274+ftocal@users.noreply.github.com> Date: Mon, 4 Sep 2023 15:17:23 -0300 Subject: [PATCH] Retrieve chain and sender of wormchain originated vaas (#678) * Add additional information for osmosis transaction through wormchain Co-authored-by: walker-16 * Modify tx-tracker deployment Co-authored-by: walker-16 --------- Co-authored-by: walker-16 --- api/handlers/transactions/model.go | 17 +- deploy/tx-tracker/env/production-mainnet.env | 7 + deploy/tx-tracker/env/production-testnet.env | 7 + deploy/tx-tracker/env/staging-mainnet.env | 7 + deploy/tx-tracker/env/staging-testnet.env | 7 + deploy/tx-tracker/tx-tracker-service.yaml | 10 + ...ckerfile.tx-tracker-service => Dockerfile} | 0 tx-tracker/chains/api_wormchain.go | 264 ++++++++++++++++++ tx-tracker/chains/chains.go | 27 ++ tx-tracker/chains/util.go | 52 +++- tx-tracker/cmd/backfiller/main.go | 4 +- tx-tracker/cmd/fetchone/main.go | 6 +- tx-tracker/cmd/service/main.go | 2 +- tx-tracker/config/structs.go | 6 + tx-tracker/consumer/consumer.go | 5 +- tx-tracker/consumer/processor.go | 3 +- tx-tracker/consumer/repository.go | 3 + 17 files changed, 413 insertions(+), 14 deletions(-) rename tx-tracker/{Dockerfile.tx-tracker-service => Dockerfile} (100%) create mode 100644 tx-tracker/chains/api_wormchain.go diff --git a/api/handlers/transactions/model.go b/api/handlers/transactions/model.go index c17555a9e..44edc0add 100644 --- a/api/handlers/transactions/model.go +++ b/api/handlers/transactions/model.go @@ -74,14 +74,21 @@ type GlobalTransactionDoc struct { DestinationTx *DestinationTx `bson:"destinationTx" json:"destinationTx"` } -// OriginTx representa a origin transaction. +// OriginTx represents a origin transaction. type OriginTx struct { - TxHash string `bson:"nativeTxHash" json:"txHash"` - From string `bson:"from" json:"from"` - Status string `bson:"status" json:"status"` + TxHash string `bson:"nativeTxHash" json:"txHash"` + From string `bson:"from" json:"from"` + Status string `bson:"status" json:"status"` + Attribute *AttributeDoc `bson:"attribute" json:"attribute"` } -// DestinationTx representa a destination transaction. +// AttributeDoc represents a custom attribute for a origin transaction. +type AttributeDoc struct { + Type string `bson:"type" json:"type"` + Value map[string]any `bson:"value" json:"value"` +} + +// DestinationTx represents a destination transaction. type DestinationTx struct { ChainID sdk.ChainID `bson:"chainId" json:"chainId"` Status string `bson:"status" json:"status"` diff --git a/deploy/tx-tracker/env/production-mainnet.env b/deploy/tx-tracker/env/production-mainnet.env index eb13b62cf..5a3352c02 100644 --- a/deploy/tx-tracker/env/production-mainnet.env +++ b/deploy/tx-tracker/env/production-mainnet.env @@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=128Mi RESOURCES_REQUESTS_CPU=250m SQS_URL= SQS_AWS_REGION= +P2P_NETWORK=mainnet AWS_IAM_ROLE= METRICS_ENABLED=true @@ -62,6 +63,9 @@ OASIS_REQUESTS_PER_MINUTE=12 OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism OPTIMISM_REQUESTS_PER_MINUTE=12 +OSMOSIS_BASE_URL=https://rpc.osmosis.zone +OSMOSIS_REQUESTS_PER_MINUTE=12 + POLYGON_BASE_URL=https://rpc.ankr.com/polygon POLYGON_REQUESTS_PER_MINUTE=12 @@ -77,5 +81,8 @@ TERRA_REQUESTS_PER_MINUTE=12 TERRA2_BASE_URL=https://phoenix-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=12 +WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com +WORMCHAIN_REQUESTS_PER_MINUTE=12 + XPLA_BASE_URL=https://dimension-lcd.xpla.dev XPLA_REQUESTS_PER_MINUTE=12 \ No newline at end of file diff --git a/deploy/tx-tracker/env/production-testnet.env b/deploy/tx-tracker/env/production-testnet.env index 42e662ed6..3060e4a61 100644 --- a/deploy/tx-tracker/env/production-testnet.env +++ b/deploy/tx-tracker/env/production-testnet.env @@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi RESOURCES_REQUESTS_CPU=10m SQS_URL= SQS_AWS_REGION= +P2P_NETWORK=testnet AWS_IAM_ROLE= METRICS_ENABLED=true @@ -60,6 +61,9 @@ OASIS_REQUESTS_PER_MINUTE=12 OPTIMISM_BASE_URL=https://goerli.optimism.io OPTIMISM_REQUESTS_PER_MINUTE=12 +OSMOSIS_BASE_URL=https://rpc.testnet.osmosis.zone +OSMOSIS_REQUESTS_PER_MINUTE=12 + POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai POLYGON_REQUESTS_PER_MINUTE=12 @@ -75,5 +79,8 @@ TERRA_REQUESTS_PER_MINUTE=12 TERRA2_BASE_URL=https://pisco-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=12 +WORMCHAIN_BASE_URL=https://wormchain-testnet.jumpisolated.com +WORMCHAIN_REQUESTS_PER_MINUTE=12 + XPLA_BASE_URL=https://cube-lcd.xpla.dev:443 XPLA_REQUESTS_PER_MINUTE=12 \ No newline at end of file diff --git a/deploy/tx-tracker/env/staging-mainnet.env b/deploy/tx-tracker/env/staging-mainnet.env index 6dc913bdb..23c5a161d 100644 --- a/deploy/tx-tracker/env/staging-mainnet.env +++ b/deploy/tx-tracker/env/staging-mainnet.env @@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi RESOURCES_REQUESTS_CPU=40m SQS_URL= SQS_AWS_REGION= +P2P_NETWORK=mainnet AWS_IAM_ROLE= METRICS_ENABLED=true @@ -62,6 +63,9 @@ OASIS_REQUESTS_PER_MINUTE=12 OPTIMISM_BASE_URL=https://rpc.ankr.com/optimism OPTIMISM_REQUESTS_PER_MINUTE=12 +OSMOSIS_BASE_URL=https://rpc.osmosis.zone +OSMOSIS_REQUESTS_PER_MINUTE=12 + POLYGON_BASE_URL=https://rpc.ankr.com/polygon POLYGON_REQUESTS_PER_MINUTE=12 @@ -77,5 +81,8 @@ TERRA_REQUESTS_PER_MINUTE=12 TERRA2_BASE_URL=https://phoenix-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=12 +WORMCHAIN_BASE_URL=https://wormchain.jumpisolated.com +WORMCHAIN_REQUESTS_PER_MINUTE=12 + XPLA_BASE_URL=https://dimension-lcd.xpla.dev XPLA_REQUESTS_PER_MINUTE=12 \ No newline at end of file diff --git a/deploy/tx-tracker/env/staging-testnet.env b/deploy/tx-tracker/env/staging-testnet.env index 6fc974225..154aeaa08 100644 --- a/deploy/tx-tracker/env/staging-testnet.env +++ b/deploy/tx-tracker/env/staging-testnet.env @@ -9,6 +9,7 @@ RESOURCES_REQUESTS_MEMORY=15Mi RESOURCES_REQUESTS_CPU=10m SQS_URL= SQS_AWS_REGION= +P2P_NETWORK=testnet AWS_IAM_ROLE= METRICS_ENABLED=true @@ -60,6 +61,9 @@ OASIS_REQUESTS_PER_MINUTE=12 OPTIMISM_BASE_URL=https://goerli.optimism.io OPTIMISM_REQUESTS_PER_MINUTE=12 +OSMOSIS_BASE_URL=https://rpc.testnet.osmosis.zone +OSMOSIS_REQUESTS_PER_MINUTE=12 + POLYGON_BASE_URL=https://rpc.ankr.com/polygon_mumbai POLYGON_REQUESTS_PER_MINUTE=12 @@ -75,5 +79,8 @@ TERRA_REQUESTS_PER_MINUTE=12 TERRA2_BASE_URL=https://pisco-lcd.terra.dev TERRA2_REQUESTS_PER_MINUTE=12 +WORMCHAIN_BASE_URL=https://wormchain-testnet.jumpisolated.com +WORMCHAIN_REQUESTS_PER_MINUTE=12 + XPLA_BASE_URL=https://cube-lcd.xpla.dev:443 XPLA_REQUESTS_PER_MINUTE=12 \ No newline at end of file diff --git a/deploy/tx-tracker/tx-tracker-service.yaml b/deploy/tx-tracker/tx-tracker-service.yaml index e3cb83901..d5c09aa09 100644 --- a/deploy/tx-tracker/tx-tracker-service.yaml +++ b/deploy/tx-tracker/tx-tracker-service.yaml @@ -60,6 +60,8 @@ spec: value: {{ .SQS_URL }} - name: AWS_REGION value: {{ .SQS_AWS_REGION }} + - name: P2P_NETWORK + value: {{ .P2P_NETWORK }} - name: METRICS_ENABLED value: "{{ .METRICS_ENABLED }}" - name: ACALA_BASE_URL @@ -126,6 +128,10 @@ spec: value: {{ .OPTIMISM_BASE_URL }} - name: OPTIMISM_REQUESTS_PER_MINUTE value: "{{ .OPTIMISM_REQUESTS_PER_MINUTE }}" + - name: OSMOSIS_BASE_URL + value: {{ .OSMOSIS_BASE_URL }} + - name: OSMOSIS_REQUESTS_PER_MINUTE + value: "{{ .OSMOSIS_REQUESTS_PER_MINUTE }}" - name: POLYGON_BASE_URL value: {{ .POLYGON_BASE_URL }} - name: POLYGON_REQUESTS_PER_MINUTE @@ -146,6 +152,10 @@ spec: value: {{ .TERRA2_BASE_URL }} - name: TERRA2_REQUESTS_PER_MINUTE value: "{{ .TERRA2_REQUESTS_PER_MINUTE }}" + - name: WORMCHAIN_BASE_URL + value: {{ .WORMCHAIN_BASE_URL }} + - name: WORMCHAIN_REQUESTS_PER_MINUTE + value: "{{ .WORMCHAIN_REQUESTS_PER_MINUTE }}" - name: XPLA_BASE_URL value: {{ .XPLA_BASE_URL }} - name: XPLA_REQUESTS_PER_MINUTE diff --git a/tx-tracker/Dockerfile.tx-tracker-service b/tx-tracker/Dockerfile similarity index 100% rename from tx-tracker/Dockerfile.tx-tracker-service rename to tx-tracker/Dockerfile diff --git a/tx-tracker/chains/api_wormchain.go b/tx-tracker/chains/api_wormchain.go new file mode 100644 index 000000000..6a75013c9 --- /dev/null +++ b/tx-tracker/chains/api_wormchain.go @@ -0,0 +1,264 @@ +package chains + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +type apiWormchain struct { + osmosisUrl string + osmosisRateLimiter *time.Ticker + p2pNetwork string +} + +type wormchainTxDetail struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Result struct { + Hash string `json:"hash"` + Height string `json:"height"` + Index int `json:"index"` + TxResult struct { + Code int `json:"code"` + Data string `json:"data"` + Log string `json:"log"` + Info string `json:"info"` + GasWanted string `json:"gas_wanted"` + GasUsed string `json:"gas_used"` + Events []struct { + Type string `json:"type"` + Attributes []struct { + Key string `json:"key"` + Value string `json:"value"` + Index bool `json:"index"` + } `json:"attributes"` + } `json:"events"` + Codespace string `json:"codespace"` + } `json:"tx_result"` + Tx string `json:"tx"` + } `json:"result"` +} + +type event struct { + Type string `json:"type"` + Attributes []struct { + Key string `json:"key"` + Value string `json:"value"` + } `json:"attributes"` +} + +type packetData struct { + Sender string `json:"sender"` + Receiver string `json:"receiver"` +} + +type logWrapper struct { + Events []event `json:"events"` +} + +type worchainTx struct { + srcChannel, dstChannel, sender, receiver, timestamp, sequence string +} + +func fetchWormchainDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, txHash string) (*worchainTx, error) { + uri := fmt.Sprintf("%s/tx?hash=%s", baseUrl, txHash) + body, err := httpGet(ctx, rateLimiter, uri) + if err != nil { + return nil, err + } + + var tx wormchainTxDetail + err = json.Unmarshal(body, &tx) + if err != nil { + return nil, err + } + + var log []logWrapper + err = json.Unmarshal([]byte(tx.Result.TxResult.Log), &log) + if err != nil { + return nil, err + } + + var srcChannel, dstChannel, sender, receiver, timestamp, sequence string + for _, l := range log { + for _, e := range l.Events { + if e.Type == "recv_packet" { + for _, attr := range e.Attributes { + if attr.Key == "packet_src_channel" { + srcChannel = attr.Value + } + if attr.Key == "packet_dst_channel" { + dstChannel = attr.Value + } + if attr.Key == "packet_timeout_timestamp" { + timestamp = attr.Value + } + + if attr.Key == "packet_sequence" { + sequence = attr.Value + } + + if attr.Key == "packet_data" { + var pd packetData + err = json.Unmarshal([]byte(attr.Value), &pd) + if err != nil { + return nil, err + } + sender = pd.Sender + receiver = pd.Receiver + } + } + } + } + } + return &worchainTx{ + srcChannel: srcChannel, + dstChannel: dstChannel, + sender: sender, + receiver: receiver, + timestamp: timestamp, + sequence: sequence, + }, nil + +} + +const queryTemplate = `send_packet.packet_sequence='%s' AND send_packet.packet_timeout_timestamp='%s' AND send_packet.packet_src_channel='%s' AND send_packet.packet_dst_channel='%s'` + +type osmosisRequest struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params struct { + Query string `json:"query"` + Page string `json:"page"` + } `json:"params"` +} + +type osmosisResponse struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Result struct { + Txs []struct { + Hash string `json:"hash"` + Height string `json:"height"` + Index int `json:"index"` + TxResult struct { + Code int `json:"code"` + Data string `json:"data"` + Log string `json:"log"` + Info string `json:"info"` + GasWanted string `json:"gas_wanted"` + GasUsed string `json:"gas_used"` + Events []struct { + Type string `json:"type"` + Attributes []struct { + Key string `json:"key"` + Value string `json:"value"` + Index bool `json:"index"` + } `json:"attributes"` + } `json:"events"` + Codespace string `json:"codespace"` + } `json:"tx_result"` + Tx string `json:"tx"` + } `json:"txs"` + TotalCount string `json:"total_count"` + } `json:"result"` +} + +type osmosisTx struct { + txHash string +} + +type WorchainAttributeTxDetail struct { + OriginChainID sdk.ChainID `bson:"originChainId"` + OriginTxHash string `bson:"originTxHash"` + OriginAddress string `bson:"originAddress"` +} + +func fetchOsmosisDetail(ctx context.Context, baseUrl string, rateLimiter *time.Ticker, sequence, timestamp, srcChannel, dstChannel string) (*osmosisTx, error) { + + query := fmt.Sprintf(queryTemplate, sequence, timestamp, srcChannel, dstChannel) + q := osmosisRequest{ + Jsonrpc: "2.0", + ID: 1, + Method: "tx_search", + Params: struct { + Query string `json:"query"` + Page string `json:"page"` + }{ + Query: query, + Page: "1", + }, + } + + response, err := httpPost(ctx, rateLimiter, baseUrl, q) + if err != nil { + return nil, err + } + + var oReponse osmosisResponse + err = json.Unmarshal(response, &oReponse) + if err != nil { + return nil, err + } + + if len(oReponse.Result.Txs) == 0 { + return nil, fmt.Errorf("can not found hash for sequence %s, timestamp %s, srcChannel %s, dstChannel %s", sequence, timestamp, srcChannel, dstChannel) + } + return &osmosisTx{txHash: oReponse.Result.Txs[0].Hash}, nil +} + +func (a *apiWormchain) fetchWormchainTx( + ctx context.Context, + rateLimiter *time.Ticker, + baseUrl string, + txHash string, +) (*TxDetail, error) { + + txHash = txHashLowerCaseWith0x(txHash) + + wormchainTx, err := fetchWormchainDetail(ctx, baseUrl, rateLimiter, txHash) + if err != nil { + return nil, err + } + + // Verify if this transaction is from osmosis by wormchain + if a.isOsmosisTx(wormchainTx) { + osmosisTx, err := fetchOsmosisDetail(ctx, a.osmosisUrl, a.osmosisRateLimiter, wormchainTx.sequence, wormchainTx.timestamp, wormchainTx.srcChannel, wormchainTx.dstChannel) + if err != nil { + return nil, err + } + return &TxDetail{ + NativeTxHash: txHash, + From: wormchainTx.receiver, + Attribute: &AttributeTxDetail{ + Type: "wormchain-gateway", + Value: &WorchainAttributeTxDetail{ + OriginChainID: ChainIDOsmosis, + OriginTxHash: osmosisTx.txHash, + OriginAddress: wormchainTx.sender, + }, + }, + }, nil + } + + return &TxDetail{ + NativeTxHash: txHash, + From: wormchainTx.receiver, + }, nil +} + +func (a *apiWormchain) isOsmosisTx(tx *worchainTx) bool { + if a.p2pNetwork == domain.P2pMainNet { + return tx.srcChannel == "channel-2186" && tx.dstChannel == "channel-3" + } + if a.p2pNetwork == domain.P2pTestNet { + return tx.srcChannel == "channel-486" && tx.dstChannel == "channel-4" + } + return false +} diff --git a/tx-tracker/chains/chains.go b/tx-tracker/chains/chains.go index 909162d1c..bf0691900 100644 --- a/tx-tracker/chains/chains.go +++ b/tx-tracker/chains/chains.go @@ -23,11 +23,23 @@ var ( baseUrlsByChain map[sdk.ChainID]string ) +// WARNING: The following chain IDs are not supported by the wormhole-sdk: +const ChainIDOsmosis sdk.ChainID = 20 + +type WormchainTxDetail struct { +} type TxDetail struct { // From is the address that signed the transaction, encoded in the chain's native format. From string // NativeTxHash contains the transaction hash, encoded in the chain's native format. NativeTxHash string + // Attribute contains the specific information of the transaction. + Attribute *AttributeTxDetail +} + +type AttributeTxDetail struct { + Type string + Value any } func Initialize(cfg *config.RpcProviderSettings) { @@ -67,6 +79,8 @@ func Initialize(cfg *config.RpcProviderSettings) { rateLimitersByChain[sdk.ChainIDTerra2] = convertToRateLimiter(cfg.Terra2RequestsPerMinute) rateLimitersByChain[sdk.ChainIDSui] = convertToRateLimiter(cfg.SuiRequestsPerMinute) rateLimitersByChain[sdk.ChainIDXpla] = convertToRateLimiter(cfg.XplaRequestsPerMinute) + rateLimitersByChain[sdk.ChainIDWormchain] = convertToRateLimiter(cfg.WormchainRequestsPerMinute) + rateLimitersByChain[ChainIDOsmosis] = convertToRateLimiter(cfg.OsmosisRequestsPerMinute) // Initialize the RPC base URLs for each chain baseUrlsByChain = make(map[sdk.ChainID]string) @@ -92,6 +106,7 @@ func Initialize(cfg *config.RpcProviderSettings) { baseUrlsByChain[sdk.ChainIDTerra2] = cfg.Terra2BaseUrl baseUrlsByChain[sdk.ChainIDSui] = cfg.SuiBaseUrl baseUrlsByChain[sdk.ChainIDXpla] = cfg.XplaBaseUrl + baseUrlsByChain[sdk.ChainIDWormchain] = cfg.WormchainBaseUrl } func FetchTx( @@ -99,6 +114,7 @@ func FetchTx( cfg *config.RpcProviderSettings, chainId sdk.ChainID, txHash string, + p2pNetwork string, ) (*TxDetail, error) { // Decide which RPC/API service to use based on chain ID @@ -132,6 +148,17 @@ func FetchTx( sdk.ChainIDOptimism, sdk.ChainIDPolygon: fetchFunc = fetchEthTx + case sdk.ChainIDWormchain: + rateLimiter, ok := rateLimitersByChain[ChainIDOsmosis] + if !ok { + return nil, errors.New("found no rate limiter for chain osmosis") + } + apiWormchain := &apiWormchain{ + osmosisUrl: cfg.OsmosisBaseUrl, + osmosisRateLimiter: rateLimiter, + p2pNetwork: p2pNetwork, + } + fetchFunc = apiWormchain.fetchWormchainTx default: return nil, ErrChainNotSupported } diff --git a/tx-tracker/chains/util.go b/tx-tracker/chains/util.go index d7c00914b..5bdd3325d 100644 --- a/tx-tracker/chains/util.go +++ b/tx-tracker/chains/util.go @@ -1,9 +1,11 @@ package chains import ( + "bytes" "context" + "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strconv" "strings" @@ -56,13 +58,52 @@ func httpGet(ctx context.Context, rateLimiter *time.Ticker, url string) ([]byte, } // Read the response body and return - body, err := ioutil.ReadAll(response.Body) + body, err := io.ReadAll(response.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) } return body, nil } +// httpPost is a helper function that performs an HTTP request. +func httpPost(ctx context.Context, rateLimiter *time.Ticker, url string, body any) ([]byte, error) { + + // Wait for the rate limiter + if !waitForRateLimiter(ctx, rateLimiter) { + return nil, ctx.Err() + } + + b, err := json.Marshal(body) + if err != nil { + return nil, err + } + + // Build the HTTP request + request, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + request.Header.Set("Content-Type", "application/json") + + // Send it + var client http.Client + response, err := client.Do(request) + if err != nil { + return nil, fmt.Errorf("failed to query url: %w", err) + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status code: %d", response.StatusCode) + } + + // Read the response body and return + result, err := io.ReadAll(response.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + return result, nil +} + func waitForRateLimiter(ctx context.Context, t *time.Ticker) bool { select { case <-t.C: @@ -108,3 +149,10 @@ func (c *rateLimitedRpcClient) CallContext( func (c *rateLimitedRpcClient) Close() { c.client.Close() } + +func txHashLowerCaseWith0x(v string) string { + if strings.HasPrefix(v, "0x") { + return strings.ToLower(v) + } + return "0x" + strings.ToLower(v) +} diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index d54b89ddf..1117418f0 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -109,6 +109,7 @@ func main() { wg: &wg, totalDocuments: totalDocuments, processedDocuments: &processedDocuments, + p2pNetwork: cfg.P2pNetwork, } go consume(rootCtx, &p) } @@ -242,6 +243,7 @@ type consumerParams struct { wg *sync.WaitGroup totalDocuments uint64 processedDocuments *atomic.Uint64 + p2pNetwork string } // consume reads VAA IDs from a channel, processes them, and updates the database accordingly. @@ -303,7 +305,7 @@ func consume(ctx context.Context, params *consumerParams) { TxHash: *v.TxHash, Overwrite: true, // Overwrite old contents } - err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p) + err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p, params.p2pNetwork) if err != nil { params.logger.Error("Failed to track source tx", zap.String("vaaId", globalTx.Id), diff --git a/tx-tracker/cmd/fetchone/main.go b/tx-tracker/cmd/fetchone/main.go index f48e8f58f..ea6f430a3 100644 --- a/tx-tracker/cmd/fetchone/main.go +++ b/tx-tracker/cmd/fetchone/main.go @@ -13,8 +13,8 @@ import ( func main() { // validate commandline arguments - if len(os.Args) != 3 { - log.Fatalf("Usage: ./%s \n", os.Args[0]) + if len(os.Args) != 4 { + log.Fatalf("Usage: ./%s \n", os.Args[0]) } // load config settings @@ -31,7 +31,7 @@ func main() { // fetch tx data chains.Initialize(cfg) - txDetail, err := chains.FetchTx(context.Background(), cfg, chainId, os.Args[2]) + txDetail, err := chains.FetchTx(context.Background(), cfg, chainId, os.Args[2], os.Args[3]) if err != nil { log.Fatalf("Failed to get transaction data: %v", err) } diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index ee44ba7a0..68c1a8611 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -63,7 +63,7 @@ func main() { // create and start a consumer. vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger) repository := consumer.NewRepository(logger, db.Database) - consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics) + consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics, cfg.P2pNetwork) consumer.Start(rootCtx) logger.Info("Started wormhole-explorer-tx-tracker") diff --git a/tx-tracker/config/structs.go b/tx-tracker/config/structs.go index 5177e75d8..5e3f7f985 100644 --- a/tx-tracker/config/structs.go +++ b/tx-tracker/config/structs.go @@ -22,6 +22,7 @@ type BackfillerSettings struct { LogLevel string `split_words:"true" default:"INFO"` NumWorkers uint `split_words:"true" required:"true"` BulkSize uint `split_words:"true" required:"true"` + P2pNetwork string `split_words:"true" required:"true"` // Strategy determines which VAAs will be affected by the backfiller. Strategy struct { @@ -41,6 +42,7 @@ type ServiceSettings struct { LogLevel string `split_words:"true" default:"INFO"` PprofEnabled bool `split_words:"true" default:"false"` MetricsEnabled bool `split_words:"true" default:"false"` + P2pNetwork string `split_words:"true" required:"true"` AwsSettings MongodbSettings RpcProviderSettings @@ -92,6 +94,8 @@ type RpcProviderSettings struct { OasisRequestsPerMinute uint16 `split_words:"true" required:"true"` OptimismBaseUrl string `split_words:"true" required:"true"` OptimismRequestsPerMinute uint16 `split_words:"true" required:"true"` + OsmosisBaseUrl string `split_words:"true" required:"true"` + OsmosisRequestsPerMinute uint16 `split_words:"true" required:"true"` PolygonBaseUrl string `split_words:"true" required:"true"` PolygonRequestsPerMinute uint16 `split_words:"true" required:"true"` SolanaBaseUrl string `split_words:"true" required:"true"` @@ -104,6 +108,8 @@ type RpcProviderSettings struct { Terra2RequestsPerMinute uint16 `split_words:"true" required:"true"` XplaBaseUrl string `split_words:"true" required:"true"` XplaRequestsPerMinute uint16 `split_words:"true" required:"true"` + WormchainBaseUrl string `split_words:"true" required:"true"` + WormchainRequestsPerMinute uint16 `split_words:"true" required:"true"` } func LoadFromEnv[T any]() (*T, error) { diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 95cdd45e8..c97ea0d57 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -19,6 +19,7 @@ type Consumer struct { logger *zap.Logger repository *Repository metrics metrics.Metrics + p2pNetwork string } // New creates a new vaa consumer. @@ -29,6 +30,7 @@ func New( logger *zap.Logger, repository *Repository, metrics metrics.Metrics, + p2pNetwork string, ) *Consumer { c := Consumer{ @@ -37,6 +39,7 @@ func New( logger: logger, repository: repository, metrics: metrics, + p2pNetwork: p2pNetwork, } return &c @@ -81,7 +84,7 @@ func (c *Consumer) process(ctx context.Context, msg queue.ConsumerMessage) { TxHash: event.TxHash, Overwrite: false, // avoid processing the same transaction twice } - err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p) + err := ProcessSourceTx(ctx, c.logger, c.rpcProviderSettings, c.repository, &p, c.p2pNetwork) // Log a message informing the processing status if errors.Is(err, chains.ErrChainNotSupported) { diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go index 519868ddb..3e8e39e98 100644 --- a/tx-tracker/consumer/processor.go +++ b/tx-tracker/consumer/processor.go @@ -44,6 +44,7 @@ func ProcessSourceTx( rpcServiceProviderSettings *config.RpcProviderSettings, repository *Repository, params *ProcessSourceTxParams, + p2pNetwork string, ) error { if !params.Overwrite { @@ -72,7 +73,7 @@ func ProcessSourceTx( for retries := 0; ; retries++ { // Get transaction details from the emitter blockchain - txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash) + txDetail, err = chains.FetchTx(ctx, rpcServiceProviderSettings, params.ChainId, params.TxHash, p2pNetwork) if err == nil { break } diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 17ba84999..b6f11d7d2 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -53,6 +53,9 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP if params.TxDetail != nil { fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash}) fields = append(fields, primitive.E{Key: "from", Value: params.TxDetail.From}) + if params.TxDetail.Attribute != nil { + fields = append(fields, primitive.E{Key: "attribute", Value: params.TxDetail.Attribute}) + } } update := bson.D{