Skip to content

Commit

Permalink
Durability Provider (#1864)
Browse files Browse the repository at this point in the history
Signed-off-by: Shubham Mishra <shubham@clockworklabs.io>
Co-authored-by: Kim Altintop <kim@eagain.io>
  • Loading branch information
Shubham8287 and kim authored Nov 19, 2024
1 parent ac1d222 commit 9c64d1f
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 168 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ derive_more = "0.99.17"
uuid.workspace = true
jsonwebtoken.workspace = true
scopeguard.workspace = true
spacetimedb-paths.workspace = true

[dev-dependencies]
jsonwebtoken.workspace = true
130 changes: 120 additions & 10 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ use http::StatusCode;

use spacetimedb::client::ClientActorIndex;
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
use spacetimedb::host::{HostController, UpdateDatabaseResult};
use spacetimedb::identity::Identity;
use spacetimedb::execution_context::Workload;
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
use spacetimedb::identity::{AuthCtx, Identity};
use spacetimedb::json::client_api::StmtResultJson;
use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
use spacetimedb::sql;
use spacetimedb::sql::execute::translate_col;
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_lib::ProductTypeElement;
use spacetimedb_paths::server::ModuleLogsDir;
use tokio::sync::watch;

pub mod auth;
pub mod routes;
Expand All @@ -20,13 +27,110 @@ pub mod util;
///
/// Types returned here should be considered internal state and **never** be
/// surfaced to the API.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
fn host_controller(&self) -> &HostController;
fn client_actor_index(&self) -> &ClientActorIndex;

type JwtAuthProviderT: auth::JwtAuthProvider;
fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT;
/// Return the leader [`Host`] of `database_id`.
///
/// Returns `None` if the current leader is not hosted by this node.
/// The [`Host`] is spawned implicitly if not already running.
async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
}

/// Client view of a running module.
pub struct Host {
pub replica_id: u64,
host_controller: HostController,
}

impl Host {
pub fn new(replica_id: u64, host_controller: HostController) -> Self {
Self {
replica_id,
host_controller,
}
}

pub async fn module(&self) -> Result<ModuleHost, NoSuchModule> {
self.host_controller.get_module_host(self.replica_id).await
}

pub async fn module_watcher(&self) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
self.host_controller.watch_module_host(self.replica_id).await
}

pub async fn exec_sql(
&self,
auth: AuthCtx,
database: Database,
body: String,
) -> axum::response::Result<Vec<StmtResultJson>> {
let module_host = self
.module()
.await
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;

let json = self
.host_controller
.using_database(
database,
self.replica_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);
let results =
sql::execute::run(db, &body, auth, Some(&module_host.info().subscriptions)).map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let json = db.with_read_only(Workload::Sql, |tx| {
results
.into_iter()
.map(|result| {
let rows = result.data;
let schema = result
.head
.fields
.iter()
.map(|x| {
let ty = x.algebraic_type.clone();
let name = translate_col(tx, x.field);
ProductTypeElement::new(ty, name)
})
.collect();
StmtResultJson { schema, rows }
})
.collect::<Vec<_>>()
});

Ok(json)
},
)
.await
.map_err(log_and_500)??;

Ok(json)
}

pub async fn update(
&self,
database: Database,
host_type: HostType,
program_bytes: Box<[u8]>,
) -> anyhow::Result<UpdateDatabaseResult> {
self.host_controller
.update_module_host(database, host_type, self.replica_id, program_bytes)
.await
}
}

/// Parameters for publishing a database.
Expand Down Expand Up @@ -155,9 +259,6 @@ impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
(**self).get_replicas()
}
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id)
}

// Energy
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
Expand All @@ -172,6 +273,10 @@ impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
(**self).reverse_lookup(database_identity)
}

fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id)
}
}

#[async_trait]
Expand Down Expand Up @@ -209,23 +314,28 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
}
}

#[async_trait]
impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
type JwtAuthProviderT = T::JwtAuthProviderT;
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
(**self).gather_metrics()
}

fn host_controller(&self) -> &HostController {
(**self).host_controller()
}

fn client_actor_index(&self) -> &ClientActorIndex {
(**self).client_actor_index()
}

fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
(**self).jwt_auth_provider()
}

async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
(**self).leader(database_id).await
}

fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
(**self).module_logs_dir(replica_id)
}
}

pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
Expand Down
Loading

2 comments on commit 9c64d1f

@github-actions
Copy link

