Skip to content

Commit

Permalink
feat: add websocket eth subscription to deposit tracker (#4081)
Browse files Browse the repository at this point in the history
* feat: add websocket eth subscription to deposit tracker

* chore: address minor review comments

* chore: rename event_sender -> witness_sender

* feat: change chainflip-btc-tracker to chainflip-ingress-egress-tracker

* refactor: move eth under (new) witnessing module

* refactor: move shared logic and initialisation out of eth module

* feat: configuration via env variables

* chore: clippy fix

* chore: rename docker files

* fix: ignore send error if no subscribers

* chore: rename subscription endpoints
  • Loading branch information
msgmaxim authored Oct 11, 2023
1 parent b1093cd commit eb669d2
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/_01_pre_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
- chainflip-node
- generate-genesis-keys
- rust-base
- chainflip-btc-deposit-tracker
- chainflip-ingress-egress-tracker
steps:
- name: Checkout 🏁
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/_02_retrieve-bins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
name: chainflip-backend-bin-ubuntu-${{ matrix.ubuntu_version }}
path: |
chainflip-broker-api
chainflip-btc-deposit-tracker
chainflip-ingress-egress-tracker
chainflip-cli
chainflip-engine
chainflip-lp-api
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/_20_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
./target/release/generate-genesis-keys
./target/release/chainflip-broker-api
./target/release/chainflip-lp-api
./target/release/chainflip-btc-deposit-tracker
./target/release/chainflip-ingress-egress-tracker
./target/release/wbuild/state-chain-runtime/state_chain_runtime*.wasm
- name: Upload binary artifacts
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/_24_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
- chainflip-broker-api
- chainflip-lp-api
- generate-genesis-keys
- chainflip-btc-deposit-tracker
- chainflip-ingress-egress-tracker
- chainflip-engine-databases
docker-repo:
- ghcr.io/${{ github.repository }}
Expand Down
42 changes: 25 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
'api/bin/chainflip-cli',
'api/bin/chainflip-broker-api',
'api/bin/chainflip-lp-api',
'api/bin/chainflip-btc-deposit-tracker',
'api/bin/chainflip-ingress-egress-tracker',
'state-chain/chains',
'state-chain/node',
'state-chain/amm',
Expand Down
17 changes: 0 additions & 17 deletions api/bin/chainflip-btc-deposit-tracker/Cargo.toml

This file was deleted.

31 changes: 31 additions & 0 deletions api/bin/chainflip-ingress-egress-tracker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "chainflip-ingress-egress-tracker"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.72"
async-trait = "0.1.73"
futures = "0.3.28"
jsonrpsee = { version = "0.16.2", features = ["server"] }
reqwest = { version = "0.11.18", features = ["json"] }
serde = "1.0.183"
tokio = "1.29.1"
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tempfile = "3.8"

sp-core = { git = "https://github.com/chainflip-io/substrate.git", tag = "chainflip-monthly-2023-08+2" }
codec = { package = "parity-scale-codec", version = "3.6.1", features = [
"derive",
"full",
] }

# Local dependencies
chainflip-engine = { path = "../../../engine/"}
utilities = { path = "../../../utilities" }
cf-primitives = { path = "../../../state-chain/primitives" }
pallet-cf-environment = { path = "../../../state-chain/pallets/cf-environment" }
state-chain-runtime = { path = "../../../state-chain/runtime" }
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use anyhow::anyhow;
use async_trait::async_trait;
use chainflip_engine::settings::WsHttpEndpoints;
use futures::future;
use jsonrpsee::{core::Error, server::ServerBuilder, RpcModule};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
env,
io::Write,
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
Expand All @@ -17,6 +20,8 @@ type TxHash = String;
type BlockHash = String;
type Address = String;

mod witnessing;

#[derive(Deserialize)]
struct BestBlockResult {
result: BlockHash,
Expand Down Expand Up @@ -116,8 +121,12 @@ impl BtcRpc {
})
.collect::<Vec<String>>()
.join(",");

let username = env::var("BTC_USERNAME").unwrap_or("flip".to_string());
let password = env::var("BTC_PASSWORD").unwrap_or("flip".to_string());
reqwest::Client::new()
.post(url)
.basic_auth(username, Some(password))
.header("Content-Type", "text/plain")
.body(format!("[{}]", body))
.send()
Expand Down Expand Up @@ -179,10 +188,10 @@ async fn get_updated_cache<T: BtcNode>(btc: T, previous_cache: Cache) -> anyhow:
let previous_mempool: HashMap<TxHash, QueryResult> = previous_cache
.clone()
.transactions
.iter()
.into_iter()
.filter_map(|(_, query_result)| {
if query_result.confirmations == 0 {
Some((query_result.tx_hash.clone(), query_result.clone()))
Some((query_result.tx_hash.clone(), query_result))
} else {
None
}
Expand Down Expand Up @@ -277,6 +286,13 @@ fn lookup_transactions(
}
}

pub struct DepositTrackerSettings {
eth_node: WsHttpEndpoints,
// The key shouldn't be necessary, but the current witnesser wants this
eth_key_path: PathBuf,
state_chain_ws_endpoint: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
Expand Down Expand Up @@ -306,7 +322,13 @@ async fn main() -> anyhow::Result<()> {
}
}
});
let server = ServerBuilder::default().build("0.0.0.0:13337".parse::<SocketAddr>()?).await?;
let server = ServerBuilder::default()
// It seems that if the client doesn't unsubscribe correctly, a "connection"
// won't be released, and we will eventually reach the limit, so we increase
// as a way to mitigate this issue.
// TODO: ensure that connections are always released
.build("0.0.0.0:13337".parse::<SocketAddr>()?)
.await?;
let mut module = RpcModule::new(());
module.register_async_method("status", move |arguments, _context| {
let cache = cache.clone();
Expand All @@ -317,6 +339,57 @@ async fn main() -> anyhow::Result<()> {
.and_then(|addresses| lookup_transactions(cache.lock().unwrap().clone(), addresses))
}
})?;

