Skip to content

Commit

Permalink
Offset and event duplicate handling for Nostr
Browse files Browse the repository at this point in the history
  • Loading branch information
tompro committed Nov 27, 2024
1 parent 1c0fe9c commit ed75a7d
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 16 deletions.
19 changes: 19 additions & 0 deletions src/persistence/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use surrealdb::{
};

pub mod contact;
pub mod nostr_event_offset;

/// Configuration for the SurrealDB connection string, namespace and
/// database name
Expand Down Expand Up @@ -56,3 +57,21 @@ pub async fn get_memory_db(namespace: &str, database: &str) -> Result<Surreal<An
db.use_ns(namespace).use_db(database).await?;
Ok(db)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_get_surreal_db() {
let config = SurrealDbConfig::new("mem://");
let _ = get_surreal_db(&config).await.expect("could not create db");
}

#[tokio::test]
async fn test_get_memory_db() {
let _ = get_memory_db("test", "test")
.await
.expect("could not create db");
}
}
144 changes: 144 additions & 0 deletions src/persistence/db/nostr_event_offset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use super::Result;
use crate::util::date::{self, DateTimeUtc};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use surrealdb::{engine::any::Any, Surreal};

use crate::persistence::{NostrEventOffset, NostrEventOffsetStoreApi};

#[derive(Clone)]
pub struct SurrealNostrEventOffsetStore {
db: Surreal<Any>,
}

impl SurrealNostrEventOffsetStore {
const TABLE: &'static str = "nostr_event_offset";

#[allow(dead_code)]
pub fn new(db: Surreal<Any>) -> Self {
Self { db }
}
}

#[async_trait]
impl NostrEventOffsetStoreApi for SurrealNostrEventOffsetStore {
async fn current_offset(&self) -> Result<u64> {
let result: Vec<NostrEventOffsetDb> = self
.db
.query("SELECT * FROM type::table($table) ORDER BY time DESC LIMIT 1")
.bind(("table", Self::TABLE))
.await?
.take(0)?;
let value = result
.first()
.map(|c| c.time.timestamp())
.unwrap_or(0)
.try_into()?;
Ok(value)
}

async fn is_processed(&self, event_id: &str) -> Result<bool> {
let result: Option<NostrEventOffsetDb> = self.db.select((Self::TABLE, event_id)).await?;
Ok(result.is_some())
}

async fn add_event(&self, data: NostrEventOffset) -> Result<()> {
let db: NostrEventOffsetDb = data.into();
let _: Option<NostrEventOffsetDb> = self
.db
.create((Self::TABLE, db.event_id.to_owned()))
.content(db)
.await?;
Ok(())
}
}

/// A nostr event offset.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct NostrEventOffsetDb {
pub event_id: String,
pub time: DateTimeUtc,
pub success: bool,
}

impl From<NostrEventOffsetDb> for NostrEventOffset {
fn from(db: NostrEventOffsetDb) -> Self {
Self {
event_id: db.event_id,
time: db.time.timestamp() as u64,
success: db.success,
}
}

Check warning on line 71 in src/persistence/db/nostr_event_offset.rs

View check run for this annotation

Codecov / codecov/patch

src/persistence/db/nostr_event_offset.rs#L65-L71

Added lines #L65 - L71 were not covered by tests
}

impl From<NostrEventOffset> for NostrEventOffsetDb {
fn from(offset: NostrEventOffset) -> Self {
Self {
event_id: offset.event_id,
time: date::seconds_unsigned(offset.time),
success: offset.success,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::persistence::db::get_memory_db;

#[tokio::test]
async fn test_get_offset_from_empty_table() {
let store = get_store().await;
let offset = store.current_offset().await.expect("could not get offset");
assert_eq!(offset, 0);
}

#[tokio::test]
async fn test_add_event() {
let store = get_store().await;
let data = NostrEventOffset {
event_id: "test_event".to_string(),
time: 1000,
success: true,
};
store
.add_event(data)
.await
.expect("Could not add event offset");

let offset = store.current_offset().await.expect("could not get offset");
assert_eq!(offset, 1000);
}

#[tokio::test]
async fn test_is_processed() {
let store = get_store().await;
let data = NostrEventOffset {
event_id: "test_event".to_string(),
time: 1000,
success: false,
};
let is_known = store
.is_processed(&data.event_id)
.await
.expect("could not check if processed");
assert!(!is_known, "new event should not be known");

store
.add_event(data.clone())
.await
.expect("could not add event offset");
let is_processed = store
.is_processed(&data.event_id)
.await
.expect("could not check if processed");
assert!(is_processed, "existing event should be known");
}

async fn get_store() -> SurrealNostrEventOffsetStore {
let mem_db = get_memory_db("test", "nostr_event_offset")
.await
.expect("could not create memory db");
SurrealNostrEventOffsetStore::new(mem_db)
}
}
8 changes: 7 additions & 1 deletion src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod contact;
pub mod db;
pub mod file_upload;
pub mod identity;
pub mod nostr;

use bill::FileBasedBillStore;
use db::{contact::SurrealContactStore, get_surreal_db, SurrealDbConfig};
Expand Down Expand Up @@ -35,9 +36,14 @@ pub enum Error {

#[error("no such {0} entity {1}")]
NoSuchEntity(String, String),

#[allow(dead_code)]
#[error("Failed to convert integer {0}")]
FromInt(#[from] std::num::TryFromIntError),
}

pub use contact::ContactStoreApi;
pub use nostr::{NostrEventOffset, NostrEventOffsetStoreApi};

use crate::config::Config;
use company::FileBasedCompanyStore;
Expand Down Expand Up @@ -77,7 +83,7 @@ pub async fn get_db_context(conf: &Config) -> Result<DbContext> {
error!("Error cleaning up temp upload folder for bill: {e}");
}

let contact_store = Arc::new(SurrealContactStore::new(db));
let contact_store = Arc::new(SurrealContactStore::new(db.clone()));

Check warning on line 86 in src/persistence/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/persistence/mod.rs#L86

Added line #L86 was not covered by tests

let bill_store =
Arc::new(FileBasedBillStore::new(&conf.data_dir, "bills", "files", "bills_keys").await?);
Expand Down
41 changes: 41 additions & 0 deletions src/persistence/nostr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use super::Result;
use async_trait::async_trait;

#[cfg(test)]
use mockall::automock;

/// Allows storing and retrieving time based offsets for subscriptions
/// to Nostr relays. It will also store the event ids that have been
/// received and processed already.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait NostrEventOffsetStoreApi: Send + Sync {
/// Returns the current timestamp offset of our nostr subscription
/// Will return 0 if there are no events in the store yet, otherwise
/// the highest timestamp of all events processed.
/// there is still a possibility that we get events delivered that are
/// older than the current offset just because they were not processed
/// or the faked timestamp on the GiftWrap event was higher than the
/// current offset.
async fn current_offset(&self) -> Result<u64>;

/// Returns whether the given event id has been processed already. This
/// will return true if we never tried to process the event independent
/// of whether it was successful or not.
async fn is_processed(&self, event_id: &str) -> Result<bool>;

/// Stores the given event data in the store.
async fn add_event(&self, data: NostrEventOffset) -> Result<()>;
}

/// A simple struct to store the event id and the time it was received.
#[derive(Debug, Clone)]
pub struct NostrEventOffset {
/// The nostr event id
pub event_id: String,
/// The timestamp of the inner GiftWrap event. The highest timestamp
/// of all events will be used when we restart the relay subscription.
pub time: u64,
/// Whether the event has been processed successfully on our side
pub success: bool,
}
Loading

0 comments on commit ed75a7d

Please sign in to comment.