Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introductory PR for lots of changes #31

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changing get_service_nodes to get_service_nodes_health adding the cat…
…alog/service nodes list as the default, also adding get single NodeFull by service and name
  • Loading branch information
rrichardson committed Jun 26, 2023
commit 2704704fd9c5df6d88a59831fc9e01441cd030e2
121 changes: 106 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -155,6 +155,7 @@ const GET_LOCK_METHOD_NAME: &str = "get_lock";
const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity";
const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity";
const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names";
const GET_SERVICE_NODES_HEALTH_METHOD_NAME: &str = "get_service_nodes_health";
const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes";
const CREATE_SESSION_METHOD_NAME: &str = "create_session";
const GET_DATACENTERS: &str = "get_datacenters";
@@ -736,9 +737,51 @@ impl Consul {
.uri(uri.clone())
}

/// returns the Node by a given service and name
///
/// If the node is not present, returns none.
///
/// This uses the catalog#list-nodes function in the Consul API
/// then supplies a filter by the Node name
pub async fn get_node_by_name_and_service(
&self,
service: &str,
node_name: &str,
query_opts: Option<QueryOptions>,
) -> Result<Option<ResponseMeta<NodeFull>>> {
let query_opts = query_opts.unwrap_or_default();
let filter = format!("Node == {node_name}");
let request = GetServiceNodesRequest {
service,
near: None,
passing: false,
filter: Some(filter.as_str()),
};
let req = self.build_get_service_nodes_catalog_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
query_opts.timeout,
GET_SERVICE_NODES_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let mut response = serde_json::from_slice::<Vec<NodeFull>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
if let Some(node) = response.pop() {
Ok(Some(ResponseMeta {
response: node,
index,
}))
} else {
Ok(None)
}
}

