diff --git a/Cargo.toml b/Cargo.toml index d3eb588..9b5985b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,10 +11,11 @@ license-file = "LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["rustls-native"] -metrics = ["prometheus", "lazy_static"] +metrics = ["prometheus"] +default-tls = ["hyper-tls"] rustls-native = ["hyper-rustls/rustls-native-certs"] rustls-webpki = ["hyper-rustls/webpki-roots"] -trace = ["dep:opentelemetry"] +trace = ["opentelemetry"] # keep this list sorted! [dependencies] @@ -23,7 +24,8 @@ futures = "0.3" http = "0.2" hyper = { version = "0.14", features = ["full"] } hyper-rustls = { version = "0.24" } -lazy_static = { version = "1", optional = true } +hyper-tls = { version = "0.5.0", optional = true } +lazy_static = { version = "1" } opentelemetry = { version = "0.19", features = ["rt-tokio"], optional = true } prometheus = { version = "0.13", optional = true } quick-error = "2" diff --git a/docker-compose.yml b/docker-compose.yml index 4abbe1d..3cf1bed 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: "3.8" services: consul: container_name: consul - image: consul:1.11.11 + image: consul:1.15.3 command: >- consul agent diff --git a/src/lib.rs b/src/lib.rs index 46259ec..9b3fdc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,10 +34,12 @@ use std::{env, str::Utf8Error}; use base64::Engine; use hyper::{body::Buf, client::HttpConnector, Body, Method}; #[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] -#[cfg(feature = "metrics")] +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +#[cfg(feature = "default-tls")] +use hyper_tls::HttpsConnector; use lazy_static::lazy_static; use quick_error::quick_error; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use slog_scope::{error, info}; use tokio::time::timeout; @@ -72,6 +74,8 @@ quick_error! { /// The consul server response could not be deserialized from json. ResponseDeserializationFailed(err: serde_json::error::Error) {} /// The consul server response could not be deserialized from bytes. + RequestSerializationFailed(err: serde_json::error::Error) {} + /// The consul server response could not be deserialized from bytes. ResponseStringDeserializationFailed(err: std::str::Utf8Error) {} /// The consul server response was something other than 200. The status code and the body of the response are included. UnexpectedResponseCode(status_code: hyper::http::StatusCode, body: String) {} @@ -137,15 +141,24 @@ lazy_static! { .unwrap(); } +lazy_static! { + static ref DEFAULT_QUERY_OPTS: QueryOptions = Default::default(); +} + const READ_KEY_METHOD_NAME: &str = "read_key"; +const READ_OBJ_METHOD_NAME: &str = "read_obj"; const CREATE_OR_UPDATE_KEY_METHOD_NAME: &str = "create_or_update_key"; +const CREATE_OR_UPDATE_ALL_METHOD_NAME: &str = "create_or_update_all"; const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync"; const DELETE_KEY_METHOD_NAME: &str = "delete_key"; const GET_LOCK_METHOD_NAME: &str = "get_lock"; const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; +const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity"; const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; +const GET_SERVICE_NODES_HEALTH_METHOD_NAME: &str = "get_service_nodes_health"; const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; -const GET_SESSION_METHOD_NAME: &str = "get_session"; +const CREATE_SESSION_METHOD_NAME: &str = "create_session"; +const GET_DATACENTERS: &str = "get_datacenters"; pub(crate) type Result = std::result::Result; @@ -178,13 +191,25 @@ impl Config { hyper_builder: Default::default(), } } + + /// Create a new config from an address and token + pub fn new(address: String, token: Option) -> Self { + Config { + address, + token, + hyper_builder: Default::default(), + } + } } /// Represents a lock against Consul. /// The lifetime of this object defines the validity of the lock against consul. /// When the object is dropped, the lock is attempted to be released for the next consumer. #[derive(Clone, Debug)] -pub struct Lock<'a> { +pub struct LockGuard<'a, T> +where + T: Default + std::fmt::Debug + Serialize + Clone, +{ /// The session ID of the lock. pub session_id: String, /// The key for the lock. @@ -196,12 +221,15 @@ pub struct Lock<'a> { /// The datacenter of this lock. pub datacenter: String, /// The data in this lock's key - pub value: Option>, + pub value: Option, /// The consul client this lock was acquired using. pub consul: &'a Consul, } -impl Drop for Lock<'_> { +impl Drop for LockGuard<'_, T> +where + T: Default + std::fmt::Debug + Serialize + Clone, +{ fn drop(&mut self) { let req = CreateOrUpdateKeyRequest { key: &self.key, @@ -223,24 +251,49 @@ impl Drop for Lock<'_> { #[derive(Debug)] /// This struct defines the consul client and allows access to the consul api via method syntax. pub struct Consul { - https_client: hyper::Client, Body>, + https_client: hyper::Client, Body>, config: Config, #[cfg(feature = "trace")] tracer: BoxedTracer, } -fn https_connector() -> hyper_rustls::HttpsConnector { - #[cfg(feature = "rustls-webpki")] - return hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() +fn https_connector() -> HttpsConnector { + #[cfg(feature = "rustls-native")] + return HttpsConnectorBuilder::new() + .with_native_roots() .https_or_http() .enable_http1() .build(); - hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + #[cfg(feature = "rustls-webpki")] + return HttpsConnectorBuilder::new() + .with_webpki_roots() .https_or_http() .enable_http1() - .build() + .build(); + #[cfg(feature = "default-tls")] + { + let mut conn = HttpsConnector::new(); + conn.https_only(false); + return conn; + } +} + +impl Clone for Consul { + #[cfg(feature = "trace")] + fn clone(&self) -> Self { + Consul { + https_client: self.https_client.clone(), + config: self.config.clone(), + tracer: global::tracer("consul"), + } + } + #[cfg(not(feature = "trace"))] + fn clone(&self) -> Self { + Consul { + https_client: self.https_client.clone(), + config: self.config.clone(), + } + } } impl Consul { @@ -259,18 +312,22 @@ impl Consul { } } - /// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// Reads keys from Consul's KV store and returns them as `String`s + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. /// # Arguments: /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { + pub async fn read_string( + &self, + request: ReadKeyRequest<'_>, + ) -> Result>> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice::>(&bytes) + serde_json::from_slice::>>(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() .map(|mut r| { @@ -289,25 +346,86 @@ impl Consul { .collect() } + /// Reads keys from Consul's KV store and returns them as `Vec`s + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// # Arguments: + /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn read_vec(&self, request: ReadKeyRequest<'_>) -> Result> { + let req = self.build_read_key_req(request); + let (mut response_body, _index) = self + .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed) + } + + /// Reads keys from Consul's KV store and attempts to deserialize them into + /// type T from JSON. + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// # Arguments: + /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result>> + where + T: DeserializeOwned + Default + std::fmt::Debug, + { + let req = self.build_read_key_req(request); + let (mut response_body, _index) = self + .execute_request(req, hyper::Body::empty(), None, READ_OBJ_METHOD_NAME) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + serde_json::from_slice::>>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)? + .into_iter() + .map(|r| { + let v = match r.value { + Some(ref val) => Some( + serde_json::from_slice::(&val.0) + .map_err(ConsulError::ResponseDeserializationFailed)?, + ), + None => None, + }; + Ok(ReadKeyResponse { + value: v, + create_index: r.create_index, + modify_index: r.modify_index, + lock_index: r.lock_index, + key: r.key, + flags: r.flags, + session: r.session, + }) + }) + .collect::>>>() + } + /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. /// # Arguments: /// - request - the [CreateOrUpdateKeyRequest](consul::types::CreateOrUpdateKeyRequest) - /// - value - the data to store as [Bytes](bytes::Bytes) + /// - value - the data to store, must implement `Serialize` and `Default` /// # Returns: /// A tuple of a boolean and a 64 bit unsigned integer representing whether the operation was successful and the index for a subsequent blocking query. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn create_or_update_key( + pub async fn create_or_update_key( &self, request: CreateOrUpdateKeyRequest<'_>, - value: Vec, - ) -> Result<(bool, u64)> { + value: &T, + ) -> Result<(bool, u64)> + where + T: Default + Serialize + std::fmt::Debug, + { let url = self.build_create_or_update_url(request); let req = hyper::Request::builder().method(Method::PUT).uri(url); + let req_bytes = + serde_json::to_vec(value).map_err(ConsulError::RequestSerializationFailed)?; let (mut response_body, index) = self .execute_request( req, - Body::from(value), + Body::from(req_bytes), None, CREATE_OR_UPDATE_KEY_METHOD_NAME, ) @@ -319,6 +437,36 @@ impl Consul { )) } + /// Executes a transaction in Consul's KV store. + /// This takes a vector of operations, and only succeeds if all operations within the Vec of operations + /// See https://developer.hashicorp.com/consul/api-docs/txn for more information + pub async fn create_or_update_all( + &self, + request: Vec>, + datacenter: Option<&str>, + ) -> Result> { + let url = self.build_create_txn_url(datacenter); + let req = hyper::Request::builder().method(Method::PUT).uri(url); + let txn_request: Vec> = request + .into_iter() + .map(|r| HashMap::from([("KV", r)])) + .collect(); + let data = + serde_json::to_vec(&txn_request).map_err(ConsulError::RequestSerializationFailed)?; + let (mut response_body, _index) = self + .execute_request( + req, + Body::from(data), + None, + CREATE_OR_UPDATE_ALL_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let resp = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + Ok(resp) + } + /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. /// This is the synchronous version of create_or_update_key /// # Arguments: @@ -328,23 +476,28 @@ impl Consul { /// A tuple of a boolean and a 64 bit unsigned integer representing whether the operation was successful and the index for a subsequent blocking query. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub fn create_or_update_key_sync( + pub fn create_or_update_key_sync( &self, request: CreateOrUpdateKeyRequest<'_>, - value: Vec, - ) -> Result { + value: T, + ) -> Result + where + T: Default + Serialize + std::fmt::Debug, + { // TODO: Emit OpenTelemetry span for this request let url = self.build_create_or_update_url(request); record_request_metric_if_enabled(&Method::PUT, CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME); + let req_bytes = + serde_json::to_vec(&value).map_err(ConsulError::RequestSerializationFailed)?; let step_start_instant = Instant::now(); let result = ureq::put(&url) .set( "X-Consul-Token", &self.config.token.clone().unwrap_or_default(), ) - .send_bytes(&value); + .send_bytes(&req_bytes); record_duration_metric_if_enabled( &Method::PUT, @@ -388,8 +541,8 @@ impl Consul { "{}/v1/kv/{}?recurse={}", self.config.address, request.key, request.recurse )); - if request.check_and_set != 0 { - url.push_str(&format!("&cas={}", request.check_and_set)); + if let Some(cas) = request.check_and_set { + url.push_str(&format!("&cas={}", cas)); } url = add_namespace_and_datacenter(url, request.namespace, request.datacenter); @@ -401,33 +554,60 @@ impl Consul { serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) } - /// Obtains a lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. + /// Attempts to acquire a lock for a given key. + /// This creates a new session that will own the lock, + /// if successful acquiring the lock, it will then return a `LockGuard` which will attempt to + /// release the lock when it goes out of scope. + /// If it fails to acquire the lock, it will return a `ConsulError::LockAcquisitionError` + /// See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. /// # Arguments: /// - request - the [LockRequest](consul::types::LockRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn get_lock(&self, request: LockRequest<'_>, value: &[u8]) -> Result> { - let session = self.get_session(request).await?; + pub async fn get_lock( + &self, + mut request: LockRequest<'_>, + value: T, + ) -> Result> + where + T: Default + Serialize + std::fmt::Debug + Clone, + { + let session = self.create_session(&request).await?; + request.session_id = session.id.as_str(); + let _lock_res = self.get_lock_inner(&request, &value).await?; + // No need to check lock_res, if it didn't return Err, then the lock was successful + Ok(LockGuard { + timeout: request.timeout, + key: request.key.to_string(), + session_id: request.session_id.to_owned(), + consul: self, + datacenter: request.datacenter.to_string(), + namespace: request.namespace.to_string(), + value: Some(value), + }) + } + + /// Lower-level lock function wich acquires the lock in consul and returns true if the lock + /// succeeded, otherwise it returns a LockAcquisitionFailure error + /// See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. + /// # Arguments: + /// - request - the [LockRequest](consul::types::LockRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_lock_inner(&self, request: &LockRequest<'_>, value: &T) -> Result + where + T: Default + Serialize + std::fmt::Debug, + { let req = CreateOrUpdateKeyRequest { key: request.key, namespace: request.namespace, datacenter: request.datacenter, - acquire: &session.id, + acquire: request.session_id, ..Default::default() }; - let value_copy = value.to_vec(); - let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?; + let (lock_acquisition_result, _index) = self.create_or_update_key(req, value).await?; if lock_acquisition_result { - let value_copy = value.to_vec(); - Ok(Lock { - timeout: request.timeout, - key: request.key.to_string(), - session_id: session.id, - consul: self, - datacenter: request.datacenter.to_string(), - namespace: request.namespace.to_string(), - value: Some(value_copy), - }) + Ok(lock_acquisition_result) } else { let watch_req = ReadKeyRequest { key: request.key, @@ -455,10 +635,13 @@ impl Consul { /// - request - the [LockWatchRequest](consul::types::LockWatchRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn watch_lock<'a>( + pub async fn watch_lock<'a, T>( &self, request: LockWatchRequest<'_>, - ) -> Result> { + ) -> Result>> + where + T: Default + DeserializeOwned + std::fmt::Debug, + { let req = ReadKeyRequest { key: request.key, namespace: request.namespace, @@ -477,7 +660,7 @@ impl Consul { /// - payload: The [`RegisterEntityPayload`](RegisterEntityPayload) to provide the register entity API. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn register_entity(&self, payload: &RegisterEntityPayload) -> Result<()> { + pub async fn register_entity(&self, payload: &RegisterEntityRequest<'_>) -> Result<()> { let uri = format!("{}/v1/catalog/register", self.config.address); let request = hyper::Request::builder().method(Method::PUT).uri(uri); let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; @@ -491,6 +674,26 @@ impl Consul { Ok(()) } + /// Un-Registers an entity from the consul Catalog + /// See https://www.consul.io/api-docs/catalog#deregister-entity for more information. + /// # Arguments: + /// - payload: The [`DeRegisterEntityPayload`](DeRegisterEntityPayload) to provide the register entity API. + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn deregister_entity(&self, payload: &DeregisterEntityRequest<'_>) -> Result<()> { + let uri = format!("{}/v1/catalog/deregister", self.config.address); + let request = hyper::Request::builder().method(Method::PUT).uri(uri); + let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; + self.execute_request( + request, + payload.into(), + Some(Duration::from_secs(5)), + DEREGISTER_ENTITY_METHOD_NAME, + ) + .await?; + Ok(()) + } + /// Returns all services currently registered with consul. /// See https://www.consul.io/api-docs/catalog#list-services for more information. /// # Arguments: @@ -499,20 +702,15 @@ impl Consul { /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. pub async fn get_all_registered_service_names( &self, - query_opts: Option, + query_opts: Option<&QueryOptions>, ) -> Result>> { - let mut uri = format!("{}/v1/catalog/services", self.config.address); - let query_opts = query_opts.unwrap_or_default(); - add_query_option_params(&mut uri, &query_opts, '?'); - - let request = hyper::Request::builder() - .method(Method::GET) - .uri(uri.clone()); + let opts = query_opts.unwrap_or(&(*DEFAULT_QUERY_OPTS)); + let request = self.create_get_catalog_request("services", opts); let (mut response_body, index) = self .execute_request( request, hyper::Body::empty(), - query_opts.timeout, + opts.timeout, GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME, ) .await?; @@ -526,9 +724,88 @@ impl Consul { }) } + /// Returns all datacenters currently registered in consul + /// See https://developer.hashicorp.com/consul/api-docs/catalog#list-datacenters + /// # Arguments: + /// - query_opts: The [`QueryOptions`](QueryOptions) to apply for this endpoint. + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_datacenters( + &self, + query_opts: Option<&QueryOptions>, + ) -> Result>> { + let opts = query_opts.unwrap_or(&(*DEFAULT_QUERY_OPTS)); + let request = self.create_get_catalog_request("datacenters", opts); + let (mut response_body, index) = self + .execute_request(request, hyper::Body::empty(), opts.timeout, GET_DATACENTERS) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let service_tags_by_name = serde_json::from_slice::>>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + + Ok(ResponseMeta { + response: service_tags_by_name.keys().cloned().collect(), + index, + }) + } + + fn create_get_catalog_request( + &self, + name: &str, + query_opts: &QueryOptions, + ) -> http::request::Builder { + let mut uri = format!("{}/v1/catalog/{}", self.config.address, name); + add_query_option_params(&mut uri, query_opts, '?'); + hyper::Request::builder() + .method(Method::GET) + .uri(uri.clone()) + } + + /// returns the Node by a given service and name + /// + /// If the node is not present, returns none. + /// + /// This uses the catalog#list-nodes function in the Consul API + /// then supplies a filter by the Node name + pub async fn get_node_by_name_and_service( + &self, + service: &str, + node_name: &str, + query_opts: Option, + ) -> Result>> { + let query_opts = query_opts.unwrap_or_default(); + let filter = format!("Node == {node_name}"); + let request = GetServiceNodesRequest { + service, + near: None, + passing: false, + filter: Some(filter.as_str()), + }; + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let mut response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + if let Some(node) = response.pop() { + Ok(Some(ResponseMeta { + response: node, + index, + })) + } else { + Ok(None) + } + } + /// returns the nodes providing the service indicated on the path. /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. - /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// See the [consul docs](https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service) for more information. /// # Arguments: /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) /// # Errors: @@ -537,9 +814,9 @@ impl Consul { &self, request: GetServiceNodesRequest<'_>, query_opts: Option, - ) -> Result> { + ) -> Result>> { let query_opts = query_opts.unwrap_or_default(); - let req = self.build_get_service_nodes_req(request, &query_opts); + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); let (mut response_body, index) = self .execute_request( req, @@ -549,6 +826,34 @@ impl Consul { ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); + let response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + Ok(ResponseMeta { response, index }) + } + + /// returns the nodes providing the service indicated on the path. + /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. + /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// # Arguments: + /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_service_nodes_health( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: Option, + ) -> Result> { + let query_opts = query_opts.unwrap_or_default(); + let req = self.build_get_service_nodes_health_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_HEALTH_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); let response = serde_json::from_slice::(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)?; Ok(ResponseMeta { response, index }) @@ -559,16 +864,15 @@ impl Consul { &self, service_name: &str, query_opts: Option, - ) -> Result> { + ) -> Result)>> { let request = GetServiceNodesRequest { service: service_name, passing: true, ..Default::default() }; - let services = self.get_service_nodes(request, query_opts).await.map_err(|e| { + let services = self.get_service_nodes_health(request, query_opts).await.map_err(|e| { let err = format!( - "Unable to query consul to resolve service '{}' to a list of addresses and ports: {:?}", - service_name, e + "Unable to query consul to resolve service '{service_name}' to a list of addresses and ports: {e:?}" ); error!("{}", err); ConsulError::ServiceInstanceResolutionFailed(service_name.to_string()) @@ -598,12 +902,12 @@ impl Consul { /// in the health endpoint. These requests models are primarily for the /// health endpoints /// https://www.consul.io/api-docs/health#list-nodes-for-service - fn parse_host_port_from_service_node_response(sn: ServiceNode) -> (String, u16) { + fn parse_host_port_from_service_node_response(sn: ServiceNode) -> (String, Option) { ( if sn.service.address.is_empty() { info!( - "Consul service {service_name} instance had an empty Service address, with port:{port}", - service_name = &sn.service.service, port = sn.service.port + "Consul service {} instance had an empty Service address, with port:{:?}", + &sn.service.service, sn.service.port ); sn.node.address } else { @@ -643,10 +947,11 @@ impl Consul { req.uri(url) } - async fn get_session(&self, request: LockRequest<'_>) -> Result { + /// Create a new session + pub async fn create_session(&self, request: &LockRequest<'_>) -> Result { let session_req = CreateSessionRequest { lock_delay: request.lock_delay, - behavior: request.behavior, + behavior: request.behavior.clone(), ttl: request.timeout, ..Default::default() }; @@ -663,14 +968,35 @@ impl Consul { req, hyper::Body::from(create_session_json), None, - GET_SESSION_METHOD_NAME, + CREATE_SESSION_METHOD_NAME, ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) } - fn build_get_service_nodes_req( + fn build_get_service_nodes_catalog_req( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: &QueryOptions, + ) -> http::request::Builder { + let req = hyper::Request::builder().method(Method::GET); + let mut url = String::new(); + url.push_str(&format!( + "{}/v1/catalog/service/{}", + self.config.address, request.service + )); + if request.passing { + url.push_str(&format!("?passing={}", request.passing)); + } + if let Some(filter) = request.filter { + url.push_str(&format!("&filter={filter}")); + } + add_query_option_params(&mut url, query_opts, '&'); + req.uri(url) + } + + fn build_get_service_nodes_health_req( &self, request: GetServiceNodesRequest<'_>, query_opts: &QueryOptions, @@ -681,12 +1007,14 @@ impl Consul { "{}/v1/health/service/{}", self.config.address, request.service )); - url.push_str(&format!("?passing={}", request.passing)); + if request.passing { + url.push_str(&format!("?passing={}", request.passing)); + } if let Some(near) = request.near { - url.push_str(&format!("&near={}", near)); + url.push_str(&format!("&near={near}")); } if let Some(filter) = request.filter { - url.push_str(&format!("&filter={}", filter)); + url.push_str(&format!("&filter={filter}")); } add_query_option_params(&mut url, query_opts, '&'); req.uri(url) @@ -769,9 +1097,16 @@ impl Consul { } } + fn build_create_txn_url(&self, datacenter: Option<&str>) -> String { + let mut url = format!("{}/v1/txn", self.config.address); + if let Some(dc) = datacenter { + url.push_str(&format!("?datacenter={dc}")); + } + url + } + fn build_create_or_update_url(&self, request: CreateOrUpdateKeyRequest<'_>) -> String { - let mut url = String::new(); - url.push_str(&format!("{}/v1/kv/{}", self.config.address, request.key)); + let mut url = format!("{}/v1/kv/{}", self.config.address, request.key); let mut added_query_param = false; if request.flags != 0 { url = add_query_param_separator(url, added_query_param); @@ -790,7 +1125,7 @@ impl Consul { } if let Some(cas_idx) = request.check_and_set { url = add_query_param_separator(url, added_query_param); - url.push_str(&format!("cas={}", cas_idx)); + url.push_str(&format!("cas={cas_idx}")); } add_namespace_and_datacenter(url, request.namespace, request.datacenter) @@ -800,18 +1135,18 @@ impl Consul { fn add_query_option_params(uri: &mut String, query_opts: &QueryOptions, mut separator: char) { if let Some(ns) = &query_opts.namespace { if !ns.is_empty() { - uri.push_str(&format!("{}ns={}", separator, ns)); + uri.push_str(&format!("{separator}ns={ns}")); separator = '&'; } } if let Some(dc) = &query_opts.datacenter { if !dc.is_empty() { - uri.push_str(&format!("{}dc={}", separator, dc)); + uri.push_str(&format!("{separator}dc={dc}")); separator = '&'; } } if let Some(idx) = query_opts.index { - uri.push_str(&format!("{}index={}", separator, idx)); + uri.push_str(&format!("{separator}index={idx}")); separator = '&'; if let Some(wait) = query_opts.wait { uri.push_str(&format!( @@ -884,22 +1219,76 @@ mod tests { use super::*; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn create_and_read_key() { + async fn clear_and_create_and_read_string() { let consul = get_client(); let key = "test/consul/read"; - let string_value = "This is a test"; - let res = create_or_update_key_value(&consul, key, string_value).await; - assert_expected_result_with_index(res); + consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let string_value = "This is a test".to_owned(); + let res = create_or_update_key_value(&consul, key, &string_value) + .await + .unwrap(); + assert!(res.0); + let res = read_string(&consul, key).await.unwrap(); + assert_eq!(string_value, res.into_iter().next().unwrap().value.unwrap()); + } - let res = read_key(&consul, key).await; - verify_single_value_matches(res, string_value); + #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] + struct ComplexStruct { + stuff: HashMap, + wat: String, + num: u64, + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn clear_and_create_and_read_complex() { + let consul = get_client(); + let key = "test/consul/complex"; + consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let value = ComplexStruct { + stuff: HashMap::from([("hmmm".to_owned(), 1234234), ("lalala".to_owned(), 42)]), + wat: "this is wat".into(), + num: 424242424242424242, + }; + let res = create_or_update_key_value(&consul, key, &value) + .await + .unwrap(); + assert!(res.0); + let req = ReadKeyRequest::new().set_key(key); + let res = consul.read_key::(req).await.unwrap(); + assert_eq!(value, res.into_iter().next().unwrap().value.unwrap()); } #[tokio::test(flavor = "multi_thread")] async fn test_register_and_retrieve_services() { let consul = get_client(); - let new_service_name = "test-service-44".to_string(); + let new_service_name = "test-service-44"; + let list_request = GetServiceNodesRequest { + service: new_service_name, + ..Default::default() + }; + let list_response = consul.get_service_nodes(list_request, None).await.unwrap(); + + for sn in list_response.response.iter() { + let dereg_request = DeregisterEntityRequest { + node: "local", + service_id: Some(sn.service_id.as_ref().unwrap().as_str()), + ..Default::default() + }; + consul.deregister_entity(&dereg_request).await.unwrap(); + } // verify a service by this name is currently not registered let ResponseMeta { @@ -909,27 +1298,27 @@ mod tests { .get_all_registered_service_names(None) .await .expect("expected get_registered_service_names request to succeed"); - assert!(!service_names_before_register.contains(&new_service_name)); + assert!(!service_names_before_register.contains(&new_service_name.to_owned())); // register a new service - let payload = RegisterEntityPayload { - ID: None, - Node: "local".to_string(), - Address: "127.0.0.1".to_string(), - Datacenter: None, - TaggedAddresses: Default::default(), - NodeMeta: Default::default(), - Service: Some(RegisterEntityService { - ID: None, - Service: new_service_name.clone(), - Tags: vec![], - TaggedAddresses: Default::default(), - Meta: Default::default(), - Port: Some(42424), - Namespace: None, + let payload = RegisterEntityRequest { + id: None, + node: "local", + address: "127.0.0.1", + datacenter: None, + tagged_addresses: Default::default(), + node_meta: Default::default(), + service: Some(RegisterEntityService { + id: None, + service: new_service_name, + tags: vec![], + tagged_addresses: Default::default(), + meta: Default::default(), + port: Some(42424), + namespace: None, }), - Check: None, - SkipNodeUpdate: None, + check: None, + skip_node_update: None, }; consul .register_entity(&payload) @@ -944,7 +1333,7 @@ mod tests { .get_all_registered_service_names(None) .await .expect("expected get_registered_service_names request to succeed"); - assert!(service_names_after_register.contains(&new_service_name)); + assert!(service_names_after_register.contains(&new_service_name.to_owned())); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -968,7 +1357,7 @@ mod tests { let addresses: Vec = response .iter() - .map(|sn| sn.service.address.clone()) + .map(|sn| sn.service_address.as_ref().unwrap().clone()) .collect(); let expected_addresses = vec![ "1.1.1.1".to_string(), @@ -981,7 +1370,7 @@ mod tests { let _: Vec<_> = response .iter() - .map(|sn| assert_eq!("dc1", sn.node.datacenter)) + .map(|sn| assert_eq!("dc1", sn.datacenter)) .collect(); } @@ -989,14 +1378,14 @@ mod tests { async fn create_and_delete_key() { let consul = get_client(); let key = "test/consul/again"; - let string_value = "This is a new test"; - let res = create_or_update_key_value(&consul, key, string_value).await; + let string_value = "This is a new test".to_owned(); + let res = create_or_update_key_value(&consul, key, &string_value).await; assert_expected_result_with_index(res); let res = delete_key(&consul, key).await; assert_expected_result(res); - let res = read_key(&consul, key).await.unwrap_err(); + let res = read_string(&consul, key).await.unwrap_err(); match res { ConsulError::UnexpectedResponseCode(code, _body) => { assert_eq!(code, hyper::http::StatusCode::NOT_FOUND) @@ -1022,10 +1411,10 @@ mod tests { }; let session_id: String; { - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req.clone(), string_value).await; assert!(res.is_ok()); let mut lock = res.unwrap(); - let res2 = consul.get_lock(req, string_value.as_bytes()).await; + let res2 = consul.get_lock(req, string_value).await; assert!(res2.is_err()); let err = res2.unwrap_err(); match err { @@ -1037,13 +1426,13 @@ mod tests { } session_id = lock.session_id.to_string(); // Lets change the value before dropping the lock to ensure the change is persisted when the lock is dropped. - lock.value = Some(new_string_value.as_bytes().to_vec()) + lock.value = Some(new_string_value) // lock gets dropped here. } sleep(Duration::from_secs(2)).await; - let key_resp = read_key(&consul, key).await; - verify_single_value_matches(key_resp, new_string_value); + let key_resp = read_string(&consul, key).await; + verify_single_value_matches(key_resp, &new_string_value.to_owned()); let req = LockRequest { key, @@ -1052,7 +1441,7 @@ mod tests { session_id: &session_id, ..Default::default() }; - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req, string_value).await; assert!(res.is_ok()); } @@ -1060,7 +1449,14 @@ mod tests { async fn create_and_watch_lock() { let consul = get_client(); let key = "test/consul/watchedlock"; - let string_value = "This is a lock test"; + let _res = consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let string_value = "This is a lock test".to_owned(); let req = LockRequest { key, behavior: LockExpirationBehavior::Release, @@ -1068,12 +1464,14 @@ mod tests { ..Default::default() }; let start_index: u64; - let res = consul.get_lock(req, string_value.as_bytes()).await; - assert!(res.is_ok()); - let lock = res.unwrap(); - let res2 = consul.get_lock(req, string_value.as_bytes()).await; - assert!(res2.is_err()); - let err = res2.unwrap_err(); + let lock = consul + .get_lock(req.clone(), string_value.clone()) + .await + .unwrap(); + let err = consul + .get_lock(req.clone(), string_value.clone()) + .await + .unwrap_err(); match err { ConsulError::LockAcquisitionFailure(index) => start_index = index, _ => panic!( @@ -1087,16 +1485,14 @@ mod tests { key, consistency: ConsistencyMode::Consistent, index: Some(start_index), - wait: Duration::from_secs(60), + wait: Duration::from_secs(5), ..Default::default() }; // The lock will timeout and this this will return. - let res = consul.watch_lock(watch_req).await; - assert!(res.is_ok()); + let _res = consul.watch_lock::(watch_req).await.unwrap(); std::mem::drop(lock); // This ensures the lock is not dropped until after the request to watch it completes. - let res = consul.get_lock(req, string_value.as_bytes()).await; - assert!(res.is_ok()); + let _res = consul.get_lock(req, string_value).await.unwrap(); } #[test] @@ -1106,20 +1502,22 @@ mod tests { node: "node".to_string(), address: "1.1.1.1".to_string(), datacenter: "datacenter".to_string(), + tagged_addresses: HashMap::new(), + meta: HashMap::new(), }; let service = Service { id: "node".to_string(), service: "node".to_string(), address: "2.2.2.2".to_string(), - port: 32, + port: Some(32), }; let empty_service = Service { id: "".to_string(), service: "".to_string(), address: "".to_string(), - port: 32, + port: Some(32), }; let sn = ServiceNode { @@ -1145,7 +1543,7 @@ mod tests { async fn properly_handle_check_and_set() { let consul = get_client(); let key = "test/consul/proper_cas_handling"; - let string_value1 = "This is CAS test"; + let string_value1 = "This is CAS test".to_owned(); let req = CreateOrUpdateKeyRequest { key, check_and_set: Some(0), @@ -1155,24 +1553,24 @@ mod tests { // Key does not exist, with CAS set and modify index set to 0 // it should be created. let (set, _) = consul - .create_or_update_key(req.clone(), string_value1.as_bytes().to_vec()) + .create_or_update_key(req.clone(), &string_value1) .await .expect("failed to create key initially"); assert!(set); let (value, mod_idx1) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value1, &value.unwrap()); + assert_eq!(&string_value1, &value.unwrap()); // Subsequent request with CAS set to 0 should not override the // value. - let string_value2 = "This is CAS test - not valid"; + let string_value2 = "This is CAS test - not valid".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value2.as_bytes().to_vec()) + .create_or_update_key(req, &string_value2) .await .expect("failed to run subsequent create_or_update_key"); assert!(!set); // Value and modify index should not have changed because set failed. let (value, mod_idx2) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value1, &value.unwrap()); + assert_eq!(&string_value1, &value.unwrap()); assert_eq!(mod_idx1, mod_idx2); // Successfully set value with proper CAS value. @@ -1181,15 +1579,15 @@ mod tests { check_and_set: Some(mod_idx1), ..Default::default() }; - let string_value3 = "This is correct CAS updated"; + let string_value3 = "This is correct CAS updated".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value3.as_bytes().to_vec()) + .create_or_update_key(req, &string_value3) .await .expect("failed to run create_or_update_key with proper CAS value"); assert!(set); // Verify that value was updated and the index changed. let (value, mod_idx3) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value3, &value.unwrap()); + assert_eq!(&string_value3, &value.unwrap()); assert_ne!(mod_idx1, mod_idx3); // Successfully set value without CAS. @@ -1198,43 +1596,47 @@ mod tests { check_and_set: None, ..Default::default() }; - let string_value4 = "This is non CAS update"; + let string_value4 = "This is non CAS update".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value4.as_bytes().to_vec()) + .create_or_update_key(req, &string_value4) .await .expect("failed to run create_or_update_key without CAS"); assert!(set); // Verify that value was updated and the index changed. let (value, mod_idx4) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value4, &value.unwrap()); + assert_eq!(&string_value4, &value.unwrap()); assert_ne!(mod_idx3, mod_idx4); } + async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option, u64) { + let res = read_string(consul, key).await.expect("failed to read key"); + let r = res.into_iter().next().unwrap(); + (r.value, r.modify_index) + } + fn get_client() -> Consul { let conf: Config = Config::from_env(); Consul::new(conf) } - async fn create_or_update_key_value( + async fn create_or_update_key_value( consul: &Consul, key: &str, - value: &str, - ) -> Result<(bool, u64)> { + value: &T, + ) -> Result<(bool, u64)> + where + T: Serialize + std::fmt::Debug + ?Sized + Default, + { let req = CreateOrUpdateKeyRequest { key, ..Default::default() }; - consul - .create_or_update_key(req, value.as_bytes().to_vec()) - .await + Ok(consul.create_or_update_key(req, value).await?) } - async fn read_key(consul: &Consul, key: &str) -> Result> { - let req = ReadKeyRequest { - key, - ..Default::default() - }; - consul.read_key(req).await + async fn read_string(consul: &Consul, key: &str) -> Result>> { + let req = ReadKeyRequest::new().set_key(key); + consul.read_key::(req).await } async fn delete_key(consul: &Consul, key: &str) -> Result { @@ -1256,16 +1658,13 @@ mod tests { assert!(res.unwrap()); } - async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option, i64) { - let res = read_key(consul, key).await.expect("failed to read key"); - let r = res.into_iter().next().unwrap(); - (r.value, r.modify_index) - } - - fn verify_single_value_matches(res: Result>, value: &str) { + fn verify_single_value_matches<'a, T>(res: Result>>, value: &'a T) + where + T: PartialEq + std::fmt::Debug + Default, + { assert!(res.is_ok()); assert_eq!( - res.unwrap().into_iter().next().unwrap().value.unwrap(), + &res.unwrap().into_iter().next().unwrap().value.unwrap(), value ) } diff --git a/src/types.rs b/src/types.rs index b78e0c4..eb8285c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -25,7 +25,8 @@ SOFTWARE. use std::collections::HashMap; use std::time::Duration; -use serde::{self, Deserialize, Serialize, Serializer}; +use base64::{engine::general_purpose::STANDARD as B64, Engine}; +use serde::{self, de::Deserializer, de::Error as SerdeError, Deserialize, Serialize, Serializer}; use smart_default::SmartDefault; // TODO retrofit other get APIs to use this struct @@ -71,7 +72,7 @@ pub struct ResponseMeta { } /// Represents a request to delete a key or all keys sharing a prefix from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct DeleteKeyRequest<'a> { /// Specifies the path of the key to delete. pub key: &'a str, @@ -84,14 +85,14 @@ pub struct DeleteKeyRequest<'a> { /// This is very useful as a building block for more complex synchronization primitives. /// The index must be greater than 0 for Consul to take any action: a 0 index will not delete the key. /// If the index is non-zero, the key is only deleted if the index matches the ModifyIndex of that key. - pub check_and_set: u32, + pub check_and_set: Option, /// Specifies the namespace to query. /// If not provided, the namespace will be inferred from the request's ACL token, or will default to the default namespace. pub namespace: &'a str, } /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct ReadKeyRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -117,8 +118,40 @@ pub struct ReadKeyRequest<'a> { pub wait: Duration, } +macro_rules! builder_fun { + ($nm:ident, $fun:ident, $parm:ty) => { + /// Builder-style method to set $nm on the object and return `self` + pub fn $fun(self, $nm: $parm) -> Self { + ReadKeyRequest { $nm, ..self } + } + }; +} +impl<'a> ReadKeyRequest<'a> { + /// Construct a default ReadKeyRequest to be used with the builder API + /// e.g. + /// ```rust + /// use rs_consul::ReadKeyRequest; + /// let req = ReadKeyRequest::new() + /// .set_key("bar") + /// .set_namespace("foo") + /// .set_recurse(true); + /// ``` + pub fn new() -> Self { + Default::default() + } + + builder_fun!(key, set_key, &'a str); + builder_fun!(namespace, set_namespace, &'a str); + builder_fun!(datacenter, set_datacenter, &'a str); + builder_fun!(recurse, set_recurse, bool); + builder_fun!(separator, set_separator, &'a str); + builder_fun!(consistency, set_consistency, ConsistencyMode); + builder_fun!(index, set_index, Option); + builder_fun!(wait, set_wait, Duration); +} + /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct LockWatchRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -140,7 +173,7 @@ pub struct LockWatchRequest<'a> { } /// Represents a request to read a key from Consul Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct CreateOrUpdateKeyRequest<'a> { /// Specifies the path of the key. pub key: &'a str, @@ -158,7 +191,7 @@ pub struct CreateOrUpdateKeyRequest<'a> { /// This is very useful as a building block for more complex synchronization primitives. /// If the index is 0, Consul will only put the key if it does not already exist. /// If the index is non-zero, the key is only set if the index matches the ModifyIndex of that key. - pub check_and_set: Option, + pub check_and_set: Option, /// Supply a session ID to use in a lock acquisition operation. /// This is useful as it allows leader election to be built on top of Consul. /// If the lock is not held and the session is valid, this increments the LockIndex and sets the Session value of the key in addition to updating the key contents. @@ -173,32 +206,69 @@ pub struct CreateOrUpdateKeyRequest<'a> { pub release: &'a str, } -/// Represents a request to read a key from Consul Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +/// An operation to be executed within a transaction +/// See https://developer.hashicorp.com/consul/api-docs/txn for more info +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionOp<'a> { + /// The type of operation to execute + pub verb: TransactionOpVerb, + /// The key on which to operate + pub key: &'a str, + /// The value to set (if applicable) + pub value: Base64Vec, + #[serde(rename = "Index")] + /// The modify_index if it is a cas operation + #[serde(skip_serializing_if = "Option::is_none")] + pub check_and_set: Option, + /// Optional flags to associate with the key + pub flags: u64, + /// Namespace on which to operate + pub namespace: &'a str, +} + +/// Response from Consul for a txn request +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionResponse { + /// The key on which the operation was executed + pub key: String, + /// The resulting value from the key (if applicable) + pub value: Option>, + /// The index at which the key was created + pub create_index: u64, + /// The index at which the key was locked + pub lock_index: u64, + /// The index at which the key was modified + pub modify_index: u64, +} + +/// Represents a response from reading a key from Consul Key Value store. +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] -pub struct ReadKeyResponse { +pub struct ReadKeyResponse { /// CreateIndex is the internal index value that represents when the entry was created. - pub create_index: i64, + pub create_index: u64, /// ModifyIndex is the last index that modified this key. /// It can be used to establish blocking queries by setting the ?index query parameter. /// You can even perform blocking queries against entire subtrees of the KV store: if ?recurse is provided, the returned X-Consul-Index corresponds to the latest ModifyIndex within the prefix, and a blocking query using that ?index will wait until any key within that prefix is updated. - pub modify_index: i64, + pub modify_index: u64, /// LockIndex is the number of times this key has successfully been acquired in a lock. /// If the lock is held, the Session key provides the session that owns the lock. - pub lock_index: i64, + pub lock_index: u64, /// Key is simply the full path of the entry. pub key: String, /// Flags is an opaque unsigned integer that can be attached to each entry. /// Clients can choose to use this however makes sense for their application. pub flags: u64, /// Value is a base64-encoded blob of data. - pub value: Option, + pub value: Option, /// If a lock is held, the Session key provides the session that owns the lock. pub session: Option, } /// Represents a request to create a lock . -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Copy)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct LockRequest<'a> { /// The key to use for locking. @@ -229,7 +299,7 @@ pub struct LockRequest<'a> { } /// Controls the behavior of locks when a session is invalidated. See [consul docs](https://www.consul.io/api-docs/session#behavior) for more information. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Copy)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum LockExpirationBehavior { #[default] @@ -241,7 +311,7 @@ pub enum LockExpirationBehavior { /// Most of the read query endpoints support multiple levels of consistency. /// Since no policy will suit all clients' needs, these consistency modes allow the user to have the ultimate say in how to balance the trade-offs inherent in a distributed system. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub enum ConsistencyMode { /// If not specified, the default is strongly consistent in almost all cases. @@ -263,13 +333,15 @@ pub enum ConsistencyMode { Stale, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] -pub(crate) struct SessionResponse { +/// Response from the session-creation step +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +pub struct SessionResponse { #[serde(rename = "ID")] - pub(crate) id: String, + /// The Id of the created session + pub id: String, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub(crate) struct CreateSessionRequest { #[default(_code = "Duration::from_secs(0)")] @@ -290,93 +362,122 @@ pub(crate) struct CreateSessionRequest { /// Payload struct to register or update entries in consul's catalog. /// See https://www.consul.io/api-docs/catalog#register-entity for more information. -#[allow(non_snake_case)] -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct RegisterEntityPayload { +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct RegisterEntityRequest<'a> { /// Optional UUID to assign to the node. This string is required to be 36-characters and UUID formatted. #[serde(skip_serializing_if = "Option::is_none")] - pub ID: Option, + #[serde(rename = "ID")] + pub id: Option<&'a str>, /// Node ID to register. - pub Node: String, + pub node: &'a str, /// The address to register. - pub Address: String, + pub address: &'a str, /// The datacenter to register in, defaults to the agent's datacenter. #[serde(skip_serializing_if = "Option::is_none")] - pub Datacenter: Option, + pub datacenter: Option<&'a str>, /// Tagged addressed to register with. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub TaggedAddresses: HashMap, + pub tagged_addresses: HashMap<&'a str, &'a str>, /// KV metadata paris to register with. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub NodeMeta: HashMap, + pub node_meta: HashMap<&'a str, &'a str>, /// Optional service to register. #[serde(skip_serializing_if = "Option::is_none")] - pub Service: Option, + pub service: Option>, /// Optional check to register #[serde(skip_serializing_if = "Option::is_none")] - pub Check: Option, + pub check: Option, /// Whether to skip updating the nodes information in the registration. #[serde(skip_serializing_if = "Option::is_none")] - pub SkipNodeUpdate: Option, + pub skip_node_update: Option, } /// The service to register with consul's global catalog. /// See https://www.consul.io/api/agent/service for more information. -#[allow(non_snake_case)] -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct RegisterEntityService { +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct RegisterEntityService<'a> { /// ID to register service will, defaults to Service.Service property. #[serde(skip_serializing_if = "Option::is_none")] - pub ID: Option, + #[serde(rename = "ID")] + pub id: Option<&'a str>, /// The name of the service. - pub Service: String, + pub service: &'a str, /// Optional tags associated with the service. #[serde(skip_serializing_if = "Vec::is_empty")] - pub Tags: Vec, + pub tags: Vec<&'a str>, /// Optional map of explicit LAN and WAN addresses for the service. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub TaggedAddresses: HashMap, + pub tagged_addresses: HashMap<&'a str, &'a str>, /// Optional key value meta associated with the service. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub Meta: HashMap, + pub meta: HashMap<&'a str, &'a str>, /// The port of the service #[serde(skip_serializing_if = "Option::is_none")] - pub Port: Option, + pub port: Option, /// The consul namespace to register the service in. #[serde(skip_serializing_if = "Option::is_none")] - pub Namespace: Option, + pub namespace: Option<&'a str>, } /// Information related to registering a check. /// See https://www.consul.io/docs/discovery/checks for more information. -#[allow(non_snake_case)] #[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] pub struct RegisterEntityCheck { /// The node to execute the check on. #[serde(skip_serializing_if = "Option::is_none")] - pub Node: Option, + pub node: Option, /// Optional check id, defaults to the name of the check. #[serde(skip_serializing_if = "Option::is_none")] - pub CheckID: Option, + #[serde(rename = "CheckID")] + pub check_id: Option, /// The name associated with the check - pub Name: String, + pub name: String, /// Opaque field encapsulating human-readable text. #[serde(skip_serializing_if = "Option::is_none")] - pub Notes: Option, + pub notes: Option, /// The status of the check. Must be one of 'passing', 'warning', or 'critical'. #[serde(skip_serializing_if = "Option::is_none")] - pub Status: Option, + pub status: Option, /// ID of the service this check is for. If no ID of a service running on the node is provided, /// the check is treated as a node level check #[serde(skip_serializing_if = "Option::is_none")] - pub ServiceID: Option, + #[serde(rename = "ServiceID")] + pub service_id: Option, /// Details for a TCP or HTTP health check. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub Definition: HashMap, + pub definition: HashMap, +} + +/// Request body for de-registering a check or service from the Catalog +/// See https://developer.hashicorp.com/consul/api-docs/catalog#deregister-entity for more +/// information +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeregisterEntityRequest<'a> { + /// The node on which to execute the registration + pub node: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + /// Optional string to specify which datacenter to find the node. If not supplied, defaults to + /// the DC of the agent to which this client is connected + pub datacenter: Option<&'a str>, + /// Specifies the ID of the Check to remove + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "CheckID")] + pub check_id: Option<&'a str>, + /// Specifies the ID of the Service to remove + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "ServiceID")] + pub service_id: Option<&'a str>, + /// The consul namespace to register the service in. + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option<&'a str>, } /// Request for the nodes providing a specified service registered in Consul. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct GetServiceNodesRequest<'a> { /// Specifies the service to list services for. This is provided as part of the URL. pub service: &'a str, @@ -394,7 +495,7 @@ pub struct GetServiceNodesRequest<'a> { pub(crate) type GetServiceNodesResponse = Vec; -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// An instance of a node providing a Consul service. pub struct ServiceNode { @@ -404,9 +505,10 @@ pub struct ServiceNode { pub service: Service, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The node information of an instance providing a Consul service. +/// provided by the Consul Health API pub struct Node { /// The ID of the service node. #[serde(rename = "ID")] @@ -417,9 +519,57 @@ pub struct Node { pub address: String, /// The datacenter where this node is running on. pub datacenter: String, + /// List of explicit WAN and LAN addresses for the node + #[serde(deserialize_with = "null_to_default")] + pub tagged_addresses: HashMap, + /// Map of metadata options + #[serde(deserialize_with = "null_to_default")] + pub meta: HashMap, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +/// The node information as returned by the Consul Catalog API +pub struct NodeFull { + /// id + #[serde(rename = "ID")] + pub id: String, + /// node + pub node: String, + /// address + pub address: String, + /// datacenter + pub datacenter: String, + /// tagged_addresses + pub tagged_addresses: HashMap, + /// node_meta + pub node_meta: HashMap, + /// create_index + pub create_index: u64, + /// modify_index + pub modify_index: u64, + /// service_address + pub service_address: Option, + /// service_enable_tag_override + pub service_enable_tag_override: Option, + #[serde(rename = "Service_ID")] + /// service_id + pub service_id: Option, + /// service_name + pub service_name: Option, + /// service_port + pub service_port: Option, + /// service_meta + pub service_meta: HashMap, + /// service_tagged_addresses + pub service_tagged_addresses: HashMap>, + /// service_tags + pub service_tags: Vec, + /// namespace + pub namespace: Option, +} + +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The service information of an instance providing a Consul service. pub struct Service { @@ -431,7 +581,7 @@ pub struct Service { /// The address of the instance. pub address: String, /// The port of the instance. - pub port: u16, + pub port: Option, } pub(crate) fn serialize_duration_as_string( @@ -451,3 +601,87 @@ pub(crate) fn duration_as_string(duration: &Duration) -> String { res.push('s'); res } + +/// Operation types for all available verbs within a Consul Transaction +/// See https://developer.hashicorp.com/consul/api-docs/txn#tables-of-operations for more +/// information +/// NOTE: Presently only the KV-based operations are supported by this client +#[derive(Clone, SmartDefault, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum TransactionOpVerb { + #[default] + /// Sets the Key to the given Value + Set, + /// Sets, but with CAS semantics + Cas, + /// Lock with the given session + Lock, + /// Unlock with the given session + Unlock, + /// Get the key, fails if the key doesn't exist + Get, + /// Get all keys using the 'key' field as a prefix + GetTree, + /// Fail if modify_index != index + CheckIndex, + /// Fail if not locked by the supplied session + CheckSession, + /// Fail if key exists + CheckNotExists, + /// Delete the key (and value at the key) + Delete, + /// Delete all keys/vals starting with prefix + DeleteTree, + /// Delete, but with CAS semantics + DeleteCas, +} + +/// A helper type which serializes a `Vec` from/to a bas64 encoded String +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct Base64Vec(pub Vec); + +impl Serialize for Base64Vec { + fn serialize(&self, serializer: S) -> Result { + serializer.collect_str(&B64.encode(&self.0)) + } +} + +impl<'de> Deserialize<'de> for Base64Vec { + fn deserialize>(deserializer: D) -> Result { + struct Vis; + impl serde::de::Visitor<'_> for Vis { + type Value = Base64Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a base64 string") + } + + fn visit_str(self, v: &str) -> Result { + B64.decode(v).map(Base64Vec).map_err(SerdeError::custom) + } + } + deserializer.deserialize_str(Vis) + } +} + +impl From> for Base64Vec { + fn from(a: Vec) -> Base64Vec { + Base64Vec(a) + } +} + +impl From for Vec { + fn from(a: Base64Vec) -> Vec { + a.0 + } +} + +fn null_to_default<'de, D, T>(d: D) -> Result +where + D: Deserializer<'de>, + T: Default + Deserialize<'de>, +{ + let opt = Option::deserialize(d)?; + let val = opt.unwrap_or_default(); + Ok(val) +}