Skip to content

Commit

Permalink
trying a new implementation for dedupe
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Oct 25, 2024
1 parent a889fd8 commit 881e9f4
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, RwLock, Weak};

use futures_util::Future;
use tokio::sync::broadcast;
Expand All @@ -15,7 +15,7 @@ impl<A: Send + Sync + Clone> Value for A {}
/// Allows deduplication of async operations based on a key.
pub struct Dedupe<Key, Value> {
/// Cache storage for the operations.
cache: Arc<Mutex<HashMap<Key, State<Value>>>>,
cache: Arc<RwLock<HashMap<Key, State<Value>>>>,
/// Initial size of the multi-producer, multi-consumer channel.
size: usize,
/// When enabled allows the operations to be cached forever.
Expand Down Expand Up @@ -44,20 +44,23 @@ enum Step<Value> {
/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),

Retry,
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
Self { cache: Arc::new(RwLock::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
{
let mut read = true;
loop {
let value = match self.step(key) {
let value = match self.step(key, read) {
Step::Return(value) => value,
Step::Await(mut rx) => match rx.recv().await {
Ok(value) => value,
Expand All @@ -71,7 +74,7 @@ impl<K: Key, V: Value> Dedupe<K, V> {
},
Step::Init(tx) => {
let value = or_else().await;
let mut guard = self.cache.lock().unwrap();
let mut guard = self.cache.write().unwrap();
if self.persist {
guard.insert(key.to_owned(), State::Ready(value.clone()));
} else {
Expand All @@ -80,14 +83,37 @@ impl<K: Key, V: Value> Dedupe<K, V> {
let _ = tx.send(value.clone());
value
}
Step::Retry => {
read = false;
continue;
}
};

return value;
}
}

fn step(&self, key: &K) -> Step<V> {
let mut this = self.cache.lock().unwrap();
fn step(&self, key: &K, read: bool) -> Step<V> {
if read {
let this = self.cache.read().unwrap();
if let Some(state) = this.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Pending(tx) => {
// We can upgrade from Weak to Arc only in case when
// original tx is still alive
// otherwise we will create in the code below
if let Some(tx) = tx.upgrade() {
return Step::Await(tx.subscribe());
}
}
}
} else {
return Step::Retry;
}
}

let mut this = self.cache.write().unwrap();

if let Some(state) = this.get(key) {
match state {
Expand Down

0 comments on commit 881e9f4

Please sign in to comment.