Skip to content

Commit

Permalink
review(onur): don't expose ctx from MmCoin
Browse files Browse the repository at this point in the history
pass the streaming manager down to where it should be used instead

this does so for utxo balance history, tendermint to come
  • Loading branch information
mariocynicys committed Nov 6, 2024
1 parent 78cece1 commit 3a467dd
Show file tree
Hide file tree
Showing 23 changed files with 36 additions and 38 deletions.
2 changes: 0 additions & 2 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5447,8 +5447,6 @@ impl EthTxFeeDetails {
impl MmCoin for EthCoin {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.ctx) }

fn spawner(&self) -> WeakSpawner { self.abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/lightning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,8 +1267,6 @@ struct LightningProtocolInfo {
impl MmCoin for LightningCoin {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { self.platform.coin.get_ctx() }

fn spawner(&self) -> WeakSpawner { self.platform.abortable_system.weak_spawner() }

fn get_raw_transaction(&self, _req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
6 changes: 0 additions & 6 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3271,12 +3271,6 @@ pub trait MmCoin:
coin_conf["wallet_only"].as_bool().unwrap_or(false)
}

/// Returns the MmArc context the coin is running on top of.
///
/// This should really never fail (return `None`), but it's here since coins hold a weak
/// reference to the context (TODO: why?).
fn get_ctx(&self) -> Option<MmArc>;

/// Returns a spawner pinned to the coin.
///
/// # Note
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/qrc20.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,8 +1300,6 @@ impl MarketCoinOps for Qrc20Coin {
impl MmCoin for Qrc20Coin {
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo) }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.as_ref().abortable_system.weak_spawner() }

fn withdraw(&self, req: WithdrawRequest) -> WithdrawFut {
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/siacoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ pub struct SiaCoinProtocolInfo;
impl MmCoin for SiaCoin {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { unimplemented!() }

fn spawner(&self) -> WeakSpawner { unimplemented!() }

fn get_raw_transaction(&self, _req: RawTransactionRequest) -> RawTransactionFut { unimplemented!() }
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2160,8 +2160,6 @@ pub async fn get_ibc_chain_list() -> IBCChainRegistriesResult {
impl MmCoin for TendermintCoin {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.ctx) }

fn wallet_only(&self, ctx: &MmArc) -> bool {
let coin_conf = crate::coin_conf(ctx, self.ticker());
let wallet_only_conf = coin_conf["wallet_only"].as_bool().unwrap_or(false);
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/tendermint/tendermint_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ impl MarketCoinOps for TendermintToken {
impl MmCoin for TendermintToken {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { self.platform_coin.get_ctx() }

fn wallet_only(&self, ctx: &MmArc) -> bool {
let coin_conf = crate::coin_conf(ctx, self.ticker());
let wallet_only_conf = coin_conf["wallet_only"].as_bool().unwrap_or(false);
Expand Down
3 changes: 2 additions & 1 deletion mm2src/coins/tendermint/tendermint_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cosmrs::tendermint::abci::{Code as TxCode, EventAttribute};
use cosmrs::tx::Fee;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmResult;
use mm2_event_stream::StreamingManager;
use mm2_number::BigDecimal;
use mm2_state_machine::prelude::*;
use mm2_state_machine::state_machine::StateMachineTrait;
Expand Down Expand Up @@ -588,7 +589,7 @@ where
"could not get rpc client"
);

let streaming_manager = coin.get_ctx().unwrap().event_stream_manager.clone();
let streaming_manager: StreamingManager = panic!();
loop {
let response = try_or_return_stopped_as_err!(
client
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/test_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ impl WatcherOps for TestCoin {
impl MmCoin for TestCoin {
fn is_asset_chain(&self) -> bool { unimplemented!() }

fn get_ctx(&self) -> Option<MmArc> { unimplemented!() }

fn spawner(&self) -> WeakSpawner { unimplemented!() }

fn get_raw_transaction(&self, _req: RawTransactionRequest) -> RawTransactionFut { unimplemented!() }
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/utxo/bch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,8 +1288,6 @@ impl MarketCoinOps for BchCoin {
impl MmCoin for BchCoin {
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.as_ref().abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/utxo/qtum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,6 @@ impl MarketCoinOps for QtumCoin {
impl MmCoin for QtumCoin {
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.as_ref().abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/utxo/slp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,6 @@ impl From<SlpFeeDetails> for TxFeeDetails {
impl MmCoin for SlpToken {
fn is_asset_chain(&self) -> bool { false }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.conf.abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
2 changes: 2 additions & 0 deletions mm2src/coins/utxo/utxo_common_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ pub(super) async fn test_hd_utxo_tx_history_impl(rpc_client: ElectrumClient) {
coin.clone(),
storage,
ctx.metrics.clone(),
ctx.event_stream_manager.clone(),
current_balances.clone(),
));

Expand All @@ -316,6 +317,7 @@ pub(super) async fn test_hd_utxo_tx_history_impl(rpc_client: ElectrumClient) {
coin.clone(),
storage,
ctx.metrics.clone(),
ctx.event_stream_manager.clone(),
current_balances,
));

Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/utxo/utxo_standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,6 @@ impl MarketCoinOps for UtxoStandardCoin {
impl MmCoin for UtxoStandardCoin {
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.as_ref().abortable_system.weak_spawner() }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Expand Down
10 changes: 8 additions & 2 deletions mm2src/coins/utxo/utxo_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use common::log::{error, info};
use derive_more::Display;
use keys::Address;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use mm2_state_machine::prelude::*;
Expand Down Expand Up @@ -146,6 +147,8 @@ struct UtxoTxHistoryStateMachine<Coin: UtxoTxHistoryOps, Storage: TxHistoryStora
coin: Coin,
storage: Storage,
metrics: MetricsArc,
/// An instance of the streaming manager used for sending TX updates in realtime.
streaming_manager: StreamingManager,
/// Last requested balances of the activated coin's addresses.
/// TODO add a `CoinBalanceState` structure and replace [`HashMap<String, BigDecimal>`] everywhere.
balances: HashMap<String, BigDecimal>,
Expand Down Expand Up @@ -590,7 +593,6 @@ where

let my_addresses = try_or_stop_unknown!(ctx.coin.my_addresses().await, "Error on getting my addresses");

let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone();
for (tx_hash, height) in self.all_tx_ids_with_height {
let tx_hash_string = format!("{:02x}", tx_hash);
match ctx.storage.history_has_tx_hash(&wallet_id, &tx_hash_string).await {
Expand Down Expand Up @@ -622,7 +624,7 @@ where
},
};

streaming_manager
ctx.streaming_manager
.send_fn(&TxHistoryEventStreamer::derive_streamer_id(ctx.coin.ticker()), || {
tx_details.clone()
})
Expand Down Expand Up @@ -715,6 +717,7 @@ pub async fn bch_and_slp_history_loop(
coin: BchCoin,
storage: impl TxHistoryStorage,
metrics: MetricsArc,
streaming_manager: StreamingManager,
current_balance: Option<BigDecimal>,
) {
let balances = match current_balance {
Expand Down Expand Up @@ -751,6 +754,7 @@ pub async fn bch_and_slp_history_loop(
coin,
storage,
metrics,
streaming_manager,
balances,
};
state_machine
Expand All @@ -763,6 +767,7 @@ pub async fn utxo_history_loop<Coin, Storage>(
coin: Coin,
storage: Storage,
metrics: MetricsArc,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
) where
Coin: UtxoTxHistoryOps,
Expand All @@ -772,6 +777,7 @@ pub async fn utxo_history_loop<Coin, Storage>(
coin,
storage,
metrics,
streaming_manager,
balances: current_balances,
};
state_machine
Expand Down
2 changes: 0 additions & 2 deletions mm2src/coins/z_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,8 +1662,6 @@ impl WatcherOps for ZCoin {
impl MmCoin for ZCoin {
fn is_asset_chain(&self) -> bool { self.utxo_arc.conf.asset_chain }

fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }

fn spawner(&self) -> WeakSpawner { self.as_ref().abortable_system.weak_spawner() }

fn withdraw(&self, _req: WithdrawRequest) -> WithdrawFut {
Expand Down
8 changes: 7 additions & 1 deletion mm2src/coins_activation/src/bch_with_tokens_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,13 @@ impl PlatformCoinWithTokensActivationOps for BchCoin {
storage: impl TxHistoryStorage + Send + 'static,
initial_balance: Option<BigDecimal>,
) {
let fut = bch_and_slp_history_loop(self.clone(), storage, ctx.metrics.clone(), initial_balance);
let fut = bch_and_slp_history_loop(
self.clone(),
storage,
ctx.metrics.clone(),
ctx.event_stream_manager.clone(),
initial_balance,
);

let settings = AbortSettings::info_on_abort(format!("bch_and_slp_history_loop stopped for {}", self.ticker()));
self.spawner().spawn_with_settings(fut, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use coins::{lp_coinfind, lp_register_coin, CoinsContext, MmCoinEnum, RegisterCoi
use common::{log, SuccessResponse};
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use rpc_task::rpc_common::{CancelRpcTaskRequest, InitRpcTaskResponse, RpcTaskStatusRequest, RpcTaskUserActionRequest};
Expand Down Expand Up @@ -73,6 +74,7 @@ pub trait InitStandaloneCoinActivationOps: Into<MmCoinEnum> + Send + Sync + 'sta
&self,
metrics: MetricsArc,
storage: impl TxHistoryStorage,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
);
}
Expand Down Expand Up @@ -217,6 +219,7 @@ where
coin.start_history_background_fetching(
self.ctx.metrics.clone(),
TxHistoryStorageBuilder::new(&self.ctx).build()?,
self.ctx.event_stream_manager.clone(),
current_balances,
);
}
Expand Down
4 changes: 3 additions & 1 deletion mm2src/coins_activation/src/utxo_activation/common_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crypto::{CryptoCtxError, HwRpcError};
use futures::compat::Future01CompatExt;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use std::collections::HashMap;
Expand Down Expand Up @@ -99,14 +100,15 @@ pub(crate) fn start_history_background_fetching<Coin>(
coin: Coin,
metrics: MetricsArc,
storage: impl TxHistoryStorage,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
) where
Coin: AsRef<UtxoCoinFields> + UtxoTxHistoryOps,
{
let spawner = coin.as_ref().abortable_system.weak_spawner();

let msg = format!("'utxo_history_loop' has been aborted for {}", coin.ticker());
let fut = utxo_history_loop(coin, storage, metrics, current_balances);
let fut = utxo_history_loop(coin, storage, metrics, streaming_manager, current_balances);

let settings = AbortSettings::info_on_abort(msg);
spawner.spawn_with_settings(fut, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use coins::utxo::utxo_builder::{UtxoArcBuilder, UtxoCoinBuilder};
use coins::CoinProtocol;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use serde_json::Value as Json;
Expand Down Expand Up @@ -109,8 +110,9 @@ impl InitStandaloneCoinActivationOps for BchCoin {
&self,
metrics: MetricsArc,
storage: impl TxHistoryStorage,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
) {
start_history_background_fetching(self.clone(), metrics, storage, current_balances)
start_history_background_fetching(self.clone(), metrics, storage, streaming_manager, current_balances)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use coins::utxo::UtxoActivationParams;
use coins::CoinProtocol;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use serde_json::Value as Json;
Expand Down Expand Up @@ -83,8 +84,9 @@ impl InitStandaloneCoinActivationOps for QtumCoin {
&self,
metrics: MetricsArc,
storage: impl TxHistoryStorage,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
) {
start_history_background_fetching(self.clone(), metrics, storage, current_balances)
start_history_background_fetching(self.clone(), metrics, storage, streaming_manager, current_balances)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use coins::CoinProtocol;
use futures::StreamExt;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use serde_json::Value as Json;
Expand Down Expand Up @@ -124,8 +125,9 @@ impl InitStandaloneCoinActivationOps for UtxoStandardCoin {
&self,
metrics: MetricsArc,
storage: impl TxHistoryStorage,
streaming_manager: StreamingManager,
current_balances: HashMap<String, BigDecimal>,
) {
start_history_background_fetching(self.clone(), metrics, storage, current_balances)
start_history_background_fetching(self.clone(), metrics, storage, streaming_manager, current_balances)
}
}
2 changes: 2 additions & 0 deletions mm2src/coins_activation/src/z_coin_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use derive_more::Display;
use futures::compat::Future01CompatExt;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use rpc_task::RpcTaskError;
Expand Down Expand Up @@ -303,6 +304,7 @@ impl InitStandaloneCoinActivationOps for ZCoin {
&self,
_metrics: MetricsArc,
_storage: impl TxHistoryStorage,
_streaming_manager: StreamingManager,
_current_balances: HashMap<String, BigDecimal>,
) {
}
Expand Down

0 comments on commit 3a467dd

Please sign in to comment.