Skip to content

Commit

Permalink
Adjust deps and code
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Oct 14, 2024
1 parent c22539f commit 704a965
Show file tree
Hide file tree
Showing 12 changed files with 1,206 additions and 1,199 deletions.
1,861 changes: 989 additions & 872 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 18 additions & 7 deletions crates/tx-sitter-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
use data::{GetTxResponse, SendTxRequest, SendTxResponse, TxStatus};
use reqwest::header::HeaderMap;
use reqwest::{RequestBuilder, Response};
use telemetry_batteries::tracing::trace_to_headers;
use tracing::instrument;

pub mod data;

pub struct TxSitterClient {
client: reqwest::Client,
url: String,
url: String,
}

impl TxSitterClient {
pub fn new(url: impl ToString) -> Self {
Self {
client: reqwest::Client::new(),
url: url.to_string(),
url: url.to_string(),
}
}

// async fn inject_tracing_headers(req_builder: &mut RequestBuilder) {
// trace_to_
// }
fn inject_tracing_headers(req_builder: RequestBuilder) -> RequestBuilder {
let mut headers = HeaderMap::new();

trace_to_headers(&mut headers);

req_builder.headers(headers)
}

async fn json_post<T, R>(&self, url: &str, body: T) -> anyhow::Result<R>
where
T: serde::Serialize,
R: serde::de::DeserializeOwned,
{
let response = self.client.post(url).json(&body).send().await?;
let response = Self::inject_tracing_headers(self.client.post(url))
.json(&body)
.send()
.await?;

let response = Self::validate_response(response).await?;

Expand All @@ -37,7 +46,9 @@ impl TxSitterClient {
where
R: serde::de::DeserializeOwned,
{
let response = self.client.get(url).send().await?;
let response = Self::inject_tracing_headers(self.client.get(url))
.send()
.await?;

let response = Self::validate_response(response).await?;

Expand Down
75 changes: 2 additions & 73 deletions src/database/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(())
}

#[cfg(test)]
async fn get_all_recoveries(self) -> Result<Vec<RecoveryEntry>, Error> {
Ok(
sqlx::query_as::<_, RecoveryEntry>("SELECT * FROM recoveries")
Expand Down Expand Up @@ -659,6 +660,7 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(())
}

#[cfg(test)]
async fn get_next_batch(self, prev_root: &Hash) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
Expand Down Expand Up @@ -701,29 +703,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(res)
}

async fn get_latest_batch_with_transaction(self) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
SELECT
batches.id,
batches.next_root,
batches.prev_root,
batches.created_at,
batches.batch_type,
batches.data
FROM batches
LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root
WHERE transactions.batch_next_root IS NOT NULL AND batches.prev_root IS NOT NULL
ORDER BY batches.id DESC
LIMIT 1
"#,
)
.fetch_optional(self)
.await?;

Ok(res)
}

async fn get_next_batch_without_transaction(self) -> Result<Option<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
Expand Down Expand Up @@ -767,26 +746,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(res)
}

async fn get_all_batches_after(self, id: i64) -> Result<Vec<BatchEntry>, Error> {
let res = sqlx::query_as::<_, BatchEntry>(
r#"
SELECT
id,
next_root,
prev_root,
created_at,
batch_type,
data
FROM batches WHERE id >= $1 ORDER BY id ASC
"#,
)
.bind(id)
.fetch_all(self)
.await?;

Ok(res)
}

