diff --git a/collector/src/binance/http.rs b/collector/src/binance/http.rs new file mode 100644 index 0000000..ad065ef --- /dev/null +++ b/collector/src/binance/http.rs @@ -0,0 +1,157 @@ +use std::{ + io, + io::ErrorKind, + time::{Duration, Instant}, +}; + +use anyhow::Error; +use chrono::{DateTime, Utc}; +use futures_util::{SinkExt, StreamExt}; +use reqwest; +use serde_json; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio_tungstenite::{ + connect_async, + tungstenite::{client::IntoClientRequest, Message}, +}; +use tracing::{error, warn}; + +pub async fn fetch_symbol_list() -> Result, reqwest::Error> { + Ok(reqwest::Client::new() + .get("https://api.binance.com/api/v3/exchangeInfo") + .header("Accept", "application/json") + .send() + .await? + .json::() + .await? + .get("symbols") + .unwrap() + .as_array() + .unwrap() + .iter() + .map(|j_symbol| { + j_symbol + .get("symbol") + .unwrap() + .as_str() + .unwrap() + .to_string() + }) + .collect()) +} + +pub async fn fetch_depth_snapshot(symbol: &str) -> Result { + Ok(reqwest::Client::new() + .get(format!( + "https://api.binance.com/api/v3/depth?symbol={symbol}&limit=1000" + )) + .header("Accept", "application/json") + .send() + .await? + .text() + .await?) +} + +pub async fn connect( + url: &str, + ws_tx: UnboundedSender<(DateTime, String)>, +) -> Result<(), anyhow::Error> { + let request = url.into_client_request()?; + let (ws_stream, _) = connect_async(request).await?; + let (mut write, mut read) = ws_stream.split(); + let (tx, mut rx) = unbounded_channel::<()>(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Some(_) => { + if let Err(_) = write.send(Message::Pong(Vec::new())).await { + return; + } + } + None => { + break; + } + } + } + }); + + loop { + match read.next().await { + Some(Ok(Message::Text(text))) => { + let recv_time = Utc::now(); + if let Err(_) = ws_tx.send((recv_time, text)) { + break; + } + } + Some(Ok(Message::Binary(_))) => {} + Some(Ok(Message::Ping(_))) => { + tx.send(()).unwrap(); + } + Some(Ok(Message::Pong(_))) => {} + Some(Ok(Message::Close(close_frame))) => { + warn!(?close_frame, "closed"); + return Err(Error::from(io::Error::new( + ErrorKind::ConnectionAborted, + "closed", + ))); + } + Some(Ok(Message::Frame(_))) => {} + Some(Err(e)) => { + return Err(Error::from(e)); + } + None => { + break; + } + } + } + Ok(()) +} + +pub async fn keep_connection( + streams: Vec, + symbol_list: Vec, + ws_tx: UnboundedSender<(DateTime, String)>, +) { + let mut error_count = 0; + loop { + let connect_time = Instant::now(); + let streams_str = symbol_list + .iter() + .map(|pair| { + streams + .iter() + .cloned() + .map(|stream| { + stream + .replace("$symbol", pair.to_lowercase().as_str()) + .to_string() + }) + .collect::>() + }) + .flatten() + .collect::>() + .join("/"); + if let Err(error) = connect( + &format!("wss://stream.binance.com:9443/stream?streams={streams_str}"), + ws_tx.clone(), + ) + .await + { + error!(?error, "websocket error"); + error_count += 1; + if connect_time.elapsed() > Duration::from_secs(30) { + error_count = 0; + } + if error_count > 3 { + tokio::time::sleep(Duration::from_secs(1)).await; + } else if error_count > 10 { + tokio::time::sleep(Duration::from_secs(5)).await; + } else if error_count > 20 { + tokio::time::sleep(Duration::from_secs(10)).await; + } + } else { + break; + } + } +} diff --git a/collector/src/binance/mod.rs b/collector/src/binance/mod.rs new file mode 100644 index 0000000..b0b086e --- /dev/null +++ b/collector/src/binance/mod.rs @@ -0,0 +1,96 @@ +mod http; + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tracing::{error, warn}; + +use crate::error::ConnectorError; + +fn handle( + prev_u_map: &mut HashMap, + writer_tx: &UnboundedSender<(DateTime, String, String)>, + recv_time: DateTime, + data: String, +) -> Result<(), ConnectorError> { + let j: serde_json::Value = serde_json::from_str(&data)?; + if let Some(j_data) = j.get("data") { + if let Some(j_symbol) = j_data + .as_object() + .ok_or(ConnectorError::FormatError)? + .get("s") + { + let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?; + let ev = j_data + .get("e") + .ok_or(ConnectorError::FormatError)? + .as_str() + .ok_or(ConnectorError::FormatError)?; + if ev == "depthUpdate" { + let u = j_data + .get("u") + .ok_or(ConnectorError::FormatError)? + .as_i64() + .ok_or(ConnectorError::FormatError)?; + let U = j_data + .get("U") + .ok_or(ConnectorError::FormatError)? + .as_i64() + .ok_or(ConnectorError::FormatError)?; + let prev_u = prev_u_map.get(symbol); + if prev_u.is_none() || U != *prev_u.unwrap() + 1 { + warn!(%symbol, "missing depth feed has been detected."); + // todo: to circumvent API limits when repeated occurrences of missing depth + // feed happen within a short timeframe, implementing a backoff mechanism + // may be necessary. + let symbol_ = symbol.to_string(); + let writer_tx_ = writer_tx.clone(); + tokio::spawn(async move { + match fetch_depth_snapshot(&symbol_).await { + Ok(data) => { + let recv_time = Utc::now(); + let _ = writer_tx_.send((recv_time, symbol_, data)); + } + Err(error) => { + error!( + symbol = symbol_, + ?error, + "couldn't fetch the depth snapshot." + ); + } + } + }); + } + *prev_u_map.entry(symbol.to_string()).or_insert(0) = u; + } + let _ = writer_tx.send((recv_time, symbol.to_string(), data)); + } + } + Ok(()) +} + +pub async fn run_collection( + streams: Vec, + symbols: Vec, + writer_tx: UnboundedSender<(DateTime, String, String)>, +) -> Result<(), anyhow::Error> { + let mut prev_u_map = HashMap::new(); + let (ws_tx, mut ws_rx) = unbounded_channel(); + let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone())); + loop { + match ws_rx.recv().await { + Some((recv_time, data)) => { + if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) { + error!(?error, "couldn't handle the received data."); + } + } + None => { + break; + } + } + } + let _ = h.await; + Ok(()) +} diff --git a/collector/src/binancefuturescm/http.rs b/collector/src/binancefuturescm/http.rs new file mode 100644 index 0000000..838bd3f --- /dev/null +++ b/collector/src/binancefuturescm/http.rs @@ -0,0 +1,158 @@ +use std::{ + io, + io::ErrorKind, + time::{Duration, Instant}, +}; + +use anyhow::Error; +use chrono::{DateTime, Utc}; +use futures_util::{SinkExt, StreamExt}; +use reqwest; +use serde_json; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio_tungstenite::{ + connect_async, + tungstenite::{client::IntoClientRequest, Message}, +}; +use tracing::{error, warn}; + +pub async fn fetch_symbol_list() -> Result, reqwest::Error> { + Ok(reqwest::Client::new() + .get("https://dapi.binance.com/dapi/v1/exchangeInfo") + .header("Accept", "application/json") + .send() + .await? + .json::() + .await? + .get("symbols") + .unwrap() + .as_array() + .unwrap() + .iter() + .filter(|j_symbol| j_symbol.get("contractType").unwrap().as_str().unwrap() == "PERPETUAL") + .map(|j_symbol| { + j_symbol + .get("symbol") + .unwrap() + .as_str() + .unwrap() + .to_string() + }) + .collect()) +} + +pub async fn fetch_depth_snapshot(symbol: &str) -> Result { + Ok(reqwest::Client::new() + .get(format!( + "https://dapi.binance.com/dapi/v1/depth?symbol={symbol}&limit=1000" + )) + .header("Accept", "application/json") + .send() + .await? + .text() + .await?) +} + +pub async fn connect( + url: &str, + ws_tx: UnboundedSender<(DateTime, String)>, +) -> Result<(), anyhow::Error> { + let request = url.into_client_request()?; + let (ws_stream, _) = connect_async(request).await?; + let (mut write, mut read) = ws_stream.split(); + let (tx, mut rx) = unbounded_channel::<()>(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Some(_) => { + if let Err(_) = write.send(Message::Pong(Vec::new())).await { + return; + } + } + None => { + break; + } + } + } + }); + + loop { + match read.next().await { + Some(Ok(Message::Text(text))) => { + let recv_time = Utc::now(); + if let Err(_) = ws_tx.send((recv_time, text)) { + break; + } + } + Some(Ok(Message::Binary(_))) => {} + Some(Ok(Message::Ping(_))) => { + tx.send(()).unwrap(); + } + Some(Ok(Message::Pong(_))) => {} + Some(Ok(Message::Close(close_frame))) => { + warn!(?close_frame, "closed"); + return Err(Error::from(io::Error::new( + ErrorKind::ConnectionAborted, + "closed", + ))); + } + Some(Ok(Message::Frame(_))) => {} + Some(Err(e)) => { + return Err(Error::from(e)); + } + None => { + break; + } + } + } + Ok(()) +} + +pub async fn keep_connection( + streams: Vec, + symbol_list: Vec, + ws_tx: UnboundedSender<(DateTime, String)>, +) { + let mut error_count = 0; + loop { + let connect_time = Instant::now(); + let streams_str = symbol_list + .iter() + .map(|pair| { + streams + .iter() + .cloned() + .map(|stream| { + stream + .replace("$symbol", pair.to_lowercase().as_str()) + .to_string() + }) + .collect::>() + }) + .flatten() + .collect::>() + .join("/"); + if let Err(error) = connect( + &format!("wss://dstream.binance.com/stream?streams={streams_str}"), + ws_tx.clone(), + ) + .await + { + error!(?error, "websocket error"); + error_count += 1; + if connect_time.elapsed() > Duration::from_secs(30) { + error_count = 0; + } + if error_count > 3 { + tokio::time::sleep(Duration::from_secs(1)).await; + } else if error_count > 10 { + tokio::time::sleep(Duration::from_secs(5)).await; + } else if error_count > 20 { + tokio::time::sleep(Duration::from_secs(10)).await; + } + } else { + break; + } + } +} diff --git a/collector/src/binancefutures/mod.rs b/collector/src/binancefuturescm/mod.rs similarity index 100% rename from collector/src/binancefutures/mod.rs rename to collector/src/binancefuturescm/mod.rs diff --git a/collector/src/binancefutures/http.rs b/collector/src/binancefuturesum/http.rs similarity index 100% rename from collector/src/binancefutures/http.rs rename to collector/src/binancefuturesum/http.rs diff --git a/collector/src/binancefuturesum/mod.rs b/collector/src/binancefuturesum/mod.rs new file mode 100644 index 0000000..9baa925 --- /dev/null +++ b/collector/src/binancefuturesum/mod.rs @@ -0,0 +1,96 @@ +mod http; + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tracing::{error, warn}; + +use crate::error::ConnectorError; + +fn handle( + prev_u_map: &mut HashMap, + writer_tx: &UnboundedSender<(DateTime, String, String)>, + recv_time: DateTime, + data: String, +) -> Result<(), ConnectorError> { + let j: serde_json::Value = serde_json::from_str(&data)?; + if let Some(j_data) = j.get("data") { + if let Some(j_symbol) = j_data + .as_object() + .ok_or(ConnectorError::FormatError)? + .get("s") + { + let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?; + let ev = j_data + .get("e") + .ok_or(ConnectorError::FormatError)? + .as_str() + .ok_or(ConnectorError::FormatError)?; + if ev == "depthUpdate" { + let u = j_data + .get("u") + .ok_or(ConnectorError::FormatError)? + .as_i64() + .ok_or(ConnectorError::FormatError)?; + let pu = j_data + .get("pu") + .ok_or(ConnectorError::FormatError)? + .as_i64() + .ok_or(ConnectorError::FormatError)?; + let prev_u = prev_u_map.get(symbol); + if prev_u.is_none() || pu != *prev_u.unwrap() { + warn!(%symbol, "missing depth feed has been detected."); + // todo: to circumvent API limits when repeated occurrences of missing depth + // feed happen within a short timeframe, implementing a backoff mechanism + // may be necessary. + let symbol_ = symbol.to_string(); + let writer_tx_ = writer_tx.clone(); + tokio::spawn(async move { + match fetch_depth_snapshot(&symbol_).await { + Ok(data) => { + let recv_time = Utc::now(); + let _ = writer_tx_.send((recv_time, symbol_, data)); + } + Err(error) => { + error!( + symbol = symbol_, + ?error, + "couldn't fetch the depth snapshot." + ); + } + } + }); + } + *prev_u_map.entry(symbol.to_string()).or_insert(0) = u; + } + let _ = writer_tx.send((recv_time, symbol.to_string(), data)); + } + } + Ok(()) +} + +pub async fn run_collection( + streams: Vec, + symbols: Vec, + writer_tx: UnboundedSender<(DateTime, String, String)>, +) -> Result<(), anyhow::Error> { + let mut prev_u_map = HashMap::new(); + let (ws_tx, mut ws_rx) = unbounded_channel(); + let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone())); + loop { + match ws_rx.recv().await { + Some((recv_time, data)) => { + if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) { + error!(?error, "couldn't handle the received data."); + } + } + None => { + break; + } + } + } + let _ = h.await; + Ok(()) +} diff --git a/collector/src/main.rs b/collector/src/main.rs index 7518151..aafc702 100644 --- a/collector/src/main.rs +++ b/collector/src/main.rs @@ -5,7 +5,9 @@ use tracing::{error, info}; use crate::file::Writer; -mod binancefutures; +mod binance; +mod binancefuturescm; +mod binancefuturesum; mod bybit; mod error; mod file; @@ -32,10 +34,9 @@ async fn main() -> Result<(), anyhow::Error> { let (writer_tx, mut writer_rx) = unbounded_channel(); let handle = match args.exchange.as_str() { - "binancefutures" => { + "binancefuturesum" => { let streams = vec![ "$symbol@trade", - // "$symbol@aggTrade", "$symbol@bookTicker", "$symbol@depth@0ms", // "$symbol@@markPrice@1s" @@ -44,12 +45,37 @@ async fn main() -> Result<(), anyhow::Error> { .map(|stream| stream.to_string()) .collect(); - tokio::spawn(binancefutures::run_collection( + tokio::spawn(binancefuturesum::run_collection( streams, args.symbols, writer_tx, )) } + "binancefuturescm" => { + let streams = vec![ + "$symbol@trade", + "$symbol@bookTicker", + "$symbol@depth@0ms", + // "$symbol@@markPrice@1s" + ] + .iter() + .map(|stream| stream.to_string()) + .collect(); + + tokio::spawn(binancefuturescm::run_collection( + streams, + args.symbols, + writer_tx, + )) + } + "binance" => { + let streams = vec!["$symbol@trade", "$symbol@bookTicker", "$symbol@depth@100ms"] + .iter() + .map(|stream| stream.to_string()) + .collect(); + + tokio::spawn(binance::run_collection(streams, args.symbols, writer_tx)) + } "bybit" => { let topics = vec![ "orderbook.1.$symbol",