feat(event-streaming): new streams (orders, swaps, eth_fee_estimation) #2172

Open
wants to merge 134 commits into
base: dev
from

Conversation

mariocynicys
Collaborator

@mariocynicys mariocynicys commented Jul 22, 2024

This PR aims to support event streaming in some of MM2's API endpoints.

So far, the work in this PR is only a refactor of the event streaming code, after multiple iterations of discussing what it should end up looking like.

The flow goes as follows:

  • A client initially opens an SSE connection with MM2 to receive events on. Wasm can't support SSE, so there we use a worker to deliver the events to the client (let's treat it as SSE too).
  • If a client wants to receive some events (balance change, network topology change, new gas estimations, etc.), they need to subscribe to these events (enable these streamers) from the API.
  • A streamer is a background task that is responsible for firing some events. Initially MM2 doesn't have any streamers running. When a client enables a streamer (= subscribes to a particular event), they start receiving events from that streamer until they unsubscribe from it.
  • If no client is listening to an active streamer, it will shut down. A new streamer will start again if a client re-subscribes.
  • (a little irrelevant now) Multiple clients subscribing to the same event on MM2 doesn't mean we will have a new streamer for every client. Only one streamer is booted up for the first subscribed client, and new subscribing clients share that same streamer.
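
The lifecycle described in the list above can be sketched roughly as follows. This is a simplified model, not the code in this PR: `Registry`, `subscribe`, `unsubscribe` and `is_running` are hypothetical names, and "starting" or "stopping" a streamer is reduced to adding or removing a map entry.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified registry: streamer id -> set of listening client ids.
/// A streamer counts as "running" while it has an entry in the map.
#[derive(Default)]
struct Registry {
    listeners: HashMap<String, HashSet<u64>>,
}

impl Registry {
    /// Subscribes `client_id` to `streamer_id`.
    /// Returns `true` if this call had to start the streamer (nobody was listening before).
    fn subscribe(&mut self, client_id: u64, streamer_id: &str) -> bool {
        let started = !self.listeners.contains_key(streamer_id);
        self.listeners
            .entry(streamer_id.to_string())
            .or_default()
            .insert(client_id);
        started
    }

    /// Unsubscribes `client_id` from `streamer_id`.
    /// Returns `true` if the streamer was shut down because its last listener left.
    fn unsubscribe(&mut self, client_id: u64, streamer_id: &str) -> bool {
        let Some(clients) = self.listeners.get_mut(streamer_id) else { return false };
        clients.remove(&client_id);
        if clients.is_empty() {
            self.listeners.remove(streamer_id);
            return true;
        }
        false
    }

    /// Whether a streamer with this id currently has at least one listener.
    fn is_running(&self, streamer_id: &str) -> bool {
        self.listeners.contains_key(streamer_id)
    }
}
```

In this model a second client subscribing to the same streamer id reuses the existing entry, which mirrors the "one streamer shared by many clients" point.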

The role of the StreamingManager:

  • Manage different clients (each client has an id, cid, set when it first connects through SSE).
  • Add a new (cid, sid) pair (translates to: the client with id cid wants to listen to the streamer with id sid). If a streamer with that sid is already running, it is just told that a new client will be listening too; otherwise, it is spawned in the background.
  • Stop listening for a (cid, sid) pair (= the client with id cid no longer wants to receive events from the streamer with id sid; if no more clients are listening, the streamer will die).
  • Manage different streamers.
  • Manage the connections between the streamers (the background tasks) and MM2. Streamers either work in a periodic manner (do some job, possibly fire some events, sleep for a while, and repeat) or they need to receive a notification from MM2 (e.g. UTXO balance event streaming relies on electrum subscriptions: the electrum server sends a notification to MM2 when an address balance has changed, and MM2 in turn sends this notification to the streamer responsible for firing balance events). To send some data to a streamer, StreamingManager::send(streamer_id, arbitrary_data) is used.
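
To make the last point more concrete, here is a minimal sketch of how a `send(streamer_id, arbitrary_data)` call could route type-erased data to a notification-driven streamer. It is an assumption-laden illustration using only `std`: `Manager`, `ManagerError`, `register` and the use of `std::sync::mpsc` are stand-ins, and the real `StreamingManager` in this PR may look quite different.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Type-erased payload so one manager can feed streamers expecting different inputs.
type Payload = Box<dyn Any + Send>;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum ManagerError {
    /// No streamer is registered under this id.
    StreamerNotFound,
    /// The streamer exists but its receiving end has been dropped.
    StreamerDown,
}

#[derive(Default)]
struct Manager {
    /// streamer id -> channel into that streamer's background task.
    inputs: HashMap<String, Sender<Payload>>,
}

impl Manager {
    /// Registers a notification-driven streamer and returns the receiver its task would read from.
    fn register(&mut self, streamer_id: &str) -> Receiver<Payload> {
        let (tx, rx) = channel();
        self.inputs.insert(streamer_id.to_string(), tx);
        rx
    }

    /// Forwards arbitrary data to the streamer identified by `streamer_id`.
    fn send<T: Any + Send>(&self, streamer_id: &str, data: T) -> Result<(), ManagerError> {
        let tx = self.inputs.get(streamer_id).ok_or(ManagerError::StreamerNotFound)?;
        tx.send(Box::new(data)).map_err(|_| ManagerError::StreamerDown)
    }
}
```

On the receiving side, the streamer would downcast the payload back to the type it expects (for example with `downcast_ref::<T>()`) and ignore anything else.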

Breaking Changes:

  • No more event_stream_configuration in the MM2 json config. Requested streams are initialized and configured dynamically through the API.
  • Thus the event_stream_configuration.access_control_allow_origin config was moved to access_control_allow_origin (one scope out) b9d1218.
  • And event_stream_configuration.worker_path was changed to event_stream_worker_path (one scope out) ee32fd1.
  • No more filters are required in the initial SSE setup with MM2 (e.g. mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype -> mm2.com/event-stream?id=1), but a client id is now required: a random u64 identifier generated by the client, for the client (e.g. mm2.com/event-stream?id=799384531).
  • How to filter the wanted events then (the client gets no events by default)? The client needs to activate specific event streamers through RPC (e.g. stream::enable::balance).
  • Contrary to the note in the previous point (the client gets no events by default), some special events are now delivered to the client even if they never requested/filtered them (this happens using broadcast_all). One example of such events is DATA_NEEDED:datatype; HEARTBEAT should behave similarly later as well.

(So for the keplr integration, only ?id=<RANDOM_U64> must be added to the SSE init request and everything should work. ?filter= shall be removed.)
(The <RANDOM_U64> should be stored by the client since it's required in any event streamer activation/deactivation. For keplr alone, though, this isn't necessary for now, since DATA_NEEDED:datatype is a special event that's broadcast to all clients and doesn't require any sort of activation/subscription beforehand.)
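
As a rough client-side illustration of the new connection scheme, the sketch below builds the SSE URL from a client-generated id. The helper names `random_client_id` and `event_stream_url` are made up for this example, and deriving the id from `RandomState` is only a dependency-free stand-in for a proper random number generator.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};

/// Produces a pseudo-random u64 without external crates (assumption: good enough
/// for picking a client id; a real client would likely use a proper RNG).
fn random_client_id() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Builds the SSE endpoint URL in the new scheme: no `filter`, only the client id.
/// `base` is whatever host the client already talks to (e.g. "mm2.com").
fn event_stream_url(base: &str, client_id: u64) -> String {
    format!("{base}/event-stream?id={client_id}")
}
```

The client would keep the generated id around, since the same value has to accompany later streamer activation and deactivation requests.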

it was confusing why a utxo standard coin was being used instead of the coin we are building; this makes a bit more sense
There was no point in the option wrapping. Simplifying stuff.
also does some housekeeping, file splitting and renaming
remove the intermediate bridge task and replace it with a `filter_map` call.
also properly implement the shutdown handle
also wraps the StreamingManager's mutable data inside `Arc<RwLock<...>>` so it can be used in concurrent code.
RwLock was chosen here since we should be sending streaming data far more often than editing the streamers
along with some cleanups and simplifications
Also use the `data_rx` for utxo balance events.
A cleanup for the scripthash notification channels/logic will follow
this is very messy though (because of the current state of the electrum code) and will be reverted, just recording this in the history for now
made more sense to have it inside the manager. also, the manager now provides the controller as a parameter in the handle method so different streamers don't have to keep a copy of ctx just to broadcast on the controller.
this is still not hooked up though. no data will be received on this rx channel till we call
`event_stream_manager.send('BALANCE:ZCoin')` (or whatever the coin name is) somewhere.
ignoring the non-wasm test for now since it needs zcash params
downloaded, which we don't have an automatic way to do. would be better if we
can download them in the same way we do for wasm (right from kdf).
naming was used loosely: pirate was used as if it was zombie, and a previous
fix in the url constants led to this breaking.
@mariocynicys mariocynicys force-pushed the event-streaming branch 2 times, most recently from 0d3ac5f to 159445b Compare November 2, 2024 09:17
@@ -903,7 +903,9 @@ impl MarketCoinOps for QtumCoin {
impl MmCoin for QtumCoin {
    fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) }

    fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.as_ref().abortable_system) }
    fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }
Member

I'm not sure if I mentioned this before (since my last review was a month ago), but why are we implementing this at the trait level? Coins don't create MmArc instances—they're already available at higher levels and some coins just receive them during initialization. We could likely pass MmArc where needed, as we do for certain coins. Adding this function seems unnecessary and inconsistent across different coins.

Collaborator Author

@mariocynicys mariocynicys Nov 6, 2024

needed in MmCoin so we can access it in generic impls over MmCoins, e.g.:

let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone();

i think i have answered that somewhere in the threads above though, could you please go over them and resolve the addressed comments?

p.s. in the e.g. above, we could add it in the UtxoCoinOps instead, but that makes no sense at all IMO. We could also opt for making it get_streaming_manager, but i opted for get_ctx since we already have global state other than just the streaming manager and some coins already access this global state.

Member

Why isn't it in

struct UtxoTxHistoryStateMachine<Coin: UtxoTxHistoryOps, Storage: TxHistoryStorage> {
    coin: Coin,
    storage: Storage,
    metrics: MetricsArc,
    /// Last requested balances of the activated coin's addresses.
    /// TODO add a `CoinBalanceState` structure and replace [`HashMap<String, BigDecimal>`] everywhere.
    balances: HashMap<String, BigDecimal>,
}
?

Collaborator Author

that's a good place as well. i'm down to store it (the streaming manager) there if you're keen on the coins exposing no ctx :(

Member

I'd prefer storing it where it's needed rather than forcing it into an unrelated trait.

Collaborator Author

into an unrelated trait

gonna have to disagree with that, but guess that's a topic for another day :)

addressed in 65bd662 & e65ace0

pass the streaming manager down to where it should be used instead

this does so for utxo balance history, tendermint to come
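
The outcome of this thread (handing the streaming manager to the component that uses it, rather than exposing the context through the coin trait) can be sketched as below. Everything here is a stand-in for illustration: `StreamingManager` is a toy that only records broadcasts, and `Coin`, `DummyCoin`, `HistoryStateMachine` and `on_balance_changed` are hypothetical names, not the types from this PR.

```rust
use std::sync::{Arc, Mutex};

/// Toy stand-in for the streaming manager: a cheaply clonable handle that records broadcasts.
#[derive(Clone, Default)]
struct StreamingManager {
    sent: Arc<Mutex<Vec<String>>>,
}

impl StreamingManager {
    fn broadcast(&self, event: String) {
        self.sent.lock().unwrap().push(event);
    }
}

/// The coin trait stays free of any context accessor.
trait Coin {
    fn ticker(&self) -> &str;
}

struct DummyCoin;

impl Coin for DummyCoin {
    fn ticker(&self) -> &str { "DUMMY" }
}

/// The component that emits events owns a manager handle, injected at construction.
struct HistoryStateMachine<C: Coin> {
    coin: C,
    streaming_manager: StreamingManager,
}

impl<C: Coin> HistoryStateMachine<C> {
    /// Fires a balance event through the injected manager.
    fn on_balance_changed(&self, new_balance: &str) {
        let event = format!("BALANCE:{} -> {}", self.coin.ticker(), new_balance);
        self.streaming_manager.broadcast(event);
    }
}
```

The trade-off is that every component emitting events needs the handle passed in explicitly, while the coin trait itself does not grow a context accessor.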
Member

@borngraced borngraced left a comment

next iteration. I tried to focus on optimizations in this review.


/// Returns a human readable unique identifier for the event streamer.
/// No other event streamer should have the same identifier.
fn streamer_id(&self) -> String;
Member

@borngraced borngraced Nov 15, 2024

streamer_id is being cloned too often, please convert to Arc<str>

#[derive(Default)]
pub struct Event {
    /// The type of the event (balance, network, swap, etc...).
    event_type: String,
Member

Can this be converted to Arc<str> ? I see it's mostly constructed via streamer_id which I also suggested to be made into an Arc as it's being cloned too often.

Collaborator Author

The event itself is Arced (since the json message inside it might be arbitrarily large), so we don't really need to wrap the event_type here in an Arc.
streamer_id though could be Arced in some places, but i went for keeping it simple since it's cloned once per event we spit out, not once per event we hand to clients. i think we can reconsider this though.

Member

@borngraced borngraced Nov 18, 2024

Alright, thanks. streamer_id definitely needs to be made into an Arc. I counted the clones and there are a lot of them.
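
For context on what the `Arc<str>` suggestion buys, here is a small std-only comparison. The function names are invented for this sketch; the point is just that cloning a `String` copies the bytes into a new allocation, while cloning an `Arc<str>` bumps a reference count and shares the same buffer.

```rust
use std::sync::Arc;

/// Clones a `String` id: every clone allocates and copies the bytes.
fn clone_owned(id: &String) -> String {
    id.clone()
}

/// Clones an `Arc<str>` id: only the reference count changes; the bytes are shared.
fn clone_shared(id: &Arc<str>) -> Arc<str> {
    Arc::clone(id)
}
```

Whether the difference matters in practice depends on how often the id is cloned per event, which is what the two reviewers are weighing above.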

Comment on lines +269 to +272
let mut this = self.write();
if this.clients.contains_key(&client_id) {
    return Err(StreamingManagerError::ClientExists);
}
Member

same as above.

Comment on lines +318 to +322
let mut this = self.write();
let Some(streamer_info) = this.streamers.get(streamer_id) else { return };
if !streamer_info.is_down() {
    return;
}
Member

same as above

#[derive(Default)]
pub struct Event {
    /// The type of the event (balance, network, swap, etc...).
    event_type: String,
Collaborator

event_id is a String although it has a structure like "TASK:{task_id}" or "FEE_ESTIMATION:{ticker}".
But all event id constructions are scattered over the code, so it's easy to overlook which events we support;
also, the format is not checked, so it's easy to make a mistake.
Maybe we could make event_id an enum with the deser feature? It would be easier to maintain all events and document them for the GUI.
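
A possible shape for such an enum is sketched below, using plain `Display`/`FromStr` from `std` rather than a serialization crate to keep the example self-contained. The variant set, the `u64` task id and the exact error strings are assumptions; only the "TASK:{task_id}" and "FEE_ESTIMATION:{ticker}" formats and the HEARTBEAT name come from this thread.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical typed event id; the variant list is illustrative, not exhaustive.
#[derive(Debug, Clone, PartialEq, Eq)]
enum EventId {
    Task(u64),
    FeeEstimation(String),
    Heartbeat,
}

impl fmt::Display for EventId {
    /// Renders the id in the string format the clients already see.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EventId::Task(id) => write!(f, "TASK:{id}"),
            EventId::FeeEstimation(ticker) => write!(f, "FEE_ESTIMATION:{ticker}"),
            EventId::Heartbeat => write!(f, "HEARTBEAT"),
        }
    }
}

impl FromStr for EventId {
    type Err = String;

    /// Parses and validates an id string, so malformed ids are rejected in one place.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.split_once(':') {
            Some(("TASK", id)) => id
                .parse::<u64>()
                .map(EventId::Task)
                .map_err(|e| format!("bad task id: {e}")),
            Some(("FEE_ESTIMATION", ticker)) if !ticker.is_empty() => {
                Ok(EventId::FeeEstimation(ticker.to_string()))
            },
            None if s == "HEARTBEAT" => Ok(EventId::Heartbeat),
            _ => Err(format!("unknown event id: {s}")),
        }
    }
}
```

With something like this, the list of supported events lives in one enum definition, and a typo such as "BALANC:ETH" fails at parse time instead of silently producing an id nobody listens to.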

@dimxy
Collaborator

dimxy commented Nov 22, 2024

I tried to use this PR and it appears to work okay. This is great work.
I am about to approve it, to move forward and let the GUI play with it.
There are a few notes I'd like to see resolved before that (mandatory HEARTBEATs).

I'd like to leave a note, though, on how IMO the event streaming could be simplified:

  • there are several dedicated rpcs, one per event type, to enable it; these rpcs could be merged into a single one or even eliminated,
  • the GUI needs to generate a unique client-id to receive events; this is not GUI friendly and needs syncing between several GUI processes; the client-id could be created by KDF or even eliminated,
  • task manager rpcs become too heavily integrated with event streaming: the GUI needs to send the client-id in each task init rpc and the task manager handles it; I think the client-id could be eliminated from the task API.

I think these problems come from the use of the one-directional Server-Sent Events (SSE) protocol. SSE is a web protocol, but we use it for the native configuration.
We have an example of event delivery in the eth_subscribe call. It works over web sockets which, being two-directional, can send both control and data over the same connection (no need for dedicated rpcs and client-ids).
I think we could do this similarly, to simplify the rpcs and the internal architecture:
The GUI opens a ws connection and sends a json to subscribe to events (using known names like 'TASK', 'BALANCE', etc.).
We don't need the client-id anymore: the API already knows which connection expects which events.
At any time the client (GUI) can subscribe to new events or unsubscribe from others.
We need a streaming manager serving as an event bus, which:

  • receives events from all event sources in the code (task manager, polling loops)
  • maintains client subscriptions: when a client subscribes to some events, the streaming manager keeps those events in a list for this client, then filters and sends only the subscribed events to the client (over the same channel that is used now).
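
The event bus proposed in this comment could look roughly like the following. This is only a sketch of the idea under simplifying assumptions: connections are modelled with `std::sync::mpsc` channels instead of web sockets, and `EventBus`, `ConnId`, `connect`, `subscribe`, `unsubscribe` and `publish` are hypothetical names.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical connection handle; a real server would key this by websocket connection.
type ConnId = usize;

#[derive(Default)]
struct EventBus {
    next_conn: ConnId,
    /// Per connection: its outbound channel and the set of event names it subscribed to.
    conns: HashMap<ConnId, (Sender<String>, HashSet<String>)>,
}

impl EventBus {
    /// Registers a new connection; no client-supplied id is needed.
    fn connect(&mut self) -> (ConnId, Receiver<String>) {
        let (tx, rx) = channel();
        let id = self.next_conn;
        self.next_conn += 1;
        self.conns.insert(id, (tx, HashSet::new()));
        (id, rx)
    }

    fn subscribe(&mut self, conn: ConnId, event_name: &str) {
        if let Some((_, subs)) = self.conns.get_mut(&conn) {
            subs.insert(event_name.to_string());
        }
    }

    fn unsubscribe(&mut self, conn: ConnId, event_name: &str) {
        if let Some((_, subs)) = self.conns.get_mut(&conn) {
            subs.remove(event_name);
        }
    }

    /// Sends the event to every connection subscribed to `event_name`, dropping
    /// subscribed connections whose receiver is gone. Returns the number of deliveries.
    fn publish(&mut self, event_name: &str, payload: &str) -> usize {
        let mut delivered = 0;
        self.conns.retain(|_, (tx, subs)| {
            if !subs.contains(event_name) {
                return true;
            }
            match tx.send(format!("{event_name}:{payload}")) {
                Ok(()) => {
                    delivered += 1;
                    true
                },
                Err(_) => false,
            }
        });
        delivered
    }
}
```

In this model the subscription state hangs off the connection itself, which is why no separate client id is needed.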

@shamardy shamardy added 2.3.0-beta and removed P2 labels Nov 25, 2024
@shamardy shamardy requested a review from dimxy November 25, 2024 10:49
@laruh
Member

laruh commented Nov 26, 2024

@mariocynicys could you fix conflicts please

Development

Successfully merging this pull request may close these issues.

return an error if transaction history futures initialization fails
6 participants