Skip to content

Commit

Permalink
Add cache and retry to avoid to many requests to DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaudDauce committed Jun 15, 2023
1 parent 5e1f577 commit a029c02
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
42 changes: 39 additions & 3 deletions src/core.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#[cfg(feature = "multitenant")]
use std::env;

use std::{collections::HashSet, future::Future, pin::Pin, time::SystemTime};
use std::{
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::RwLock,
time::SystemTime,
};

use actix_web::{
dev::Payload,
Expand All @@ -25,7 +31,7 @@ use crate::auth0::Auth;

use crate::errors::Error;

#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
pub(crate) struct Index {
#[serde(skip_serializing)]
pub(crate) id: i64,
Expand Down Expand Up @@ -157,10 +163,37 @@ pub(crate) trait IndexesDatabase: Sync + Send {
async fn fetch_all_as_json(&self, index: &Index, table: Table) -> Result<String, Error>;
}

pub(crate) type MetadataCache = RwLock<HashMap<String, Index>>;

#[async_trait]
pub(crate) trait MetadataDatabase: Sync + Send {
async fn get_indexes(&self, project_uuid: &str) -> Result<Vec<Index>, Error>;

async fn get_index(&self, public_id: &str) -> Result<Option<Index>, Error>;
async fn get_index_with_cache(
&self,
cache: &MetadataCache,
public_id: &str,
) -> Result<Option<Index>, Error> {
if let Ok(cache) = cache.read() {
if let Some(index) = cache.get(public_id) {
return Ok(Some(index.clone()));
}
}

let index = self.get_index(public_id).await?;

if let Some(index) = index {
if let Ok(mut cache) = cache.write() {
cache.insert(public_id.to_string(), index.clone());
}

return Ok(Some(index));
}

return Ok(None);
}

async fn delete_index(&self, public_id: &str) -> Result<(), Error>;
async fn create_index(&self, new_index: NewIndex) -> Result<Index, Error>;
}
Expand All @@ -173,13 +206,16 @@ impl FromRequest for Index {
let req = req.clone();

Box::pin(async move {
let metadata_cache = req.app_data::<Data<MetadataCache>>().unwrap();
let metadata_database = req.app_data::<Data<dyn MetadataDatabase>>().unwrap();

let public_id: Path<String> = Path::<String>::extract(&req)
.await
.map_err(|_| Error::WrongIndexPublicId)?;

let index = metadata_database.get_index(&public_id).await?;
let index = metadata_database
.get_index_with_cache(&metadata_cache, &public_id)
.await?;

if let Some(index) = index {
Ok(index)
Expand Down
7 changes: 4 additions & 3 deletions src/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use async_trait::async_trait;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::{environment::EnvironmentVariableCredentialsProvider, retry::RetryConfigBuilder};
use aws_sdk_dynamodb::{
operation::{put_item::PutItemError, update_item::UpdateItemError},
primitives::Blob,
Expand Down Expand Up @@ -59,7 +59,8 @@ const ENTRIES_AND_CHAINS_VALUE_COLUMN_NAME: &str = "value_bytes"; // 'value' is
impl Database {
pub async fn create() -> Self {
let mut config_builder = aws_config::from_env()
.credentials_provider(EnvironmentVariableCredentialsProvider::new());
.credentials_provider(EnvironmentVariableCredentialsProvider::new())
.retry_config(RetryConfigBuilder::new().max_attempts(10).build());

if let Ok(url) = env::var("AWS_DYNAMODB_ENDPOINT_URL") {
config_builder = config_builder.endpoint_url(url)
Expand Down Expand Up @@ -400,7 +401,7 @@ impl MetadataDatabase for Database {
// :UniquePublicId
self.client
.put_item()
.table_name("metadata")
.table_name(&self.metadata_table_name)
.item("id", AttributeValue::N(index.id.to_string()))
.item("public_id", AttributeValue::S(index.public_id.clone()))
.item("authz_id", AttributeValue::S(index.authz_id.clone()))
Expand Down
18 changes: 15 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use actix_web::web::PayloadConfig;
use actix_web::web::Query;

use crate::{
core::{check_body_signature, Index},
core::{check_body_signature, Index, MetadataCache},
errors::{Response, ResponseBytes},
};
use actix_cors::Cors;
Expand Down Expand Up @@ -171,10 +171,13 @@ async fn post_indexes(
async fn get_index(
#[cfg(feature = "multitenant")] auth: Auth,
public_id: Path<String>,
metadata_cache: Data<MetadataCache>,
metadata_db: Data<dyn MetadataDatabase>,
indexes_db: Data<dyn IndexesDatabase>,
) -> Response<Index> {
let index = metadata_db.get_index(&public_id).await?;
let index = metadata_db
.get_index_with_cache(&metadata_cache, &public_id)
.await?;

if let Some(mut index) = index {
#[cfg(feature = "multitenant")]
Expand All @@ -197,11 +200,14 @@ async fn get_index(
async fn delete_index(
#[cfg(feature = "multitenant")] auth: Auth,
public_id: Path<String>,
metadata_cache: Data<MetadataCache>,
metadata_db: Data<dyn MetadataDatabase>,
) -> Response<()> {
#[cfg(feature = "multitenant")]
{
let index = metadata_db.get_index(&public_id).await?;
let index = metadata_db
.get_index_with_cache(&metadata_cache, &public_id)
.await?;
if let Some(index) = index {
if auth.authz_id != index.authz_id {
return Err(Error::BadRequest(format!(
Expand All @@ -212,6 +218,9 @@ async fn delete_index(
}

metadata_db.delete_index(&public_id).await?;
if let Ok(mut cache) = metadata_cache.write() {
cache.remove(public_id.as_str());
}

Ok(Json(()))
}
Expand Down Expand Up @@ -323,6 +332,8 @@ async fn start_server(ipv6: bool) -> std::io::Result<()> {
#[cfg(feature = "multitenant")]
let backend = Data::new(Backend::from_env());

let metadata_cache: Data<MetadataCache> = Data::new(Default::default());

let indexes_database: Data<dyn IndexesDatabase> = match env::var("INDEXES_DATABASE_TYPE").as_deref().unwrap_or("rocksdb") {
#[cfg(feature = "heed")]
"heed" => Data::from(Arc::new(crate::heed::Database::create()) as Arc<dyn IndexesDatabase>),
Expand Down Expand Up @@ -364,6 +375,7 @@ async fn start_server(ipv6: bool) -> std::io::Result<()> {
let mut app = App::new()
.wrap(Cors::permissive())
.wrap(Logger::default())
.app_data(metadata_cache.clone())
.app_data(indexes_database.clone())
.app_data(metadata_database.clone())
.app_data(PayloadConfig::new(50_000_000))
Expand Down

0 comments on commit a029c02

Please sign in to comment.