Skip to content

Commit

Permalink
Add request coalescer to kv
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Nov 11, 2024
1 parent 7a28621 commit ab6d381
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 25 deletions.
4 changes: 2 additions & 2 deletions crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ impl DapAggregator for crate::App {
Some(task_config.task_expiration),
)
.await
.map_err(|e| match e {
kv::GetOrInsertError::Other(e) => e,
.map_err(|e| match &*e {
kv::GetOrInsertError::Other(e) => e.clone(),
kv::GetOrInsertError::StorageProxy(e) => {
fatal_error!(err = ?e, "failed to get TaskprovOptInParam from kv")
}
Expand Down
5 changes: 3 additions & 2 deletions crates/daphne-server/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::storage_proxy_connection::{
self,
kv::{self, Kv, KvGetOptions},
};
use mappable_rc::Marc;

mod aggregator;
mod helper;
Expand Down Expand Up @@ -74,7 +75,7 @@ impl BearerTokens<'_> {
role: DapSender,
task_id: TaskId,
token: &BearerToken,
) -> Result<bool, storage_proxy_connection::Error> {
) -> Result<bool, Marc<storage_proxy_connection::Error>> {
self.kv
.peek::<kv::prefix::KvBearerToken, _, _>(
&(role, task_id).into(),
Expand All @@ -91,7 +92,7 @@ impl BearerTokens<'_> {
&self,
role: DapSender,
task_id: TaskId,
) -> Result<Option<BearerToken>, storage_proxy_connection::Error> {
) -> Result<Option<BearerToken>, Marc<storage_proxy_connection::Error>> {
self.kv
.get_cloned::<kv::prefix::KvBearerToken>(
&(role, task_id).into(),
Expand Down
72 changes: 53 additions & 19 deletions crates/daphne-server/src/storage_proxy_connection/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

mod cache;
mod request_coalescer;

use std::{any::Any, fmt::Display, future::Future};

Expand All @@ -22,13 +23,18 @@ use daphne_service_utils::http_headers::STORAGE_PROXY_PUT_KV_EXPIRATION;
#[derive(Default)]
pub struct State {
cache: RwLock<Cache>,
coalescer: request_coalescer::RequestCoalescer,
}

impl State {
#[cfg(feature = "test-utils")]
pub async fn reset(&self) {
let Self { cache } = self;
*cache.write().await = Default::default();
let Self { cache, coalescer } = self;

let clear_cache = async {
*cache.write().await = Default::default();
};
tokio::join!(clear_cache, coalescer.reset());
}
}

Expand Down Expand Up @@ -190,7 +196,7 @@ impl<'h> Kv<'h> {
&self,
key: &P::Key,
opt: &KvGetOptions,
) -> Result<Option<Marc<P::Value>>, Error>
) -> Result<Option<Marc<P::Value>>, Marc<Error>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
Expand All @@ -202,7 +208,7 @@ impl<'h> Kv<'h> {
&self,
key: &P::Key,
opt: &KvGetOptions,
) -> Result<Option<P::Value>, Error>
) -> Result<Option<P::Value>, Marc<Error>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
Expand All @@ -216,17 +222,16 @@ impl<'h> Kv<'h> {
key: &P::Key,
opt: &KvGetOptions,
mapper: F,
) -> Result<Option<Marc<R>>, Error>
) -> Result<Option<Marc<R>>, Marc<Error>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
F: for<'s> FnOnce(&'s P::Value) -> Option<&'s R>,
R: 'static,
R: Send + Sync + 'static,
{
Ok(self
.get_internal::<P, _, _>(key, opt, |marc| Marc::try_map(marc, mapper).ok())
.await?
.flatten())
self.get_coalesced::<P, _, _>(key, opt, |marc| Marc::try_map(marc, mapper).ok())
.await
.map(Option::flatten)
}

pub async fn get_or_insert_with<P, Fut, E>(
Expand All @@ -235,33 +240,62 @@ impl<'h> Kv<'h> {
opt: &KvGetOptions,
default: impl FnOnce() -> Fut,
expiration: Option<Time>,
) -> Result<Marc<P::Value>, GetOrInsertError<E>>
) -> Result<Marc<P::Value>, Marc<GetOrInsertError<E>>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
E: Send + Sync + 'static,
Fut: Future<Output = Result<P::Value, E>>,
{
if let Some(v) = self.get::<P>(key, opt).await? {
return Ok(v);
}
let default = default().await.map_err(GetOrInsertError::Other)?;
let cached = self.put_internal::<P>(key, default, expiration).await?;
Ok(cached)
self.state
.coalescer
.coalesce(Self::to_key::<P>(key), || async {
if let Some(v) = self.get_internal::<P, _, _>(key, opt, |marc| marc).await? {
return Ok(Some(v));
}
let default = default().await.map_err(GetOrInsertError::Other)?;
let cached = self.put_internal::<P>(key, default, expiration).await?;
Ok(Some(cached))
})
.await
.map(|v| v.unwrap()) // all paths of the previous closure return Some
}

pub async fn peek<P, R, F>(
&self,
key: &P::Key,
opt: &KvGetOptions,
peeker: F,
) -> Result<Option<R>, Error>
) -> Result<Option<R>, Marc<Error>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
F: FnOnce(&P::Value) -> R,
{
self.get_internal::<P, _, _>(key, opt, |marc| peeker(&marc))
self.get_coalesced::<P, _, _>(key, opt, |marc| peeker(&marc))
.await
}

async fn get_coalesced<P, R, F>(
&self,
key: &P::Key,
opt: &KvGetOptions,
mapper: F,
) -> Result<Option<R>, Marc<Error>>
where
P: KvPrefix,
P::Key: std::fmt::Debug,
F: FnOnce(Marc<P::Value>) -> R,
{
self.state
.coalescer
.coalesce(Self::to_key::<P>(key), || async {
self.get_internal::<P, _, _>(key, opt, Some)
.await
.map(Option::flatten)
})
.await
.map(|opt_v| opt_v.map(mapper))
}

async fn get_internal<P, R, F>(
Expand Down
Loading

0 comments on commit ab6d381

Please sign in to comment.