Skip to content

Commit

Permalink
feat(rust): add Binance Spot and Binance Coin-M Futures collector.
Browse files Browse the repository at this point in the history
  • Loading branch information
nkaz001 committed Aug 8, 2024
1 parent c870bd5 commit f34c013
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 4 deletions.
157 changes: 157 additions & 0 deletions collector/src/binance/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::{
io,
io::ErrorKind,
time::{Duration, Instant},
};

use anyhow::Error;
use chrono::{DateTime, Utc};
use futures_util::{SinkExt, StreamExt};
use reqwest;
use serde_json;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, Message},
};
use tracing::{error, warn};

pub async fn fetch_symbol_list() -> Result<Vec<String>, reqwest::Error> {
Ok(reqwest::Client::new()
.get("https://api.binance.com/api/v3/exchangeInfo")
.header("Accept", "application/json")
.send()
.await?
.json::<serde_json::Value>()
.await?
.get("symbols")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|j_symbol| {
j_symbol
.get("symbol")
.unwrap()
.as_str()
.unwrap()
.to_string()
})
.collect())
}

pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error> {
Ok(reqwest::Client::new()
.get(format!(
"https://api.binance.com/api/v3/depth?symbol={symbol}&limit=1000"
))
.header("Accept", "application/json")
.send()
.await?
.text()
.await?)
}

pub async fn connect(
url: &str,
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
) -> Result<(), anyhow::Error> {
let request = url.into_client_request()?;
let (ws_stream, _) = connect_async(request).await?;
let (mut write, mut read) = ws_stream.split();
let (tx, mut rx) = unbounded_channel::<()>();

tokio::spawn(async move {
loop {
match rx.recv().await {
Some(_) => {
if let Err(_) = write.send(Message::Pong(Vec::new())).await {
return;
}
}
None => {
break;
}
}
}
});

loop {
match read.next().await {
Some(Ok(Message::Text(text))) => {
let recv_time = Utc::now();
if let Err(_) = ws_tx.send((recv_time, text)) {
break;
}
}
Some(Ok(Message::Binary(_))) => {}
Some(Ok(Message::Ping(_))) => {
tx.send(()).unwrap();
}
Some(Ok(Message::Pong(_))) => {}
Some(Ok(Message::Close(close_frame))) => {
warn!(?close_frame, "closed");
return Err(Error::from(io::Error::new(
ErrorKind::ConnectionAborted,
"closed",
)));
}
Some(Ok(Message::Frame(_))) => {}
Some(Err(e)) => {
return Err(Error::from(e));
}
None => {
break;
}
}
}
Ok(())
}

pub async fn keep_connection(
streams: Vec<String>,
symbol_list: Vec<String>,
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
) {
let mut error_count = 0;
loop {
let connect_time = Instant::now();
let streams_str = symbol_list
.iter()
.map(|pair| {
streams
.iter()
.cloned()
.map(|stream| {
stream
.replace("$symbol", pair.to_lowercase().as_str())
.to_string()
})
.collect::<Vec<_>>()
})
.flatten()
.collect::<Vec<_>>()
.join("/");
if let Err(error) = connect(
&format!("wss://stream.binance.com:9443/stream?streams={streams_str}"),
ws_tx.clone(),
)
.await
{
error!(?error, "websocket error");
error_count += 1;
if connect_time.elapsed() > Duration::from_secs(30) {
error_count = 0;
}
if error_count > 3 {
tokio::time::sleep(Duration::from_secs(1)).await;
} else if error_count > 10 {
tokio::time::sleep(Duration::from_secs(5)).await;
} else if error_count > 20 {
tokio::time::sleep(Duration::from_secs(10)).await;
}
} else {
break;
}
}
}
96 changes: 96 additions & 0 deletions collector/src/binance/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
mod http;

use std::collections::HashMap;

use chrono::{DateTime, Utc};
pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::{error, warn};

use crate::error::ConnectorError;

fn handle(
prev_u_map: &mut HashMap<String, i64>,
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
recv_time: DateTime<Utc>,
data: String,
) -> Result<(), ConnectorError> {
let j: serde_json::Value = serde_json::from_str(&data)?;
if let Some(j_data) = j.get("data") {
if let Some(j_symbol) = j_data
.as_object()
.ok_or(ConnectorError::FormatError)?
.get("s")
{
let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?;
let ev = j_data
.get("e")
.ok_or(ConnectorError::FormatError)?
.as_str()
.ok_or(ConnectorError::FormatError)?;
if ev == "depthUpdate" {
let u = j_data
.get("u")
.ok_or(ConnectorError::FormatError)?
.as_i64()
.ok_or(ConnectorError::FormatError)?;
let U = j_data
.get("U")
.ok_or(ConnectorError::FormatError)?
.as_i64()
.ok_or(ConnectorError::FormatError)?;
let prev_u = prev_u_map.get(symbol);
if prev_u.is_none() || U != *prev_u.unwrap() + 1 {
warn!(%symbol, "missing depth feed has been detected.");
// todo: to circumvent API limits when repeated occurrences of missing depth
// feed happen within a short timeframe, implementing a backoff mechanism
// may be necessary.
let symbol_ = symbol.to_string();
let writer_tx_ = writer_tx.clone();
tokio::spawn(async move {
match fetch_depth_snapshot(&symbol_).await {
Ok(data) => {
let recv_time = Utc::now();
let _ = writer_tx_.send((recv_time, symbol_, data));
}
Err(error) => {
error!(
symbol = symbol_,
?error,
"couldn't fetch the depth snapshot."
);
}
}
});
}
*prev_u_map.entry(symbol.to_string()).or_insert(0) = u;
}
let _ = writer_tx.send((recv_time, symbol.to_string(), data));
}
}
Ok(())
}

pub async fn run_collection(
streams: Vec<String>,
symbols: Vec<String>,
writer_tx: UnboundedSender<(DateTime<Utc>, String, String)>,
) -> Result<(), anyhow::Error> {
let mut prev_u_map = HashMap::new();
let (ws_tx, mut ws_rx) = unbounded_channel();
let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone()));
loop {
match ws_rx.recv().await {
Some((recv_time, data)) => {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) {
error!(?error, "couldn't handle the received data.");
}
}
None => {
break;
}
}
}
let _ = h.await;
Ok(())
}
Loading

0 comments on commit f34c013

Please sign in to comment.