Skip to content

Commit

Permalink
Merge pull request #37 from peaqnetwork/feature/1202138782029381_disc…
Browse files Browse the repository at this point in the history
…onnect-p2p-connection

Feature/1202138782029381 disconnect p2p connection
  • Loading branch information
irediaes authored Apr 20, 2022
2 parents 869eb38 + d9ac697 commit 11bba48
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 40 deletions.
Binary file modified android/app/src/main/jniLibs/arm64-v8a/libpeaq_codec_api.so
Binary file not shown.
Binary file modified android/app/src/main/jniLibs/armeabi-v7a/libpeaq_codec_api.so
Binary file not shown.
Binary file modified android/app/src/main/jniLibs/x86_64/libpeaq_codec_api.so
Binary file not shown.
2 changes: 2 additions & 0 deletions lib/common/providers/charge_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -302,5 +302,7 @@ class CEVChargeProvider with ChangeNotifier {

_chargingStatus = LoadingStatus.success;
setStatus(LoadingStatus.idle, message: Env.transactionCompleted);

appProvider.peerProvider.disconnectP2P();
}
}
15 changes: 13 additions & 2 deletions lib/common/providers/peer_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ late final dylib =

late final api = PeaqCodecApiImpl(dylib);

void runPeriodically(void Function() callback) =>
Timer runPeriodically(void Function() callback) =>
Timer.periodic(const Duration(milliseconds: 1000), (timer) => callback());

