Skip to content

Commit

Permalink
- clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
laststylebender14 committed Oct 25, 2024
1 parent e294f26 commit fcc9474
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Weak};
use std::thread::ThreadId;

use dashmap::DashMap;
use futures_util::Future;
Expand Down Expand Up @@ -57,8 +58,9 @@ impl<K: Key, V: Value> Dedupe<K, V> {
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
{
let thread_id = std::thread::current().id();
loop {
let value = match self.step(key) {
let value = match self.step(key, &thread_id) {
Step::Return(value) => value,
Step::Await(mut rx) => match rx.recv().await {
Ok(value) => value,
Expand All @@ -72,14 +74,20 @@ impl<K: Key, V: Value> Dedupe<K, V> {
},
Step::Init(tx) => {
let value = or_else().await;
let thread_id = std::thread::current().id();
let mut is_empty = false;
if let Some(mut inner_map) = self.cache.get_mut(&thread_id) {
if self.persist {
inner_map.insert(key.to_owned(), State::Ready(value.clone()));
} else {
inner_map.remove(key);
}
is_empty = inner_map.is_empty();
}

if is_empty {
self.cache.remove(&thread_id);
}

let _ = tx.send(value.clone());
value
}
Expand All @@ -89,9 +97,7 @@ impl<K: Key, V: Value> Dedupe<K, V> {
}
}

fn step(&self, key: &K) -> Step<V> {
let thread_id = std::thread::current().id();

fn step(&self, key: &K, thread_id: &ThreadId) -> Step<V> {
// Fast path: Try read-only access first
if let Some(inner_data) = self.cache.get(&thread_id) {

Check failure on line 102 in src/core/data_loader/dedupe.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

this expression creates a reference which is immediately dereferenced by the compiler
if let Some(state) = inner_data.get(key) {
Expand All @@ -109,18 +115,18 @@ impl<K: Key, V: Value> Dedupe<K, V> {
self.initialize_cache(thread_id, key)
}

fn initialize_cache(&self, thread_id: std::thread::ThreadId, key: &K) -> Step<V> {
fn initialize_cache(&self, thread_id: &std::thread::ThreadId, key: &K) -> Step<V> {
let (tx, _) = broadcast::channel(self.size);
let tx = Arc::new(tx);

if let Some(mut inner_data) = self.cache.get_mut(&thread_id) {

Check failure on line 122 in src/core/data_loader/dedupe.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

this expression creates a reference which is immediately dereferenced by the compiler
inner_data.insert(key.to_owned(), State::Pending(Arc::downgrade(&tx)));
return Step::Init(tx);
}

let mut local_map = HashMap::default(); // Pre-allocate with reasonable size
local_map.insert(key.to_owned(), State::Pending(Arc::downgrade(&tx)));
self.cache.insert(thread_id, local_map);
self.cache.insert(thread_id.to_owned(), local_map);
Step::Init(tx)
}
}
Expand Down

1 comment on commit fcc9474

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 8.13ms 2.62ms 42.55ms 76.48%
Req/Sec 3.11k 357.32 3.93k 87.17%

370952 requests in 30.02s, 1.86GB read

Requests/sec: 12356.84

Transfer/sec: 63.42MB

Please sign in to comment.