Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): use timed-map crate instead of internal ExpirableMap #2247

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ sha3 = "0.9"
utxo_signer = { path = "utxo_signer" }
# using the same version as cosmrs
tendermint-rpc = { version = "0.34", default-features = false }
timed-map = "0.1.0"
Copy link
Member

@onur-ozkan onur-ozkan Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so you know, we should use version 1.1.

tokio-tungstenite-wasm = { git = "https://github.com/KomodoPlatform/tokio-tungstenite-wasm", rev = "d20abdb", features = ["rustls-tls-native-roots"]}
url = { version = "2.2.2", features = ["serde"] }
uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] }
Expand Down
10 changes: 5 additions & 5 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::eth::web3_transport::Web3SendOut;
use crate::eth::{EthCoin, RpcTransportEventHandlerShared};
use crate::{MmCoin, RpcTransportEventHandler};
use common::executor::{AbortSettings, SpawnAbortable, Timer};
use common::expirable_map::ExpirableMap;
use common::log;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
Expand All @@ -25,6 +24,7 @@ use proxy_signature::{ProxySign, RawMessage};
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::{AtomicUsize, Ordering},
Arc};
use timed_map::{StdClock, TimedMap};
use tokio_tungstenite_wasm::WebSocketStream;
use web3::error::{Error, TransportError};
use web3::helpers::to_string;
Expand Down Expand Up @@ -137,15 +137,15 @@ impl WebsocketTransport {
&self,
request: Option<ControllerMessage>,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match request {
Some(ControllerMessage::Request(WsRequest {
request_id,
serialized_request,
response_notifier,
})) => {
response_notifiers.insert(
response_notifiers.insert_expirable(
request_id,
response_notifier,
// Since request will be cancelled when timeout occurs, we are free to drop its state.
Expand Down Expand Up @@ -188,7 +188,7 @@ impl WebsocketTransport {
async fn handle_response(
&self,
message: Option<Result<tokio_tungstenite_wasm::Message, tokio_tungstenite_wasm::Error>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match message {
Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => {
Expand Down Expand Up @@ -249,7 +249,7 @@ impl WebsocketTransport {
let _guard = self.connection_guard.lock().await;

// List of awaiting requests
let mut response_notifiers: ExpirableMap<RequestId, oneshot::Sender<Vec<u8>>> = ExpirableMap::default();
let mut response_notifiers: TimedMap<StdClock, RequestId, oneshot::Sender<Vec<u8>>> = TimedMap::default();

let mut wsocket = match self
.attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub mod crash_reports;
pub mod custom_futures;
pub mod custom_iter;
#[path = "executor/mod.rs"] pub mod executor;
pub mod expirable_map;
pub mod expirable_entry;
pub mod number_type_casting;
pub mod password_policy;
pub mod seri;
Expand Down
34 changes: 34 additions & 0 deletions mm2src/common/expirable_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! This module provides a cross-compatible map that associates values with keys and supports expiring entries.
//!
//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood,
//! and is not suitable for cryptographic purposes.

use instant::{Duration, Instant};
#[derive(Clone, Debug)]
pub struct ExpirableEntry<V> {
pub(crate) value: V,
pub(crate) expires_at: Instant,
}

impl<V> ExpirableEntry<V> {
#[inline(always)]
pub fn new(v: V, exp: Duration) -> Self {
Self {
expires_at: Instant::now() + exp,
value: v,
}
}

#[inline(always)]
pub fn get_element(&self) -> &V { &self.value }

#[inline(always)]
pub fn update_value(&mut self, v: V) { self.value = v }

#[inline(always)]
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }

/// Checks whether entry has longer ttl than the given one.
#[inline(always)]
pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl }
}
163 changes: 0 additions & 163 deletions mm2src/common/expirable_map.rs

This file was deleted.

2 changes: 1 addition & 1 deletion mm2src/common/time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::collections::hash_map::{self,
use std::collections::VecDeque;
use std::time::Duration;

use crate::expirable_map::ExpirableEntry;
use crate::expirable_entry::ExpirableEntry;

#[derive(Debug)]
pub struct TimeCache<Key, Value> {
Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ser_error = { path = "../derives/ser_error" }
ser_error_derive = { path = "../derives/ser_error_derive" }
serde_json = { version = "1", features = ["preserve_order", "raw_value"] }
shared_ref_counter = { path = "../common/shared_ref_counter" }
timed-map = "0.1.0"
uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use common::expirable_map::ExpirableMap;
use common::{HttpStatusCode, StatusCode};
use derive_more::Display;
use futures::channel::oneshot;
Expand All @@ -12,6 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use timed_map::{StdClock, TimedMap};

use crate::mm_ctx::{MmArc, MmCtx};

Expand All @@ -20,7 +20,7 @@ const EVENT_NAME: &str = "DATA_NEEDED";
#[derive(Clone, Debug, Default)]
pub struct DataAsker {
data_id: Arc<AtomicUsize>,
awaiting_asks: Arc<AsyncMutex<ExpirableMap<usize, oneshot::Sender<serde_json::Value>>>>,
awaiting_asks: Arc<AsyncMutex<TimedMap<StdClock, usize, oneshot::Sender<serde_json::Value>>>>,
}

#[derive(Debug, Display)]
Expand Down Expand Up @@ -59,7 +59,7 @@ impl MmCtx {
.awaiting_asks
.lock()
.await
.insert(data_id, sender, timeout);
.insert_expirable(data_id, sender, timeout);
}

let input = json!({
Expand Down
10 changes: 5 additions & 5 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#[cfg(feature = "track-ctx-pointer")]
use common::executor::Timer;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture};
use common::log::{self, LogLevel, LogOnError, LogState};
use common::{cfg_native, cfg_wasm32, small_rng};
use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture},
expirable_map::ExpirableMap};
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
Expand All @@ -23,6 +22,7 @@ use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use timed_map::{StdClock, TimedMap};

use crate::data_asker::DataAsker;

Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct MmCtx {
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
pub healthcheck_response_handler: AsyncMutex<TimedMap<StdClock, PeerAddress, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down Expand Up @@ -196,7 +196,7 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
healthcheck_response_handler: AsyncMutex::new(TimedMap::default()),
}
}

Expand Down
Loading
Loading