@github-actions github-actions bot commented on 9c64d1f Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on 9c64d1f Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6426 6426 0.00% 6526 6526 0.00%
sqlite 5579 5579 0.00% 6039 5941 1.65%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 76586 76586 0.00% 76974 77042 -0.09%
stdb_raw u32_u64_str no_index 64 128 2 string 118828 118828 0.00% 119446 119522 -0.06%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25126 25113 0.05% 25620 25727 -0.42%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24078 24078 0.00% 24462 24602 -0.57%
sqlite u32_u64_str no_index 64 128 2 string 144695 144695 0.00% 146085 146119 -0.02%
sqlite u32_u64_str no_index 64 128 1 u64 124044 124044 0.00% 125204 125294 -0.07%
sqlite u32_u64_str btree_each_column 64 128 1 u64 131361 131361 0.00% 132673 132739 -0.05%
sqlite u32_u64_str btree_each_column 64 128 2 string 134494 134494 0.00% 135972 136078 -0.08%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 878642 878628 0.00% 897890 934724 -3.94%
stdb_raw u32_u64_str btree_each_column 64 128 1029707 1030117 -0.04% 1086439 1089957 -0.32%
sqlite u32_u64_str unique_0 64 128 398320 398330 -0.00% 417048 413148 0.94%
sqlite u32_u64_str btree_each_column 64 128 983637 983637 0.00% 1019991 1012351 0.75%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 153810 153810 0.00% 153926 153964 -0.02%
stdb_raw u32_u64_str unique_0 64 16835 16835 0.00% 16935 16973 -0.22%
sqlite u32_u64_str unique_0 1024 1068275 1068275 0.00% 1071443 1071695 -0.02%
sqlite u32_u64_str unique_0 64 76261 76261 0.00% 77207 77427 -0.28%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50214 50248 -0.07%
64 bsatn 25509 25509 0.00% 27787 27651 0.49%
16 bsatn 8200 8200 0.00% 9594 9458 1.44%
16 json 12188 12188 0.00% 14126 14160 -0.24%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 20132320 20130326 0.01% 20581686 20782904 -0.97%
stdb_raw u32_u64_str unique_0 64 128 1287088 1287506 -0.03% 1320788 1364462 -3.20%
sqlite u32_u64_str unique_0 1024 1024 1802182 1802188 -0.00% 1811422 1811300 0.01%
sqlite u32_u64_str unique_0 64 128 128528 128534 -0.00% 131418 131458 -0.03%
On-disk benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6431 6431 0.00% 6527 6527 0.00%
sqlite 5621 5621 0.00% 6149 6043 1.75%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 76591 76591 0.00% 76955 77027 -0.09%
stdb_raw u32_u64_str no_index 64 128 2 string 118833 118833 0.00% 119531 119403 0.11%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25115 25115 0.00% 25569 25753 -0.71%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24083 24083 0.00% 24447 24587 -0.57%
sqlite u32_u64_str no_index 64 128 1 u64 125965 125965 0.00% 127461 127479 -0.01%
sqlite u32_u64_str no_index 64 128 2 string 146616 146616 0.00% 148354 148340 0.01%
sqlite u32_u64_str btree_each_column 64 128 2 string 136634 136616 0.01% 138678 138606 0.05%
sqlite u32_u64_str btree_each_column 64 128 1 u64 133457 133457 0.00% 135215 135233 -0.01%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 827836 827301 0.06% 878606 851957 3.13%
stdb_raw u32_u64_str btree_each_column 64 128 978914 978368 0.06% 1035576 1036498 -0.09%
sqlite u32_u64_str unique_0 64 128 415857 415857 0.00% 433783 429935 0.90%
sqlite u32_u64_str btree_each_column 64 128 1021898 1021908 -0.00% 1056934 1049906 0.67%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 153815 153815 0.00% 153915 153953 -0.02%
stdb_raw u32_u64_str unique_0 64 16840 16840 0.00% 16940 16978 -0.22%
sqlite u32_u64_str unique_0 1024 1071343 1071343 0.00% 1074993 1075025 -0.00%
sqlite u32_u64_str unique_0 64 78033 78033 0.00% 79279 79291 -0.02%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50214 50248 -0.07%
64 bsatn 25509 25509 0.00% 27787 27651 0.49%
16 bsatn 8200 8200 0.00% 9594 9458 1.44%
16 json 12188 12188 0.00% 14126 14160 -0.24%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 19049442 19046452 0.02% 19602572 19765766 -0.83%
stdb_raw u32_u64_str unique_0 64 128 1241286 1240805 0.04% 1306940 1316659 -0.74%
sqlite u32_u64_str unique_0 1024 1024 1809743 1809749 -0.00% 1818347 1818359 -0.00%
sqlite u32_u64_str unique_0 64 128 132654 132654 0.00% 135560 135488 0.05%

Please sign in to comment.