From cbd8931514f736e2cf39b7e9f9e239ef389d3dfb Mon Sep 17 00:00:00 2001 From: keroro Date: Fri, 24 Dec 2021 17:39:38 +0800 Subject: [PATCH] refactor: abstract redundant utils --- .gitignore | 1 + Cargo.lock | 1 + Cargo.toml | 1 + src/topic/compact_block_crawler.rs | 91 +++++------------------------ src/topic/network_crawler.rs | 94 ++++++------------------------ src/util/bootnodes.rs | 26 +++++++++ src/util/ipinfo.rs | 27 +++++++++ src/util/mod.rs | 3 + src/util/multiaddr.rs | 19 ++++++ 9 files changed, 110 insertions(+), 153 deletions(-) create mode 100644 src/util/bootnodes.rs create mode 100644 src/util/ipinfo.rs create mode 100644 src/util/multiaddr.rs diff --git a/.gitignore b/.gitignore index 1ce1891..2879900 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /docker/postgresql/var/ /docker/grafana/var/ *.log +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index 7ea6d47..ab8d6fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,6 +321,7 @@ dependencies = [ "dotenv", "futures 0.3.17", "ipinfo", + "lazy_static", "log", "lru", "rand 0.8.4", diff --git a/Cargo.toml b/Cargo.toml index 8ebbc31..45473c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ tokio-util = { version = "0.6", features = ["codec"] } regex = "1.5.4" dotenv = "0.15.0" lru = "0.6" +lazy_static = "1.4" diff --git a/src/topic/compact_block_crawler.rs b/src/topic/compact_block_crawler.rs index 0ca8189..8c5f4e7 100644 --- a/src/topic/compact_block_crawler.rs +++ b/src/topic/compact_block_crawler.rs @@ -1,3 +1,4 @@ +use crate::util::{bootnodes::bootnodes, ipinfo::lookup_ipinfo, multiaddr::addr_to_ip}; use ckb_testkit::{ ckb_types::{packed, prelude::*}, compress, @@ -5,7 +6,6 @@ use ckb_testkit::{ connector::SharedState, decompress, Node, SupportProtocols, }; -use ipinfo::IpInfo; use lru::LruCache; use p2p::{ builder::MetaBuilder as P2PMetaBuilder, @@ -13,7 +13,6 @@ use p2p::{ context::ProtocolContext as P2PProtocolContext, context::ProtocolContextMutRef as P2PProtocolContextMutRef, context::ServiceContext as P2PServiceContext, - multiaddr, multiaddr::Multiaddr, service::ProtocolHandle as P2PProtocolHandle, service::ProtocolMeta as P2PProtocolMeta, @@ -22,7 +21,6 @@ use p2p::{ service::TargetProtocol as P2PTargetProtocol, traits::ServiceHandle as P2PServiceHandle, traits::ServiceProtocol as P2PServiceProtocol, - utils::multiaddr_to_socketaddr, }; use rand::{thread_rng, Rng}; use std::collections::HashSet; @@ -31,6 +29,8 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Decoder, Encoder}; +type Ip = String; + const DIAL_ONLINE_ADDRESSES_INTERVAL: Duration = Duration::from_secs(1); const DIAL_ONLINE_ADDRESSES_TOKEN: u64 = 1; @@ -53,8 +53,6 @@ pub struct CompactBlockCrawler { compact_blocks: Option>, known_ips: HashSet, - - ipinfo: IpInfo, } impl Clone for CompactBlockCrawler { @@ -67,7 +65,6 @@ impl Clone for CompactBlockCrawler { client_version: self.client_version.clone(), compact_blocks: None, known_ips: HashSet::new(), - ipinfo: create_ipinfo(), } } } @@ -89,7 +86,6 @@ impl CompactBlockCrawler { client_version, compact_blocks: Default::default(), known_ips: Default::default(), - ipinfo: create_ipinfo(), } } @@ -254,15 +250,15 @@ impl CompactBlockCrawler { return; } - if let Ok(info_map) = self.ipinfo.lookup(&[ip]) { - let ipinfo::IpDetails { - ip, - country, - city, - region, - company, - .. - } = info_map[ip].to_owned(); + if let Ok(ipinfo::IpDetails { + ip, + country, + city, + region, + company, + .. + }) = lookup_ipinfo(ip) + { let entry = crate::entry::IpInfo { network: self.node.consensus().id.clone(), ip, @@ -403,7 +399,9 @@ impl P2PServiceProtocol for CompactBlockCrawler { fn received(&mut self, context: P2PProtocolContextMutRef, data: Bytes) { if context.proto_id == SupportProtocols::Discovery.protocol_id() { self.received_discovery(context, data) - } else if context.proto_id() == SupportProtocols::Relay.protocol_id() || context.proto_id() == SupportProtocols::RelayV2.protocol_id() { + } else if context.proto_id() == SupportProtocols::Relay.protocol_id() + || context.proto_id() == SupportProtocols::RelayV2.protocol_id() + { self.received_relay(context, data) } } @@ -451,62 +449,3 @@ impl P2PServiceHandle for CompactBlockCrawler { } } } - -#[allow(clippy::mutable_key_type)] -fn bootnodes(node: &Node) -> HashSet { - let local_node_info = node.rpc_client().local_node_info(); - if !local_node_info.addresses.is_empty() { - return local_node_info - .addresses - .into_iter() - .map(|addr| addr.address.parse().unwrap()) - .collect(); - } - - let bootnode = match node.consensus().id.as_str() { - "ckb" => "/ip4/47.110.15.57/tcp/8114/p2p/QmXS4Kbc9HEeykHUTJCm2tNmqghbvWyYpUp6BtE5b6VrAU", - "ckb_testnet" => { - "/ip4/47.111.169.36/tcp/8111/p2p/QmNQ4jky6uVqLDrPU7snqxARuNGWNLgSrTnssbRuy3ij2W" - } - _ => unreachable!(), - }; - let mut bootnodes = HashSet::new(); - bootnodes.insert(bootnode.parse().unwrap()); - bootnodes -} - -type Ip = String; - -fn addr_to_ip(addr: &Multiaddr) -> Ip { - addr.iter() - .find_map(|protocol| match protocol { - multiaddr::Protocol::Ip4(ip4) => Some(ip4.to_string()), - multiaddr::Protocol::Ip6(ip6) => ip6 - .to_ipv4() - .map(|ip4| ip4.to_string()) - .or_else(|| Some(ip6.to_string())), - multiaddr::Protocol::Dns4(dns4) => Some(dns4.to_string()), - multiaddr::Protocol::Dns6(dns6) => Some(dns6.to_string()), - _ => None, - }) - .unwrap_or_else(|| { - let socket_addr = multiaddr_to_socketaddr(&addr).unwrap(); - socket_addr.ip().to_string() - }) -} - -fn create_ipinfo() -> ipinfo::IpInfo { - let ipinfo_io_token = match ::std::env::var("IPINFO_IO_TOKEN") { - Ok(token) if !token.is_empty() => Some(token), - _ => { - log::warn!("miss environment variable \"IPINFO_IO_TOKEN\", use empty value"); - None - } - }; - ipinfo::IpInfo::new(ipinfo::IpInfoConfig { - token: ipinfo_io_token, - cache_size: 10000, - timeout: ::std::time::Duration::from_secs(365 * 24 * 60 * 60), - }) - .expect("connect to https://ipinfo.io") -} diff --git a/src/topic/network_crawler.rs b/src/topic/network_crawler.rs index a82c32d..0d18e1c 100644 --- a/src/topic/network_crawler.rs +++ b/src/topic/network_crawler.rs @@ -1,3 +1,5 @@ +use crate::util::{bootnodes::bootnodes, ipinfo::lookup_ipinfo, multiaddr::addr_to_ip}; +use ckb_testkit::connector::message::build_discovery_get_nodes; use ckb_testkit::{ ckb_types::{packed, prelude::*}, compress, @@ -11,7 +13,6 @@ use p2p::{ context::ProtocolContextMutRef as P2PProtocolContextMutRef, context::ServiceContext as P2PServiceContext, context::SessionContext, - multiaddr, multiaddr::Multiaddr, service::ProtocolHandle as P2PProtocolHandle, service::ProtocolMeta as P2PProtocolMeta, @@ -20,7 +21,6 @@ use p2p::{ service::TargetProtocol as P2PTargetProtocol, traits::ServiceHandle as P2PServiceHandle, traits::ServiceProtocol as P2PServiceProtocol, - utils::multiaddr_to_socketaddr, }; use rand::{thread_rng, Rng}; use std::collections::{HashMap, HashSet}; @@ -63,14 +63,11 @@ pub struct NetworkCrawler { // #{ ip => peer_info } online: Arc>>, - // IpInfo instance - ipinfo: ipinfo::IpInfo, - // already known iP known_ips: HashSet, } -pub type Ip = String; +type Ip = String; #[derive(Debug, Clone)] pub struct PeerInfo { @@ -88,7 +85,6 @@ impl Clone for NetworkCrawler { shared: Arc::clone(&self.shared), observed_addresses: Arc::clone(&self.observed_addresses), online: Arc::clone(&self.online), - ipinfo: create_ipinfo(), known_ips: self.known_ips.clone(), } } @@ -101,19 +97,7 @@ impl NetworkCrawler { query_sender: crossbeam::channel::Sender, shared: Arc>, ) -> Self { - let ipinfo = create_ipinfo(); - let bootnode = match node.consensus().id.as_str() { - "ckb" => { - "/ip4/47.110.15.57/tcp/8114/p2p/QmXS4Kbc9HEeykHUTJCm2tNmqghbvWyYpUp6BtE5b6VrAU" - } - "ckb_testnet" => { - "/ip4/47.111.169.36/tcp/8111/p2p/QmNQ4jky6uVqLDrPU7snqxARuNGWNLgSrTnssbRuy3ij2W" - } - _ => unreachable!(), - }; - #[allow(clippy::mutable_key_type)] - let mut bootnodes = HashSet::new(); - bootnodes.insert(bootnode.parse().unwrap()); + let bootnodes = bootnodes(&node); Self { node, query_sender, @@ -135,7 +119,6 @@ impl NetworkCrawler { }) .collect(), )), - ipinfo, known_ips: Default::default(), } } @@ -286,18 +269,7 @@ impl NetworkCrawler { } fn connected_discovery(&mut self, context: P2PProtocolContextMutRef, protocol_version: &str) { - let discovery_get_node_message = packed::DiscoveryMessage::new_builder() - .payload( - packed::DiscoveryPayload::new_builder() - .set( - packed::GetNodes::new_builder() - .count(1000u32.pack()) - .version(1u32.pack()) - .build(), - ) - .build(), - ) - .build(); + let discovery_get_node_message = build_discovery_get_nodes(None, 1000u32, 1u32); if protocol_version == "0.0.1" { let mut codec = LengthDelimitedCodec::new(); let mut bytes = BytesMut::new(); @@ -426,24 +398,26 @@ impl P2PServiceProtocol for NetworkCrawler { } } - for entry in entries { + for entry in entries.iter() { let raw_query = format!( "INSERT INTO {}.peer(time, version, ip, n_reachable) \ VALUES ('{}', '{}', '{}', {})", entry.network, entry.time, entry.version, entry.ip, entry.n_reachable, ); self.query_sender.send(raw_query).unwrap(); + } + for entry in entries { if !self.known_ips.contains(&entry.ip) { - if let Ok(info_map) = self.ipinfo.lookup(&[entry.ip.as_str()]) { - let ipinfo::IpDetails { - ip, - country, - city, - region, - company, - .. - } = info_map[&entry.ip].to_owned(); + if let Ok(ipinfo::IpDetails { + ip, + country, + city, + region, + company, + .. + }) = lookup_ipinfo(&entry.ip) + { let entry = crate::entry::IpInfo { network: entry.network, ip, @@ -563,37 +537,3 @@ impl P2PServiceHandle for NetworkCrawler { } } } - -fn addr_to_ip(addr: &Multiaddr) -> Ip { - addr.iter() - .find_map(|protocol| match protocol { - multiaddr::Protocol::Ip4(ip4) => Some(ip4.to_string()), - multiaddr::Protocol::Ip6(ip6) => ip6 - .to_ipv4() - .map(|ip4| ip4.to_string()) - .or_else(|| Some(ip6.to_string())), - multiaddr::Protocol::Dns4(dns4) => Some(dns4.to_string()), - multiaddr::Protocol::Dns6(dns6) => Some(dns6.to_string()), - _ => None, - }) - .unwrap_or_else(|| { - let socket_addr = multiaddr_to_socketaddr(&addr).unwrap(); - socket_addr.ip().to_string() - }) -} - -fn create_ipinfo() -> ipinfo::IpInfo { - let ipinfo_io_token = match ::std::env::var("IPINFO_IO_TOKEN") { - Ok(token) if !token.is_empty() => Some(token), - _ => { - log::warn!("miss environment variable \"IPINFO_IO_TOKEN\", use empty value"); - None - } - }; - ipinfo::IpInfo::new(ipinfo::IpInfoConfig { - token: ipinfo_io_token, - cache_size: 10000, - timeout: ::std::time::Duration::from_secs(365 * 24 * 60 * 60), - }) - .expect("connect to https://ipinfo.io") -} diff --git a/src/util/bootnodes.rs b/src/util/bootnodes.rs new file mode 100644 index 0000000..9309773 --- /dev/null +++ b/src/util/bootnodes.rs @@ -0,0 +1,26 @@ +use ckb_testkit::Node; +use p2p::multiaddr::Multiaddr; +use std::collections::HashSet; + +#[allow(clippy::mutable_key_type)] +pub fn bootnodes(node: &Node) -> HashSet { + let local_node_info = node.rpc_client().local_node_info(); + if !local_node_info.addresses.is_empty() { + return local_node_info + .addresses + .into_iter() + .map(|addr| addr.address.parse().unwrap()) + .collect(); + } + + let bootnode = match node.consensus().id.as_str() { + "ckb" => "/ip4/47.110.15.57/tcp/8114/p2p/QmXS4Kbc9HEeykHUTJCm2tNmqghbvWyYpUp6BtE5b6VrAU", + "ckb_testnet" => { + "/ip4/47.111.169.36/tcp/8111/p2p/QmNQ4jky6uVqLDrPU7snqxARuNGWNLgSrTnssbRuy3ij2W" + } + _ => unreachable!(), + }; + let mut bootnodes = HashSet::new(); + bootnodes.insert(bootnode.parse().unwrap()); + bootnodes +} diff --git a/src/util/ipinfo.rs b/src/util/ipinfo.rs new file mode 100644 index 0000000..9a2638e --- /dev/null +++ b/src/util/ipinfo.rs @@ -0,0 +1,27 @@ +use ipinfo::{IpDetails, IpError, IpInfo}; +use lazy_static::lazy_static; +use std::sync::RwLock; + +lazy_static! { + static ref IPINFO: RwLock = { + let ipinfo_io_token = match ::std::env::var("IPINFO_IO_TOKEN") { + Ok(token) if !token.is_empty() => Some(token), + _ => { + log::warn!("miss environment variable \"IPINFO_IO_TOKEN\", use empty value"); + None + } + }; + let ipinfo = ipinfo::IpInfo::new(ipinfo::IpInfoConfig { + token: ipinfo_io_token, + cache_size: 10000, + timeout: ::std::time::Duration::from_secs(365 * 24 * 60 * 60), + }) + .expect("connect to https://ipinfo.io"); + RwLock::new(ipinfo) + }; +} + +pub fn lookup_ipinfo(ip: &str) -> Result { + let infos = IPINFO.write().unwrap().lookup(&[ip])?; + Ok(infos[ip].to_owned()) +} diff --git a/src/util/mod.rs b/src/util/mod.rs index acdfe75..41c3622 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1 +1,4 @@ +pub mod bootnodes; pub mod crossbeam_channel_to_tokio_channel; +pub mod ipinfo; +pub mod multiaddr; diff --git a/src/util/multiaddr.rs b/src/util/multiaddr.rs new file mode 100644 index 0000000..0911854 --- /dev/null +++ b/src/util/multiaddr.rs @@ -0,0 +1,19 @@ +use p2p::{multiaddr, utils::multiaddr_to_socketaddr}; + +pub fn addr_to_ip(addr: &multiaddr::Multiaddr) -> String { + addr.iter() + .find_map(|protocol| match protocol { + multiaddr::Protocol::Ip4(ip4) => Some(ip4.to_string()), + multiaddr::Protocol::Ip6(ip6) => ip6 + .to_ipv4() + .map(|ip4| ip4.to_string()) + .or_else(|| Some(ip6.to_string())), + multiaddr::Protocol::Dns4(dns4) => Some(dns4.to_string()), + multiaddr::Protocol::Dns6(dns6) => Some(dns6.to_string()), + _ => None, + }) + .unwrap_or_else(|| { + let socket_addr = multiaddr_to_socketaddr(&addr).unwrap(); + socket_addr.ip().to_string() + }) +}