Merge branch 'rolling' into FEAT/528_new-experimental-feature-to-improve-performance-using-io-uring-on-linux-distributions
neon-mmd authored Mar 11, 2024
2 parents 4bac2a5 + 991f3f5 commit c3da0b3
Showing 25 changed files with 379 additions and 222 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
@@ -14,7 +14,7 @@ path = "src/bin/websurfx.rs"

[dependencies]
reqwest = {version="0.11.24", default-features=false, features=["rustls-tls","brotli", "gzip"]}
tokio = {version="1.32.0",features=["rt-multi-thread","macros"], default-features = false}
tokio = {version="1.32.0",features=["rt-multi-thread","macros", "fs", "io-util"], default-features = false}
serde = {version="1.0.196", default-features=false, features=["derive"]}
serde_json = {version="1.0.109", default-features=false}
maud = {version="0.25.0", default-features=false, features=["actix-web"]}
@@ -32,13 +32,13 @@ error-stack = {version="0.4.0", default-features=false, features=["std"]}
async-trait = {version="0.1.76", default-features=false}
regex = {version="1.9.4", features=["perf"], default-features = false}
smallvec = {version="1.13.1", features=["union", "serde"], default-features=false}
futures = {version="0.3.28", default-features=false}
dhat = {version="0.3.3", optional = true, default-features=false}
futures = {version="0.3.30", default-features=false, features=["alloc"]}
dhat = {version="0.3.2", optional = true, default-features=false}
mimalloc = { version = "0.1.38", default-features = false }
async-once-cell = {version="0.5.3", default-features=false}
actix-governor = {version="0.5.0", default-features=false}
mini-moka = { version="0.10", optional = true, default-features=false, features=["sync"]}
brotli = { version = "3.4.0", default-features = false, features=["std"], optional=true}
async-compression = { version = "0.4.6", default-features = false, features=["brotli","tokio"], optional=true}
chacha20poly1305={version="0.10.1", default-features=false, features=["alloc","getrandom"], optional=true}
chacha20 = {version="0.9.1", default-features=false, optional=true}
base64 = {version="0.21.5", default-features=false, features=["std"], optional=true}
@@ -84,7 +84,7 @@ default = ["memory-cache"]
dhat-heap = ["dep:dhat"]
memory-cache = ["dep:mini-moka"]
redis-cache = ["dep:redis","dep:base64"]
compress-cache-results = ["dep:brotli","dep:cfg-if"]
compress-cache-results = ["dep:async-compression","dep:cfg-if"]
encrypt-cache-results = ["dep:chacha20poly1305","dep:chacha20"]
cec-cache-results = ["compress-cache-results","encrypt-cache-results"]
experimental-io-uring = ["actix-web/experimental-io-uring"]
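For readers unfamiliar with the Cargo feature syntax used above: entries such as compress-cache-results = ["dep:async-compression","dep:cfg-if"] pull in their optional dependencies only when that feature is enabled, and the crate's code selects the matching paths with cfg checks on the same feature names. A small illustrative snippet (not part of websurfx; the function is made up) showing how a binary can report which of these features it was built with:

/// Illustrative only: prints which of the Cargo features defined above were
/// enabled at compile time. The feature names match the [features] table.
fn main() {
    println!("memory-cache:           {}", cfg!(feature = "memory-cache"));
    println!("redis-cache:            {}", cfg!(feature = "redis-cache"));
    println!("compress-cache-results: {}", cfg!(feature = "compress-cache-results"));
    println!("experimental-io-uring:  {}", cfg!(feature = "experimental-io-uring"));
}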
13 changes: 8 additions & 5 deletions src/bin/websurfx.rs
@@ -5,7 +5,7 @@
#[cfg(not(feature = "dhat-heap"))]
use mimalloc::MiMalloc;

- use std::net::TcpListener;
+ use std::{net::TcpListener, sync::OnceLock};
use websurfx::{cache::cacher::create_cache, config::parser::Config, run};

/// A dhat heap memory profiler
@@ -17,6 +17,9 @@ static ALLOC: dhat::Alloc = dhat::Alloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

+ /// A static constant for holding the parsed config.
+ static CONFIG: OnceLock<Config> = OnceLock::new();
+
/// The function that launches the main server and registers all the routes of the website.
///
/// # Error
@@ -29,10 +32,10 @@ async fn main() -> std::io::Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

- // Initialize the parsed config file.
- let config = Config::parse(false).unwrap();
+ // Initialize the parsed config globally.
+ let config = CONFIG.get_or_init(|| Config::parse(false).unwrap());

- let cache = create_cache(&config).await;
+ let cache = create_cache(config).await;

log::info!(
"started server on port {} and IP {}",
@@ -45,7 +48,7 @@
config.port,
);

- let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?;
+ let listener = TcpListener::bind((config.binding_ip.as_str(), config.port))?;

run(listener, config, cache)?.await
}
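A note on the OnceLock change above: storing the parsed Config in a static OnceLock gives it a 'static lifetime, so the rest of the program can borrow it (as create_cache and run now do) instead of cloning it, which is also why #[derive(Clone)] is dropped from Config in the src/config/parser.rs diff further down. A minimal sketch of the pattern, using a simplified stand-in Config rather than websurfx's real type:

use std::sync::OnceLock;

/// Simplified stand-in for websurfx's `Config` (fields are made up).
struct Config {
    port: u16,
    binding_ip: String,
}

/// Holds the parsed config for the whole lifetime of the process.
static CONFIG: OnceLock<Config> = OnceLock::new();

/// Parses the config on the first call and returns the same
/// `&'static Config` on every later call, so no cloning is needed.
fn config() -> &'static Config {
    CONFIG.get_or_init(|| Config {
        port: 8080,
        binding_ip: "127.0.0.1".to_owned(),
    })
}

fn main() {
    let cfg = config();
    println!("binding to {}:{}", cfg.binding_ip, cfg.port);
}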
68 changes: 40 additions & 28 deletions src/cache/cacher.rs
@@ -93,7 +93,7 @@ pub trait Cacher: Send + Sync {
feature = "encrypt-cache-results",
feature = "cec-cache-results"
))]
- fn encrypt_or_decrypt_results(
+ async fn encrypt_or_decrypt_results(
&mut self,
mut bytes: Vec<u8>,
encrypt: bool,
@@ -137,11 +137,19 @@ pub trait Cacher: Send + Sync {
/// Returns the compressed bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
- fn compress_results(&mut self, mut bytes: Vec<u8>) -> Result<Vec<u8>, Report<CacheError>> {
- use std::io::Write;
- let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 11, 22);
+ async fn compress_results(
+ &mut self,
+ mut bytes: Vec<u8>,
+ ) -> Result<Vec<u8>, Report<CacheError>> {
+ use tokio::io::AsyncWriteExt;
+ let mut writer = async_compression::tokio::write::BrotliEncoder::new(Vec::new());
writer
.write_all(&bytes)
+ .await
.map_err(|_| CacheError::CompressionError)?;
+ writer
+ .shutdown()
+ .await
+ .map_err(|_| CacheError::CompressionError)?;
bytes = writer.into_inner();
Ok(bytes)
@@ -159,17 +167,17 @@ pub trait Cacher: Send + Sync {
/// Returns the compressed and encrypted bytes on success otherwise it returns a CacheError
/// on failure.
#[cfg(feature = "cec-cache-results")]
- fn compress_encrypt_compress_results(
+ async fn compress_encrypt_compress_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<Vec<u8>, Report<CacheError>> {
// compress first
- bytes = self.compress_results(bytes)?;
+ bytes = self.compress_results(bytes).await?;
// encrypt
- bytes = self.encrypt_or_decrypt_results(bytes, true)?;
+ bytes = self.encrypt_or_decrypt_results(bytes, true).await?;

// compress again;
- bytes = self.compress_results(bytes)?;
+ bytes = self.compress_results(bytes).await?;

Ok(bytes)
}
@@ -187,19 +195,19 @@ pub trait Cacher: Send + Sync {
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
- fn decompress_results(&mut self, bytes: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
+ async fn decompress_results(&mut self, bytes: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
cfg_if::cfg_if! {
if #[cfg(feature = "compress-cache-results")]
{
- decompress_util(bytes)
+ decompress_util(bytes).await

}
else if #[cfg(feature = "cec-cache-results")]
{
let decompressed = decompress_util(bytes)?;
let decrypted = self.encrypt_or_decrypt_results(decompressed, false)?;

- decompress_util(&decrypted)
+ decompress_util(&decrypted).await

}
}
@@ -216,27 +224,28 @@ pub trait Cacher: Send + Sync {
/// # Error
/// Returns a Vec of compressed or encrypted bytes on success otherwise it returns a CacheError
/// on failure.
- fn pre_process_search_results(
+ async fn pre_process_search_results(
&mut self,
search_results: &SearchResults,
) -> Result<Vec<u8>, Report<CacheError>> {
#[allow(unused_mut)] // needs to be mutable when any of the features is enabled
let mut bytes: Vec<u8> = search_results.try_into()?;
#[cfg(feature = "compress-cache-results")]
{
- let compressed = self.compress_results(bytes)?;
+ let compressed = self.compress_results(bytes).await?;
bytes = compressed;
}

#[cfg(feature = "encrypt-cache-results")]
{
- let encrypted = self.encrypt_or_decrypt_results(bytes, true)?;
+ let encrypted = self.encrypt_or_decrypt_results(bytes, true).await?;
bytes = encrypted;
}

#[cfg(feature = "cec-cache-results")]
{
- let compressed_encrypted_compressed = self.compress_encrypt_compress_results(bytes)?;
+ let compressed_encrypted_compressed =
+ self.compress_encrypt_compress_results(bytes).await?;
bytes = compressed_encrypted_compressed;
}

@@ -256,25 +265,25 @@ pub trait Cacher: Send + Sync {
/// on failure.
#[allow(unused_mut)] // needs to be mutable when any of the features is enabled
- fn post_process_search_results(
+ async fn post_process_search_results(
&mut self,
mut bytes: Vec<u8>,
) -> Result<SearchResults, Report<CacheError>> {
#[cfg(feature = "compress-cache-results")]
{
- let decompressed = self.decompress_results(&bytes)?;
+ let decompressed = self.decompress_results(&bytes).await?;
bytes = decompressed
}

#[cfg(feature = "encrypt-cache-results")]
{
- let decrypted = self.encrypt_or_decrypt_results(bytes, false)?;
+ let decrypted = self.encrypt_or_decrypt_results(bytes, false).await?;
bytes = decrypted
}

#[cfg(feature = "cec-cache-results")]
{
- let decompressed_decrypted = self.decompress_results(&bytes)?;
+ let decompressed_decrypted = self.decompress_results(&bytes).await?;
bytes = decompressed_decrypted;
}

@@ -295,16 +304,19 @@ impl Cacher for RedisCache {
/// on failure.
#[cfg(any(feature = "compress-cache-results", feature = "cec-cache-results"))]
- fn decompress_util(input: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
- use std::io::Write;
- let mut writer = brotli::DecompressorWriter::new(Vec::new(), 4096);
+ async fn decompress_util(input: &[u8]) -> Result<Vec<u8>, Report<CacheError>> {
+ use tokio::io::AsyncWriteExt;
+ let mut writer = async_compression::tokio::write::BrotliDecoder::new(Vec::new());

writer
.write_all(input)
+ .await
.map_err(|_| CacheError::CompressionError)?;
- let bytes = writer
- .into_inner()
+ writer
+ .shutdown()
+ .await
.map_err(|_| CacheError::CompressionError)?;
+ let bytes = writer.into_inner();
Ok(bytes)
}

@@ -329,7 +341,7 @@ impl Cacher for RedisCache {
let bytes = base64::engine::general_purpose::STANDARD_NO_PAD
.decode(base64_string)
.map_err(|_| CacheError::Base64DecodingOrEncodingError)?;
- self.post_process_search_results(bytes)
+ self.post_process_search_results(bytes).await
}

async fn cache_results(
@@ -345,7 +357,7 @@ impl Cacher for RedisCache {
let mut bytes = Vec::with_capacity(search_results_len);

for result in search_results {
- let processed = self.pre_process_search_results(result)?;
+ let processed = self.pre_process_search_results(result).await?;
bytes.push(processed);
}

@@ -405,7 +417,7 @@ impl Cacher for InMemoryCache {
async fn cached_results(&mut self, url: &str) -> Result<SearchResults, Report<CacheError>> {
let hashed_url_string = self.hash_url(url);
match self.cache.get(&hashed_url_string) {
- Some(res) => self.post_process_search_results(res),
+ Some(res) => self.post_process_search_results(res).await,
None => Err(Report::new(CacheError::MissingValue)),
}
}
@@ -417,7 +429,7 @@ impl Cacher for InMemoryCache {
) -> Result<(), Report<CacheError>> {
for (url, search_result) in urls.iter().zip(search_results.iter()) {
let hashed_url_string = self.hash_url(url);
- let bytes = self.pre_process_search_results(search_result)?;
+ let bytes = self.pre_process_search_results(search_result).await?;
self.cache.insert(hashed_url_string, bytes);
}

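The cacher changes above swap the synchronous brotli crate for async-compression driven through tokio's AsyncWriteExt, so (de)compression no longer blocks a runtime worker thread. A self-contained sketch of the same encoder/decoder APIs, with error handling simplified to std::io::Error instead of the crate's CacheError; it assumes async-compression with the "brotli" and "tokio" features and tokio with "io-util", as in the Cargo.toml above:

use async_compression::tokio::write::{BrotliDecoder, BrotliEncoder};
use tokio::io::AsyncWriteExt;

/// Brotli-compress `bytes` without blocking the async runtime.
async fn compress(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = BrotliEncoder::new(Vec::new());
    encoder.write_all(bytes).await?;
    // `shutdown` flushes and finishes the Brotli stream; without it the
    // output would be truncated.
    encoder.shutdown().await?;
    Ok(encoder.into_inner())
}

/// Decompress a Brotli stream produced by `compress`.
async fn decompress(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut decoder = BrotliDecoder::new(Vec::new());
    decoder.write_all(bytes).await?;
    decoder.shutdown().await?;
    Ok(decoder.into_inner())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let original = b"cached search results".repeat(64);
    let roundtrip = decompress(&compress(&original).await?).await?;
    assert_eq!(original, roundtrip);
    Ok(())
}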
37 changes: 27 additions & 10 deletions src/cache/redis_cacher.rs
@@ -1,15 +1,16 @@
//! This module provides the functionality to cache the aggregated results fetched and aggregated
//! from the upstream search engines in a json format.
+ use super::error::CacheError;
use error_stack::Report;
- use futures::future::try_join_all;
+ use futures::stream::FuturesUnordered;
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};

- use super::error::CacheError;
+ /// A constant holding the redis pipeline size.
+ const REDIS_PIPELINE_SIZE: usize = 3;

/// A named struct which stores the redis Connection url address to which the client will
/// connect to.
#[derive(Clone)]
pub struct RedisCache {
/// It stores a pool of connections ready to be used.
connection_pool: Vec<ConnectionManager>,
@@ -20,6 +21,8 @@ pub struct RedisCache {
current_connection: u8,
/// It stores the max TTL for keys.
cache_ttl: u16,
+ /// It stores the redis pipeline struct of size 3.
+ pipeline: redis::Pipeline,
}

impl RedisCache {
@@ -30,6 +33,8 @@ impl RedisCache {
/// * `redis_connection_url` - It takes the redis Connection url address.
/// * `pool_size` - It takes the size of the connection pool (in other words the number of
/// connections that should be stored in the pool).
+ /// * `cache_ttl` - It takes the time to live for cached results to live in the redis
+ /// server.
///
/// # Error
///
@@ -41,18 +46,28 @@ impl RedisCache {
cache_ttl: u16,
) -> Result<Self, Box<dyn std::error::Error>> {
let client = Client::open(redis_connection_url)?;
- let mut tasks: Vec<_> = Vec::new();
+ let tasks: FuturesUnordered<_> = FuturesUnordered::new();

for _ in 0..pool_size {
- tasks.push(client.get_connection_manager());
+ let client_partially_cloned = client.clone();
+ tasks.push(tokio::spawn(async move {
+ client_partially_cloned.get_connection_manager().await
+ }));
}

+ let mut outputs = Vec::new();
+ for task in tasks {
+ outputs.push(task.await??);
+ }

let redis_cache = RedisCache {
- connection_pool: try_join_all(tasks).await?,
+ connection_pool: outputs,
pool_size,
current_connection: Default::default(),
cache_ttl,
+ pipeline: redis::Pipeline::with_capacity(REDIS_PIPELINE_SIZE),
};

Ok(redis_cache)
}

@@ -122,13 +137,14 @@ impl RedisCache {
keys: impl Iterator<Item = String>,
) -> Result<(), Report<CacheError>> {
self.current_connection = Default::default();
- let mut pipeline = redis::Pipeline::with_capacity(3);

for (key, json_result) in keys.zip(json_results) {
- pipeline.set_ex(key, json_result, self.cache_ttl.into());
+ self.pipeline
+ .set_ex(key, json_result, self.cache_ttl.into());
}

- let mut result: Result<(), RedisError> = pipeline
+ let mut result: Result<(), RedisError> = self
+ .pipeline
.query_async(&mut self.connection_pool[self.current_connection as usize])
.await;

@@ -149,7 +165,8 @@ impl RedisCache {
CacheError::PoolExhaustionWithConnectionDropError,
));
}
- result = pipeline
+ result = self
+ .pipeline
.query_async(
&mut self.connection_pool[self.current_connection as usize],
)
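The RedisCache constructor above now spawns each get_connection_manager call as its own tokio task and collects the join handles in a FuturesUnordered, letting the runtime drive the pool setup across worker threads, while the reusable redis::Pipeline field avoids rebuilding a pipeline on every cache write. A stand-alone sketch of the spawn-and-collect pattern, with a made-up connect function in place of redis's connection manager:

use futures::stream::FuturesUnordered;

/// Stand-in for an async, fallible setup step such as
/// `Client::get_connection_manager` in the diff above.
async fn connect(id: usize) -> Result<String, std::io::Error> {
    Ok(format!("connection-{id}"))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool_size = 4;
    let tasks: FuturesUnordered<_> = FuturesUnordered::new();

    // Spawning moves each setup future onto the runtime immediately,
    // so all of them make progress concurrently.
    for id in 0..pool_size {
        tasks.push(tokio::spawn(connect(id)));
    }

    // Await the join handles: the first `?` handles a panicked task
    // (JoinError), the second the setup error itself.
    let mut pool = Vec::with_capacity(pool_size);
    for task in tasks {
        pool.push(task.await??);
    }

    assert_eq!(pool.len(), pool_size);
    Ok(())
}

Note that the concurrency here comes from tokio::spawn; the FuturesUnordered is used only as a container for the join handles, as in the diff above.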
1 change: 0 additions & 1 deletion src/config/parser.rs
@@ -9,7 +9,6 @@ use mlua::Lua;
use std::{collections::HashMap, fs, thread::available_parallelism};

/// A named struct which stores the parsed config file options.
- #[derive(Clone)]
pub struct Config {
/// It stores the parsed port number option on which the server should launch.
pub port: u16,