#[instrument(skip(self), level = "debug")]
async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> {
let query = sqlx::query(
Expand All @@ -813,15 +772,6 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
Ok(())
}

async fn root_in_batch_chain(self, root: &Hash) -> Result<bool, Error> {
let query = sqlx::query(
r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#,
)
.bind(root);
let row_unprocessed = self.fetch_one(query).await?;
Ok(row_unprocessed.get::<bool, _>(0))
}

async fn insert_new_transaction(
self,
transaction_id: &String,
Expand All @@ -842,25 +792,4 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> {
self.execute(query).await?;
Ok(())
}

async fn get_transaction_for_batch(
self,
next_root: &Hash,
) -> Result<Option<TransactionEntry>, Error> {
let res = sqlx::query_as::<_, TransactionEntry>(
r#"
SELECT
transaction_id,
batch_next_root,
created_at
FROM transactions WHERE batch_next_root = $1
LIMIT 1
"#,
)
.bind(next_root)
.fetch_optional(self)
.await?;

Ok(res)
}
}
32 changes: 15 additions & 17 deletions src/database/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ use crate::identity_tree::{Hash, UnprocessedStatus};
use crate::prover::identity::Identity;

pub struct UnprocessedCommitment {
pub commitment: Hash,
pub status: UnprocessedStatus,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub error_message: Option<String>,
pub commitment: Hash,
pub status: UnprocessedStatus,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
pub error_message: Option<String>,
pub eligibility_timestamp: DateTime<Utc>,
}

#[derive(FromRow)]
pub struct RecoveryEntry {
// existing commitment is used in tests only, but recoveries in general
// are used in production code via the FromRow trait
// so removing this field would break the production code
#[allow(unused)]
pub existing_commitment: Hash,
pub new_commitment: Hash,
pub new_commitment: Hash,
}

pub struct LatestInsertionEntry {
Expand Down Expand Up @@ -68,26 +72,20 @@ impl std::fmt::Display for BatchType {

#[derive(Debug, Clone, FromRow)]
pub struct BatchEntry {
pub id: i64,
pub next_root: Hash,
pub id: i64,
pub next_root: Hash,
// In general prev_root is present all the time except the first row (head of the batches
// chain)
pub prev_root: Option<Hash>,
pub prev_root: Option<Hash>,
pub created_at: DateTime<Utc>,
pub batch_type: BatchType,
pub data: sqlx::types::Json<BatchEntryData>,
pub data: sqlx::types::Json<BatchEntryData>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchEntryData {
pub identities: Vec<Identity>,
pub indexes: Vec<usize>,
}

#[derive(Debug, Clone, FromRow)]
pub struct TransactionEntry {
pub batch_next_root: Hash,
pub transaction_id: String,
pub indexes: Vec<usize>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
18 changes: 7 additions & 11 deletions src/ethereum/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::{Duration as ChronoDuration, Utc};
use ethers::abi::Error as AbiError;
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::{BlockId, BlockNumber, Chain, U256};
use futures::{try_join, FutureExt};
use futures::try_join;
use thiserror::Error;
use tracing::{error, info};
use url::Url;
Expand All @@ -16,9 +16,8 @@ type InnerProvider = Provider<RpcLogger<Http>>;

#[derive(Clone, Debug)]
pub struct ReadProvider {
inner: InnerProvider,
inner: InnerProvider,
pub chain_id: U256,
pub legacy: bool,
}

impl ReadProvider {
Expand All @@ -30,7 +29,7 @@ impl ReadProvider {
// TODO: Does the WebSocket impl handle dropped connections by
// reconnecting? What is the timeout on stalled connections? What is
// the retry policy?
let (provider, chain_id, eip1559) = {
let (provider, chain_id) = {
info!(
provider = %url,
"Connecting to provider"
Expand All @@ -40,13 +39,10 @@ impl ReadProvider {
let provider = Provider::new(logger);

// Fetch state of the chain.
let (version, chain_id, latest_block, eip1559) = try_join!(
let (version, chain_id, latest_block) = try_join!(
provider.client_version(),
provider.get_chainid(),
provider.get_block(BlockId::Number(BlockNumber::Latest)),
provider
.fee_history(1, BlockNumber::Latest, &[])
.map(|r| Ok(r.is_ok()))
)?;

// Identify chain.
Expand All @@ -63,7 +59,7 @@ impl ReadProvider {
.number
.ok_or_else(|| anyhow!("Could not read latest block number"))?;
let block_time = latest_block.time()?;
info!(%version, %chain_id, %chain, %eip1559, %block_number, ?block_hash, %block_time, "Connected to Ethereum provider");
info!(%version, %chain_id, %chain, %block_number, ?block_hash, %block_time, "Connected to Ethereum provider");

// Sanity check the block timestamp
let now = Utc::now();
Expand All @@ -77,13 +73,13 @@ impl ReadProvider {
// Log an error, but proceed anyway since this doesn't technically block us.
error!(%now, %block_time, %block_age, "Block time is more than 30 minutes from now.");
}
(provider, chain_id, eip1559)

(provider, chain_id)
};

Ok(Self {
inner: provider,
chain_id,
legacy: !eip1559,
})
}
}
Expand Down
29 changes: 11 additions & 18 deletions src/server/custom_middleware/logging_layer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#![allow(clippy::cast_possible_truncation)]

use axum::{body::Body, extract::Request};
use axum::body::Body;
use axum::extract::Request;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::Response;
use bytes::Bytes;
use hyper::Method;
use bytes::{BufMut, Bytes};
use futures::StreamExt;
use hyper::{body::Body as _, Method};
use telemetry_batteries::tracing::{trace_from_headers, trace_to_headers};
use tracing::{error, info, info_span, warn, Instrument};

Expand Down Expand Up @@ -122,7 +124,7 @@ async fn handle_response(
);
}

let body = axum::body::boxed(Body::from(response_body));
let body = Body::from(response_body);

Response::from_parts(parts, body)
} else {
Expand All @@ -142,13 +144,7 @@ async fn handle_response(

/// Reads the body of a request into a `Bytes` object chunk by chunk
/// and returns an error if the body is larger than `MAX_REQUEST_BODY_SIZE`.
async fn body_to_bytes_safe<B>(body: B) -> Result<Bytes, StatusCode>
where
B: HttpBody,
<B as HttpBody>::Error: std::error::Error,
{
use bytes::BufMut;

async fn body_to_bytes_safe(body: Body) -> Result<Bytes, StatusCode> {
let size_hint = body
.size_hint()
.upper()
Expand All @@ -164,10 +160,11 @@ where
}

let mut body_bytes = Vec::with_capacity(size_hint as usize);
let mut data_stream = body.into_data_stream();

futures_util::pin_mut!(body);
// futures_util::pin_mut!(body);

while let Some(chunk) = body.data().await {
while let Some(chunk) = data_stream.next().await {
let chunk = chunk.map_err(|error| {
error!("Error reading body: {}", error);
StatusCode::INTERNAL_SERVER_ERROR
Expand All @@ -189,11 +186,7 @@ where
Ok(body_bytes.into())
}

async fn body_to_string<B>(body: B) -> Result<String, StatusCode>
where
B: HttpBody,
<B as HttpBody>::Error: std::error::Error,
{
async fn body_to_string(body: Body) -> Result<String, StatusCode> {
let body_bytes = body_to_bytes_safe(body).await?;

let s = match String::from_utf8(body_bytes.to_vec()) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod error;

use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -13,6 +12,7 @@ use error::Error;
use hyper::header::CONTENT_TYPE;
use hyper::StatusCode;
use prometheus::{Encoder, TextEncoder};
use tokio::net::TcpListener;
use tracing::info;

use crate::app::App;
Expand Down Expand Up @@ -151,7 +151,7 @@ pub async fn run(
shutdown: Arc<Shutdown>,
) -> anyhow::Result<()> {
info!("Will listen on {}", config.address);
let listener = TcpListener::bind(config.address)?;
let listener = TcpListener::bind(config.address).await?;

bind_from_listener(app, config.serve_timeout, listener, shutdown).await?;

Expand Down
Loading

0 comments on commit 704a965

Please sign in to comment.