// Broadcast channel will drop old messages when the buffer if full to
// avoid "memory leaks" due to slow receivers.
const EVENT_BUFFER_SIZE: usize = 1024;
let (witness_sender, _) =
tokio::sync::broadcast::channel::<state_chain_runtime::RuntimeCall>(EVENT_BUFFER_SIZE);

// Temporary hack: we don't actually use eth key, but the current witnesser is
// expecting a path with a valid key, so we create a temporary dummy key file here:
let mut eth_key_temp_file = tempfile::NamedTempFile::new()?;
eth_key_temp_file
.write_all(b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
.unwrap();
let eth_key_path = eth_key_temp_file.path();

let eth_ws_endpoint = env::var("ETH_WS_ENDPOINT").unwrap_or("ws://localhost:8546".to_string());
let eth_http_endpoint =
env::var("ETH_HTTP_ENDPOINT").unwrap_or("http://localhost:8545".to_string());
let sc_ws_endpoint = env::var("SC_WS_ENDPOINT").unwrap_or("ws://localhost:9944".to_string());

let settings = DepositTrackerSettings {
eth_node: WsHttpEndpoints {
ws_endpoint: eth_ws_endpoint.into(),
http_endpoint: eth_http_endpoint.into(),
},
eth_key_path: eth_key_path.into(),
state_chain_ws_endpoint: sc_ws_endpoint,
};

witnessing::start(settings, witness_sender.clone());

module.register_subscription(
"subscribe_witnessing",
"s_witnessing",
"unsubscribe_witnessing",
move |_params, mut sink, _context| {
let mut witness_receiver = witness_sender.subscribe();

tokio::spawn(async move {
while let Ok(event) = witness_receiver.recv().await {
use codec::Encode;
if let Ok(false) = sink.send(&event.encode()) {
log::debug!("Subscription is closed");
break
}
}
});
Ok(())
},
)?;

let addr = server.local_addr()?;
log::info!("Listening on http://{}", addr);
let serverhandle = Box::pin(server.start(module)?.stopped());
Expand Down
Loading

0 comments on commit eb669d2

Please sign in to comment.