class CEVPeerProvider with ChangeNotifier {
Expand All @@ -53,10 +53,12 @@ class CEVPeerProvider with ChangeNotifier {
LoadingStatus _status = LoadingStatus.idle;
String _error = '';
String _statusMessage = '';
String _peerId = '';
double _chargeProgress = 0;
bool _isLoggedIn = false;
bool _showNodeDropdown = false;
List<Detail> _details = [];
Timer? _runningLoop;

String _identityChallengeData = '';
String _p2pURL = '';
Expand Down Expand Up @@ -87,8 +89,15 @@ class CEVPeerProvider with ChangeNotifier {
.setStatus(LoadingStatus.error, message: Env.invalidP2PUrl);
}

_peerId = splitURL.last;

api.connectP2P(url: _p2pURL);
runPeriodically(getEvent);
_runningLoop = runPeriodically(getEvent);
}

Future<void> disconnectP2P() async {
await api.disconnectP2P(peerId: _peerId);
_runningLoop!.cancel();
}

Future<void> getEvent() async {
Expand Down Expand Up @@ -127,6 +136,7 @@ class CEVPeerProvider with ChangeNotifier {
if (!err) {
_processServiceRequestedAckEvent();
} else {
disconnectP2P();
appProvider.chargeProvider.setStatus(LoadingStatus.error,
message: Env.providerRejectService +
": " +
Expand All @@ -143,6 +153,7 @@ class CEVPeerProvider with ChangeNotifier {
}
case msg.EventType.PEER_CONNECTION_FAILED:
{
disconnectP2P();
_isPeerConnected = false;
appProvider.chargeProvider.setStatus(LoadingStatus.error,
message: Env.unableToConnectToPeer);
Expand Down
32 changes: 32 additions & 0 deletions lib/common/services/fr_bridge/bridge_generated.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ abstract class PeaqCodecApi {

Future<void> connectP2P({required String url, dynamic hint});

Future<void> disconnectP2P({required String peerId, dynamic hint});

Future<Uint8List> sendIdentityChallengeEvent({dynamic hint});

Future<Uint8List> sendStopChargeEvent({dynamic hint});
Expand Down Expand Up @@ -101,6 +103,19 @@ class PeaqCodecApiImpl extends FlutterRustBridgeBase<PeaqCodecApiWire>
hint: hint,
));

Future<void> disconnectP2P({required String peerId, dynamic hint}) =>
executeNormal(FlutterRustBridgeTask(
callFfi: (port_) =>
inner.wire_disconnect_p2p(port_, _api2wire_String(peerId)),
parseSuccessData: _wire2api_unit,
constMeta: const FlutterRustBridgeTaskConstMeta(
debugName: "disconnect_p2p",
argNames: ["peerId"],
),
argValues: [peerId],
hint: hint,
));

Future<Uint8List> sendIdentityChallengeEvent({dynamic hint}) =>
executeNormal(FlutterRustBridgeTask(
callFfi: (port_) => inner.wire_send_identity_challenge_event(port_),
Expand Down Expand Up @@ -412,6 +427,23 @@ class PeaqCodecApiWire implements FlutterRustBridgeWireBase {
late final _wire_connect_p2p = _wire_connect_p2pPtr
.asFunction<void Function(int, ffi.Pointer<wire_uint_8_list>)>();

void wire_disconnect_p2p(
int port_,
ffi.Pointer<wire_uint_8_list> peer_id,
) {
return _wire_disconnect_p2p(
port_,
peer_id,
);
}

late final _wire_disconnect_p2pPtr = _lookup<
ffi.NativeFunction<
ffi.Void Function(ffi.Int64,
ffi.Pointer<wire_uint_8_list>)>>('wire_disconnect_p2p');
late final _wire_disconnect_p2p = _wire_disconnect_p2pPtr
.asFunction<void Function(int, ffi.Pointer<wire_uint_8_list>)>();

void wire_send_identity_challenge_event(
int port_,
) {
Expand Down
7 changes: 7 additions & 0 deletions rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ pub fn connect_p2p(url: String) -> Result<()> {
Ok(())
}

// unsubscribe and disconnect from a p2p peer connection
pub fn disconnect_p2p(peer_id: String) -> Result<()> {
trace!("\n\n P2P PEEER ID ON RUST {}", peer_id);
request::disconnect_p2p(peer_id).unwrap();
Ok(())
}

// Send Identity Challenge event to provider peer
pub fn send_identity_challenge_event() -> Result<Vec<u8>> {
let res = request::send_identity_challenge_event().unwrap();
Expand Down
15 changes: 15 additions & 0 deletions rust/src/bridge_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ pub extern "C" fn wire_connect_p2p(port_: i64, url: *mut wire_uint_8_list) {
)
}

#[no_mangle]
pub extern "C" fn wire_disconnect_p2p(port_: i64, peer_id: *mut wire_uint_8_list) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap(
WrapInfo {
debug_name: "disconnect_p2p",
port: Some(port_),
mode: FfiCallMode::Normal,
},
move || {
let api_peer_id = peer_id.wire2api();
move |task_callback| disconnect_p2p(api_peer_id)
},
)
}

#[no_mangle]
pub extern "C" fn wire_send_identity_challenge_event(port_: i64) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap(
Expand Down
110 changes: 74 additions & 36 deletions rust/src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ use libp2p::{
tcp::TokioTcpConfig,
yamux, Multiaddr, NetworkBehaviour, PeerId, Transport,
};
use log::trace;
use peaq_p2p_proto_message::p2p_message_format as msg;
use protobuf::Message;
use std::{collections::VecDeque, error::Error, time::Duration};
use sp_runtime::PerU16;
use std::{collections::VecDeque, error::Error, str::FromStr, time::Duration};

use once_cell::sync::Lazy;
use std::sync::Mutex;
Expand All @@ -36,7 +38,11 @@ pub(crate) static mut EVENTS: Lazy<Mutex<VecDeque<Vec<u8>>>> =

// Static GLOBAL variable of SWARM so other event publishing function can use it
// outside of this scope
pub(crate) static mut EVENT_BEHAVIOUR: Lazy<Mutex<Swarm<EventBehaviour>>> = Lazy::new(|| {
pub(crate) static mut EVENT_BEHAVIOUR: Lazy<Mutex<Option<Swarm<EventBehaviour>>>> =
Lazy::new(|| Mutex::new(None));

// use to create new instance of swarm every time a new connection is requested
fn init_swarm() -> Result<Swarm<EventBehaviour>, Box<dyn Error>> {
let gossipsub_config = GossipsubConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
Expand All @@ -61,7 +67,7 @@ pub(crate) static mut EVENT_BEHAVIOUR: Lazy<Mutex<Swarm<EventBehaviour>>> = Lazy
// Create a random PeerId
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
trace!("Local peer id: {:?}", local_peer_id);

// Create a keypair for authenticated encryption of the transport.
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
Expand Down Expand Up @@ -98,50 +104,82 @@ pub(crate) static mut EVENT_BEHAVIOUR: Lazy<Mutex<Swarm<EventBehaviour>>> = Lazy
}))
.build()
};
Mutex::new(swarm)
});

Ok(swarm)
}

#[tokio::main]
pub async fn connect(peer_url: String) -> Result<(), Box<dyn Error>> {
let new_swarm = init_swarm().unwrap();
let swarm;
unsafe {
// set a new instance of swarm event behaviour
*EVENT_BEHAVIOUR.lock().unwrap() = Some(new_swarm);
swarm = EVENT_BEHAVIOUR.get_mut().unwrap();
}
// Listen on all interfaces and whatever port the OS assigns
swarm
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();

// Dial another peer address if supplied
if let Some(to_dial) = Some(peer_url) {
let address: Multiaddr = to_dial.parse().expect("User to provide valid address.");
match swarm.dial(address.clone()) {
Ok(_) => {
event::add_event_to_global(msg::EventType::PEER_CONNECTED);
println!("Dialed {:?}", address);
}
Err(e) => {
event::add_event_to_global(msg::EventType::PEER_CONNECTION_FAILED);
if let Some(swm) = swarm {
// Listen on all interfaces and whatever port the OS assigns
swm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();

// Dial another peer address if supplied
if let Some(to_dial) = Some(peer_url) {
let address: Multiaddr = to_dial.parse().expect("User to provide valid address.");
match swm.dial(address.clone()) {
Ok(_) => {
event::add_event_to_global(msg::EventType::PEER_CONNECTED);
trace!("Dialed {:?}", address);
}
Err(e) => {
event::add_event_to_global(msg::EventType::PEER_CONNECTION_FAILED);

trace!("Dial {:?} failed: {:?}", address, e);
}
};
}

// Read full lines from stdin
loop {
tokio::select! {

// listening to swarm events
event = swm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
trace!("Listening on {:?}", address);
},
SwarmEvent::ConnectionClosed {..} => {
trace!("P2P Connection closed!");
break;
}
_ => {}
}

println!("Dial {:?} failed: {:?}", address, e);
}
};
}
}

// Read full lines from stdin
loop {
tokio::select! {
Ok(())
}

// listening to swarm events
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
_ => {}
}
pub fn disconnect(peer_id: String) -> Result<(), Box<dyn Error>> {
let peer_id = PeerId::from_str(&peer_id).expect("unable to parse peer_id");

let swarm;
let topic;

unsafe {
swarm = EVENT_BEHAVIOUR.get_mut().unwrap();
topic = EVENT_TOPIC.get_mut().unwrap().clone();
}
if let Some(swm) = swarm {
if let Some(top) = topic {
swm.behaviour_mut().gossip.unsubscribe(&top).unwrap();
}

swm.disconnect_peer_id(peer_id).unwrap();
}

Ok(())
}

// We create a custom network behaviour.
Expand All @@ -157,11 +195,11 @@ pub(crate) struct EventBehaviour {
impl NetworkBehaviourEventProcess<GossipsubEvent> for EventBehaviour {
// Called when `gossip` produces an event.
fn inject_event(&mut self, event: GossipsubEvent) {
println!("MSG: {:?}", event);
trace!("MSG: {:?}", event);
match event {
GossipsubEvent::Subscribed { peer_id, topic } => {
event::add_event_to_global(msg::EventType::PEER_SUBSCRIBED);
println!("Subscribed:: peer: {} topic/channel: {}", peer_id, topic)
trace!("Subscribed:: peer: {} topic/channel: {}", peer_id, topic)
}
GossipsubEvent::Message {
propagation_source: peer_id,
Expand All @@ -170,8 +208,8 @@ impl NetworkBehaviourEventProcess<GossipsubEvent> for EventBehaviour {
} => {
let ev = msg::Event::parse_from_bytes(&message.data).unwrap();

println!("\nev-parse:: {:?}\n", &ev);
println!(
trace!("\nev-parse:: {:?}\n", &ev);
trace!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
Expand Down
6 changes: 4 additions & 2 deletions rust/src/p2p/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ pub fn send_event(
}
// trace!("send_identity_challenge_event topic:: {:?}", &topic);

if let Some(top) = topic {
swarm.behaviour_mut().gossip.publish(top, &*v).unwrap();
if let Some(swm) = swarm {
if let Some(top) = topic {
swm.behaviour_mut().gossip.publish(top, &*v).unwrap();
}
}

Ok(())
Expand Down
8 changes: 8 additions & 0 deletions rust/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub fn connect_p2p(url: String) -> Result<()> {
Ok(())
}

pub fn disconnect_p2p(peer_id: String) -> Result<()> {
trace!("\n\n disconnect_p2p RUST hitts:: p2p peer_id = {}", peer_id);

behaviour::disconnect(peer_id).expect("p2p disconnection failed");

Ok(())
}

// get events from the the global variable
pub fn get_event() -> Result<Vec<u8>> {
trace!("\n\n RUST - get_event hitts");
Expand Down

0 comments on commit 11bba48

Please sign in to comment.