From ed75a7d9f6378ddf59c738ca0fa27407c2f86601 Mon Sep 17 00:00:00 2001 From: tompro Date: Wed, 27 Nov 2024 18:08:12 +0100 Subject: [PATCH] Offset and event duplicate handling for Nostr --- src/persistence/db/mod.rs | 19 +++ src/persistence/db/nostr_event_offset.rs | 144 ++++++++++++++++++++++ src/persistence/mod.rs | 8 +- src/persistence/nostr.rs | 41 ++++++ src/service/notification_service/nostr.rs | 102 ++++++++++++--- src/util/date.rs | 28 +++++ src/util/mod.rs | 1 + 7 files changed, 327 insertions(+), 16 deletions(-) create mode 100644 src/persistence/db/nostr_event_offset.rs create mode 100644 src/persistence/nostr.rs create mode 100644 src/util/date.rs diff --git a/src/persistence/db/mod.rs b/src/persistence/db/mod.rs index 75c8043..a27df59 100644 --- a/src/persistence/db/mod.rs +++ b/src/persistence/db/mod.rs @@ -6,6 +6,7 @@ use surrealdb::{ }; pub mod contact; +pub mod nostr_event_offset; /// Configuration for the SurrealDB connection string, namespace and /// database name @@ -56,3 +57,21 @@ pub async fn get_memory_db(namespace: &str, database: &str) -> Result, +} + +impl SurrealNostrEventOffsetStore { + const TABLE: &'static str = "nostr_event_offset"; + + #[allow(dead_code)] + pub fn new(db: Surreal) -> Self { + Self { db } + } +} + +#[async_trait] +impl NostrEventOffsetStoreApi for SurrealNostrEventOffsetStore { + async fn current_offset(&self) -> Result { + let result: Vec = self + .db + .query("SELECT * FROM type::table($table) ORDER BY time DESC LIMIT 1") + .bind(("table", Self::TABLE)) + .await? + .take(0)?; + let value = result + .first() + .map(|c| c.time.timestamp()) + .unwrap_or(0) + .try_into()?; + Ok(value) + } + + async fn is_processed(&self, event_id: &str) -> Result { + let result: Option = self.db.select((Self::TABLE, event_id)).await?; + Ok(result.is_some()) + } + + async fn add_event(&self, data: NostrEventOffset) -> Result<()> { + let db: NostrEventOffsetDb = data.into(); + let _: Option = self + .db + .create((Self::TABLE, db.event_id.to_owned())) + .content(db) + .await?; + Ok(()) + } +} + +/// A nostr event offset. +#[derive(Serialize, Deserialize, Debug, Clone)] +struct NostrEventOffsetDb { + pub event_id: String, + pub time: DateTimeUtc, + pub success: bool, +} + +impl From for NostrEventOffset { + fn from(db: NostrEventOffsetDb) -> Self { + Self { + event_id: db.event_id, + time: db.time.timestamp() as u64, + success: db.success, + } + } +} + +impl From for NostrEventOffsetDb { + fn from(offset: NostrEventOffset) -> Self { + Self { + event_id: offset.event_id, + time: date::seconds_unsigned(offset.time), + success: offset.success, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::persistence::db::get_memory_db; + + #[tokio::test] + async fn test_get_offset_from_empty_table() { + let store = get_store().await; + let offset = store.current_offset().await.expect("could not get offset"); + assert_eq!(offset, 0); + } + + #[tokio::test] + async fn test_add_event() { + let store = get_store().await; + let data = NostrEventOffset { + event_id: "test_event".to_string(), + time: 1000, + success: true, + }; + store + .add_event(data) + .await + .expect("Could not add event offset"); + + let offset = store.current_offset().await.expect("could not get offset"); + assert_eq!(offset, 1000); + } + + #[tokio::test] + async fn test_is_processed() { + let store = get_store().await; + let data = NostrEventOffset { + event_id: "test_event".to_string(), + time: 1000, + success: false, + }; + let is_known = store + .is_processed(&data.event_id) + .await + .expect("could not check if processed"); + assert!(!is_known, "new event should not be known"); + + store + .add_event(data.clone()) + .await + .expect("could not add event offset"); + let is_processed = store + .is_processed(&data.event_id) + .await + .expect("could not check if processed"); + assert!(is_processed, "existing event should be known"); + } + + async fn get_store() -> SurrealNostrEventOffsetStore { + let mem_db = get_memory_db("test", "nostr_event_offset") + .await + .expect("could not create memory db"); + SurrealNostrEventOffsetStore::new(mem_db) + } +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index a39ea3e..46abf60 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -4,6 +4,7 @@ pub mod contact; pub mod db; pub mod file_upload; pub mod identity; +pub mod nostr; use bill::FileBasedBillStore; use db::{contact::SurrealContactStore, get_surreal_db, SurrealDbConfig}; @@ -35,9 +36,14 @@ pub enum Error { #[error("no such {0} entity {1}")] NoSuchEntity(String, String), + + #[allow(dead_code)] + #[error("Failed to convert integer {0}")] + FromInt(#[from] std::num::TryFromIntError), } pub use contact::ContactStoreApi; +pub use nostr::{NostrEventOffset, NostrEventOffsetStoreApi}; use crate::config::Config; use company::FileBasedCompanyStore; @@ -77,7 +83,7 @@ pub async fn get_db_context(conf: &Config) -> Result { error!("Error cleaning up temp upload folder for bill: {e}"); } - let contact_store = Arc::new(SurrealContactStore::new(db)); + let contact_store = Arc::new(SurrealContactStore::new(db.clone())); let bill_store = Arc::new(FileBasedBillStore::new(&conf.data_dir, "bills", "files", "bills_keys").await?); diff --git a/src/persistence/nostr.rs b/src/persistence/nostr.rs new file mode 100644 index 0000000..33aa446 --- /dev/null +++ b/src/persistence/nostr.rs @@ -0,0 +1,41 @@ +use super::Result; +use async_trait::async_trait; + +#[cfg(test)] +use mockall::automock; + +/// Allows storing and retrieving time based offsets for subscriptions +/// to Nostr relays. It will also store the event ids that have been +/// received and processed already. +#[cfg_attr(test, automock)] +#[async_trait] +pub trait NostrEventOffsetStoreApi: Send + Sync { + /// Returns the current timestamp offset of our nostr subscription + /// Will return 0 if there are no events in the store yet, otherwise + /// the highest timestamp of all events processed. + /// there is still a possibility that we get events delivered that are + /// older than the current offset just because they were not processed + /// or the faked timestamp on the GiftWrap event was higher than the + /// current offset. + async fn current_offset(&self) -> Result; + + /// Returns whether the given event id has been processed already. This + /// will return true if we never tried to process the event independent + /// of whether it was successful or not. + async fn is_processed(&self, event_id: &str) -> Result; + + /// Stores the given event data in the store. + async fn add_event(&self, data: NostrEventOffset) -> Result<()>; +} + +/// A simple struct to store the event id and the time it was received. +#[derive(Debug, Clone)] +pub struct NostrEventOffset { + /// The nostr event id + pub event_id: String, + /// The timestamp of the inner GiftWrap event. The highest timestamp + /// of all events will be used when we restart the relay subscription. + pub time: u64, + /// Whether the event has been processed successfully on our side + pub success: bool, +} diff --git a/src/service/notification_service/nostr.rs b/src/service/notification_service/nostr.rs index b8e297d..e3847dd 100644 --- a/src/service/notification_service/nostr.rs +++ b/src/service/notification_service/nostr.rs @@ -1,11 +1,14 @@ use async_trait::async_trait; use log::{error, trace, warn}; use nostr_sdk::prelude::*; +use nostr_sdk::Timestamp; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; +use crate::persistence::NostrEventOffset; +use crate::persistence::NostrEventOffsetStoreApi; use crate::service::contact_service::ContactServiceApi; use super::super::contact_service::IdentityPublicData; @@ -72,14 +75,13 @@ impl NostrClient { pub async fn unwrap_envelope( &self, note: RelayPoolNotification, - ) -> Option<(EventEnvelope, PublicKey, EventId)> { - let mut result: Option<(EventEnvelope, PublicKey, EventId)> = None; + ) -> Option<(EventEnvelope, PublicKey, EventId, Timestamp)> { + let mut result: Option<(EventEnvelope, PublicKey, EventId, Timestamp)> = None; if let RelayPoolNotification::Event { event, .. } = note { if event.kind == Kind::GiftWrap { result = match self.client.unwrap_gift_wrap(&event).await { - Ok(UnwrappedGift { rumor, sender }) => { - extract_event_envelope(rumor).map(|e| (e, sender, event.id)) - } + Ok(UnwrappedGift { rumor, sender }) => extract_event_envelope(rumor) + .map(|e| (e, sender, event.id, event.created_at)), Err(e) => { error!("Unwrapping gift wrap failed: {e}"); None @@ -121,6 +123,7 @@ pub struct NostrConsumer { client: NostrClient, event_handlers: Arc>>, contact_service: Arc, + offset_store: Arc, } impl NostrConsumer { @@ -129,11 +132,13 @@ impl NostrConsumer { client: NostrClient, contact_service: Arc, event_handlers: Vec>, + offset_store: Arc, ) -> Self { Self { client, event_handlers: Arc::new(event_handlers), contact_service, + offset_store, } } @@ -143,10 +148,19 @@ impl NostrConsumer { let client = self.client.clone(); let event_handlers = self.event_handlers.clone(); let contact_service = self.contact_service.clone(); + let offset_store = self.offset_store.clone(); + + // continue where we left off + let offset_ts = get_offset(&offset_store).await; // subscribe only to private messages sent to our pubkey client - .subscribe(Filter::new().pubkey(client.public_key).kind(Kind::GiftWrap)) + .subscribe( + Filter::new() + .pubkey(client.public_key) + .kind(Kind::GiftWrap) + .since(offset_ts), + ) .await .expect("Failed to subscribe to Nostr events"); @@ -155,16 +169,20 @@ impl NostrConsumer { client .client .handle_notifications(|note| async { - if let Some((envelope, sender, _event_id)) = client.unwrap_envelope(note).await + if let Some((envelope, sender, event_id, time)) = + client.unwrap_envelope(note).await { - // TODO: Check if we already processed this event via event_id - // We only want to handle events from known contacts - if let Ok(sender) = sender.to_bech32() { - trace!("Received event: {envelope:?} from {sender:?}"); - if contact_service.is_known_npub(sender.as_str()).await? { + if !offset_store.is_processed(&event_id.to_hex()).await? { + if let Ok(sender) = sender.to_bech32() { trace!("Received event: {envelope:?} from {sender:?}"); - handle_event(envelope, &event_handlers).await?; + if contact_service.is_known_npub(sender.as_str()).await? { + trace!("Received event: {envelope:?} from {sender:?}"); + handle_event(envelope, &event_handlers).await?; + } } + + // store the new event offset + add_offset(&offset_store, event_id, time, true).await; } }; Ok(false) @@ -176,6 +194,32 @@ impl NostrConsumer { } } +async fn get_offset(db: &Arc) -> Timestamp { + Timestamp::from_secs( + db.current_offset() + .await + .map_err(|e| error!("Could not get event offset: {e}")) + .ok() + .unwrap_or(0), + ) +} + +async fn add_offset( + db: &Arc, + event_id: EventId, + time: Timestamp, + success: bool, +) { + db.add_event(NostrEventOffset { + event_id: event_id.to_hex(), + time: time.as_u64(), + success, + }) + .await + .map_err(|e| error!("Could not store event offset: {e}")) + .ok(); +} + fn extract_event_envelope(rumor: UnsignedEvent) -> Option { if rumor.kind == Kind::PrivateDirectMessage { match serde_json::from_str::(rumor.content.as_str()) { @@ -222,6 +266,8 @@ mod tests { use super::super::test_utils::{get_mock_relay, NOSTR_KEY1}; use super::{NostrClient, NostrConfig, NostrConsumer}; + use crate::persistence::nostr::MockNostrEventOffsetStoreApi; + use crate::persistence::NostrEventOffset; use crate::service::notification_service::Event; use crate::service::{ contact_service::MockContactServiceApi, @@ -295,9 +341,35 @@ mod tests { }) .returning(|_| Ok(())); + let mut offset_store = MockNostrEventOffsetStoreApi::new(); + + // expect the offset store to return the current offset once on start + offset_store + .expect_current_offset() + .returning(|| Ok(1000)) + .once(); + + // should also check if the event has been processed already + offset_store + .expect_is_processed() + .withf(|e: &str| !e.is_empty()) + .returning(|_| Ok(false)) + .once(); + + // when done processing the event, add it to the offset store + offset_store + .expect_add_event() + .withf(|e: &NostrEventOffset| e.success) + .returning(|_| Ok(())) + .once(); + // we start the consumer - let consumer = - NostrConsumer::new(client2, Arc::new(contact_service), vec![Box::new(handler)]); + let consumer = NostrConsumer::new( + client2, + Arc::new(contact_service), + vec![Box::new(handler)], + Arc::new(offset_store), + ); let handle = consumer .start() .await diff --git a/src/util/date.rs b/src/util/date.rs new file mode 100644 index 0000000..4ddd80d --- /dev/null +++ b/src/util/date.rs @@ -0,0 +1,28 @@ +use chrono::{DateTime, TimeZone, Utc}; + +pub type DateTimeUtc = DateTime; + +/// Returns the current time as DateTime +#[allow(dead_code)] +pub fn now() -> DateTimeUtc { + DateTime::default() +} + +/// Quickly create a DateTimeUtc from a timestamp. chrone does not +/// really use Results and most of the errors are super unlikely to +/// happen. +#[allow(dead_code)] +pub fn seconds(timestamp: i64) -> DateTimeUtc { + match Utc.timestamp_opt(timestamp, 0).single() { + Some(dt) => dt, + None => panic!("invalid timestamp"), + } +} + +/// Nostr timestamps are unsigned 64 bit integers. This function converts +/// them to a DateTimeUtc which we can directly use in SurrealDB and +/// everywhere else. +#[allow(dead_code)] +pub fn seconds_unsigned(timestamp: u64) -> DateTimeUtc { + seconds(timestamp as i64) +} diff --git a/src/util/mod.rs b/src/util/mod.rs index de548d7..bb85d37 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,3 +1,4 @@ +pub mod date; pub mod file; pub mod numbers_to_words; pub mod rsa;