/// returns the nodes providing the service indicated on the path.
/// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks.
/// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information.
/// See the [consul docs](https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service) for more information.
/// # Arguments:
/// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest)
/// # Errors:
@@ -747,9 +790,9 @@ impl Consul {
&self,
request: GetServiceNodesRequest<'_>,
query_opts: Option<QueryOptions>,
) -> Result<ResponseMeta<GetServiceNodesResponse>> {
) -> Result<ResponseMeta<Vec<NodeFull>>> {
let query_opts = query_opts.unwrap_or_default();
let req = self.build_get_service_nodes_req(request, &query_opts);
let req = self.build_get_service_nodes_catalog_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
@@ -759,6 +802,34 @@ impl Consul {
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let response = serde_json::from_slice::<Vec<NodeFull>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
Ok(ResponseMeta { response, index })
}

/// returns the nodes providing the service indicated on the path.
/// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks.
/// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information.
/// # Arguments:
/// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn get_service_nodes_health(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: Option<QueryOptions>,
) -> Result<ResponseMeta<GetServiceNodesResponse>> {
let query_opts = query_opts.unwrap_or_default();
let req = self.build_get_service_nodes_health_req(request, &query_opts);
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
query_opts.timeout,
GET_SERVICE_NODES_HEALTH_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
let response = serde_json::from_slice::<GetServiceNodesResponse>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?;
Ok(ResponseMeta { response, index })
@@ -775,10 +846,9 @@ impl Consul {
passing: true,
..Default::default()
};
let services = self.get_service_nodes(request, query_opts).await.map_err(|e| {
let services = self.get_service_nodes_health(request, query_opts).await.map_err(|e| {
let err = format!(
"Unable to query consul to resolve service '{}' to a list of addresses and ports: {:?}",
service_name, e
"Unable to query consul to resolve service '{service_name}' to a list of addresses and ports: {e:?}"
);
error!("{}", err);
ConsulError::ServiceInstanceResolutionFailed(service_name.to_string())
@@ -881,7 +951,28 @@ impl Consul {
serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed)
}

fn build_get_service_nodes_req(
fn build_get_service_nodes_catalog_req(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: &QueryOptions,
) -> http::request::Builder {
let req = hyper::Request::builder().method(Method::GET);
let mut url = String::new();
url.push_str(&format!(
"{}/v1/catalog/service/{}",
self.config.address, request.service
));
if request.passing {
url.push_str(&format!("?passing={}", request.passing));
}
if let Some(filter) = request.filter {
url.push_str(&format!("&filter={filter}"));
}
add_query_option_params(&mut url, query_opts, '&');
req.uri(url)
}

fn build_get_service_nodes_health_req(
&self,
request: GetServiceNodesRequest<'_>,
query_opts: &QueryOptions,
@@ -896,10 +987,10 @@ impl Consul {
url.push_str(&format!("?passing={}", request.passing));
}
if let Some(near) = request.near {
url.push_str(&format!("&near={}", near));
url.push_str(&format!("&near={near}"));
}
if let Some(filter) = request.filter {
url.push_str(&format!("&filter={}", filter));
url.push_str(&format!("&filter={filter}"));
}
add_query_option_params(&mut url, query_opts, '&');
req.uri(url)
@@ -985,7 +1076,7 @@ impl Consul {
fn build_create_txn_url(&self, datacenter: Option<&str>) -> String {
let mut url = format!("{}/v1/txn", self.config.address);
if let Some(dc) = datacenter {
url.push_str(&format!("?datacenter={}", dc));
url.push_str(&format!("?datacenter={dc}"));
}
url
}
@@ -1010,7 +1101,7 @@ impl Consul {
}
if let Some(cas_idx) = request.check_and_set {
url = add_query_param_separator(url, added_query_param);
url.push_str(&format!("cas={}", cas_idx));
url.push_str(&format!("cas={cas_idx}"));
}

add_namespace_and_datacenter(url, request.namespace, request.datacenter)
@@ -1020,18 +1111,18 @@ impl Consul {
fn add_query_option_params(uri: &mut String, query_opts: &QueryOptions, mut separator: char) {
if let Some(ns) = &query_opts.namespace {
if !ns.is_empty() {
uri.push_str(&format!("{}ns={}", separator, ns));
uri.push_str(&format!("{separator}ns={ns}"));
separator = '&';
}
}
if let Some(dc) = &query_opts.datacenter {
if !dc.is_empty() {
uri.push_str(&format!("{}dc={}", separator, dc));
uri.push_str(&format!("{separator}dc={dc}"));
separator = '&';
}
}
if let Some(idx) = query_opts.index {
uri.push_str(&format!("{}index={}", separator, idx));
uri.push_str(&format!("{separator}index={idx}"));
separator = '&';
if let Some(wait) = query_opts.wait {
uri.push_str(&format!(
@@ -1168,7 +1259,7 @@ mod tests {

for sn in list_response.response.iter() {
let dereg_request = DeregisterEntityRequest {
node: "local".into(),
node: "local",
service_id: Some(sn.service.id.as_str()),
..Default::default()
};
25 changes: 25 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -507,6 +507,7 @@ pub struct ServiceNode {
#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The node information of an instance providing a Consul service.
/// provided by the Consul Health API
pub struct Node {
/// The ID of the service node.
#[serde(rename = "ID")]
@@ -525,6 +526,30 @@ pub struct Node {
pub meta: HashMap<String, String>,
}

#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The node information as returned by the Consul Catalog API
pub struct NodeFull {
id: String,
node: String,
address: String,
datacenter: String,
tagged_addresses: HashMap<String, String>,
node_meta: HashMap<String, String>,
create_index: u64,
modify_index: u64,
service_address: Option<String>,
service_enable_tag_override: Option<bool>,
#[serde(rename = "Service_ID")]
service_id: Option<String>,
service_name: Option<String>,
service_port: Option<u16>,
service_meta: HashMap<String, String>,
service_tagged_addresses: HashMap<String, String>,
service_tags: Vec<String>,
namespace: Option<String>,
}

#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "PascalCase")]
/// The service information of an instance providing a Consul service.