diff --git a/cli/src/skell.rs b/cli/src/skell.rs index c692081..3d9b5af 100644 --- a/cli/src/skell.rs +++ b/cli/src/skell.rs @@ -69,6 +69,12 @@ fn get_options() -> String { # events before sending them. # max_time = {} +# The maximum number of events that the client should aggregate before +# sending a batch. +# Defaults to unset, meaning that only max_time and max_envelope_size will +# limit the aggregation. +# max_elements = + # The maximum number of bytes in the SOAP envelope used to deliver # the events. # max_envelope_size = {} diff --git a/common/src/database/postgres.rs b/common/src/database/postgres.rs index 9868850..921bbf2 100644 --- a/common/src/database/postgres.rs +++ b/common/src/database/postgres.rs @@ -211,6 +211,7 @@ fn row_to_subscription(row: &Row) -> Result { let connection_retry_interval: i32 = row.try_get("connection_retry_interval")?; let max_envelope_size: i32 = row.try_get("max_envelope_size")?; let max_time: i32 = row.try_get("max_time")?; + let max_elements: Option = row.try_get("max_elements")?; let princs_filter = PrincsFilter::from( row.try_get("princs_filter_op")?, @@ -226,6 +227,10 @@ fn row_to_subscription(row: &Row) -> Result { .set_connection_retry_count(connection_retry_count.try_into()?) .set_connection_retry_interval(connection_retry_interval.try_into()?) .set_max_time(max_time.try_into()?) + .set_max_elements(match max_elements { + Some(x) => Some(x.try_into()?), + None => None, + }) .set_max_envelope_size(max_envelope_size.try_into()?) .set_enabled(row.try_get("enabled")?) .set_read_existing_events(row.try_get("read_existing_events")?) @@ -614,6 +619,11 @@ impl Database for PostgresDatabase { let connection_retry_count: i32 = subscription.connection_retry_count().into(); let connection_retry_interval: i32 = subscription.connection_retry_interval().try_into()?; let max_time: i32 = subscription.max_time().try_into()?; + let max_elements: Option = match subscription.max_elements() { + Some(x) => Some(x.try_into()?), + None => None, + }; + let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?; let count = self .pool @@ -622,10 +632,10 @@ impl Database for PostgresDatabase { .execute( r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query, heartbeat_interval, connection_retry_count, connection_retry_interval, - max_time, max_envelope_size, enabled, read_existing_events, content_format, + max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format, ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale, data_locale) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21) ON CONFLICT (uuid) DO UPDATE SET version = excluded.version, revision = excluded.revision, @@ -636,6 +646,7 @@ impl Database for PostgresDatabase { connection_retry_count = excluded.connection_retry_count, connection_retry_interval = excluded.connection_retry_interval, max_time = excluded.max_time, + max_elements = excluded.max_elements, max_envelope_size = excluded.max_envelope_size, enabled = excluded.enabled, read_existing_events = excluded.read_existing_events, @@ -657,6 +668,7 @@ impl Database for PostgresDatabase { &connection_retry_count, &connection_retry_interval, &max_time, + &max_elements, &max_envelope_size, &subscription.enabled(), &subscription.read_existing_events(), diff --git a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs index ddf2a76..3e58582 100644 --- a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs @@ -20,6 +20,7 @@ impl PostgresMigration for CreateSubscriptionsTable { connection_retry_count INT4, connection_retry_interval INT4, max_time INT4, + max_elements INT4, max_envelope_size INT4, enabled BOOLEAN, read_existing_events BOOLEAN, diff --git a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs index 735ab83..7a5f415 100644 --- a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs @@ -19,6 +19,7 @@ impl SQLiteMigration for CreateSubscriptionsTable { connection_retry_count INTEGER, connection_retry_interval INTEGER, max_time INTEGER, + max_elements INTEGER, max_envelope_size INTEGER, enabled INTEGER, read_existing_events INTEGER, diff --git a/common/src/database/sqlite.rs b/common/src/database/sqlite.rs index e105b66..ee51e54 100644 --- a/common/src/database/sqlite.rs +++ b/common/src/database/sqlite.rs @@ -175,6 +175,7 @@ fn row_to_subscription(row: &Row) -> Result { .set_connection_retry_count(row.get("connection_retry_count")?) .set_connection_retry_interval(row.get("connection_retry_interval")?) .set_max_time(row.get("max_time")?) + .set_max_elements(row.get("max_elements")?) .set_max_envelope_size(row.get("max_envelope_size")?) .set_enabled(row.get("enabled")?) .set_read_existing_events(row.get("read_existing_events")?) @@ -553,12 +554,12 @@ impl Database for SQLiteDatabase { conn.execute( r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query, heartbeat_interval, connection_retry_count, connection_retry_interval, - max_time, max_envelope_size, enabled, read_existing_events, content_format, + max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format, ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale, data_locale) VALUES (:uuid, :version, :revision, :name, :uri, :query, :heartbeat_interval, :connection_retry_count, :connection_retry_interval, - :max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format, + :max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format, :ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs, :locale, :data_locale) ON CONFLICT (uuid) DO UPDATE SET @@ -571,6 +572,7 @@ impl Database for SQLiteDatabase { connection_retry_count = excluded.connection_retry_count, connection_retry_interval = excluded.connection_retry_interval, max_time = excluded.max_time, + max_elements = excluded.max_elements, max_envelope_size = excluded.max_envelope_size, enabled = excluded.enabled, read_existing_events = excluded.read_existing_events, @@ -592,6 +594,7 @@ impl Database for SQLiteDatabase { ":connection_retry_count": subscription.connection_retry_count(), ":connection_retry_interval": subscription.connection_retry_interval(), ":max_time": subscription.max_time(), + ":max_elements": subscription.max_elements(), ":max_envelope_size": subscription.max_envelope_size(), ":enabled": subscription.enabled(), ":read_existing_events": subscription.read_existing_events(), diff --git a/common/src/models/config.rs b/common/src/models/config.rs index 2c44018..79372a3 100644 --- a/common/src/models/config.rs +++ b/common/src/models/config.rs @@ -216,6 +216,7 @@ struct SubscriptionOptions { pub connection_retry_count: Option, pub connection_retry_interval: Option, pub max_time: Option, + pub max_elements: Option, pub max_envelope_size: Option, pub enabled: Option, pub read_existing_events: Option, @@ -245,6 +246,8 @@ impl SubscriptionOptions { data.set_max_time(max_time); } + data.set_max_elements(self.max_elements); + if let Some(max_envelope_size) = self.max_envelope_size { data.set_max_envelope_size(max_envelope_size); } @@ -344,6 +347,7 @@ heartbeat_interval = 32 connection_retry_count = 11 connection_retry_interval = 12 max_time = 13 +max_elements = 15 max_envelope_size = 14 read_existing_events = false content_format = "Raw" # or RenderedText @@ -433,6 +437,7 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end" .set_connection_retry_count(11) .set_connection_retry_interval(12) .set_max_time(13) + .set_max_elements(Some(15)) .set_max_envelope_size(14) .set_read_existing_events(false) .set_content_format(crate::subscription::ContentFormat::Raw) diff --git a/common/src/models/export.rs b/common/src/models/export.rs index 7d0f0de..d466448 100644 --- a/common/src/models/export.rs +++ b/common/src/models/export.rs @@ -589,6 +589,7 @@ pub mod v2 { pub connection_retry_count: u16, pub connection_retry_interval: u32, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub enabled: bool, pub read_existing_events: bool, @@ -609,6 +610,7 @@ pub mod v2 { .set_connection_retry_count(value.connection_retry_count) .set_connection_retry_interval(value.connection_retry_interval) .set_max_time(value.max_time) + .set_max_elements(value.max_elements) .set_max_envelope_size(value.max_envelope_size) .set_enabled(value.enabled) .set_read_existing_events(value.read_existing_events) @@ -637,6 +639,7 @@ pub mod v2 { connection_retry_count: value.connection_retry_count(), connection_retry_interval: value.connection_retry_interval(), max_time: value.max_time(), + max_elements: value.max_elements(), max_envelope_size: value.max_envelope_size(), enabled: value.enabled(), read_existing_events: value.read_existing_events(), @@ -698,6 +701,7 @@ mod tests { .set_ignore_channel_error(false) .set_max_envelope_size(10000) .set_max_time(1) + .set_max_elements(Some(100)) .set_read_existing_events(false) .set_uri(Some("toto".to_string())) .set_princs_filter(crate::subscription::PrincsFilter::new( diff --git a/common/src/subscription.rs b/common/src/subscription.rs index e1ded7a..81d00ad 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -395,6 +395,7 @@ pub struct SubscriptionParameters { pub connection_retry_count: u16, pub connection_retry_interval: u32, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub read_existing_events: bool, pub content_format: ContentFormat, @@ -471,6 +472,14 @@ impl Display for SubscriptionData { "\tMax time without heartbeat/events: {}s", self.max_time() )?; + writeln!( + f, + "\tMax events in a batch: {}", + match self.max_elements() { + Some(max_elements) => max_elements.to_string(), + None => "Not configured".to_string(), + } + )?; writeln!(f, "\tMax envelope size: {} bytes", self.max_envelope_size())?; writeln!(f, "\tRead existing events: {}", self.read_existing_events())?; writeln!(f, "\tContent format: {}", self.content_format())?; @@ -535,6 +544,7 @@ impl SubscriptionData { connection_retry_count: DEFAULT_CONNECTION_RETRY_COUNT, connection_retry_interval: DEFAULT_CONNECTION_RETRY_INTERVAL, max_time: DEFAULT_MAX_TIME, + max_elements: None, max_envelope_size: DEFAULT_MAX_ENVELOPE_SIZE, read_existing_events: DEFAULT_READ_EXISTING_EVENTS, content_format: DEFAULT_CONTENT_FORMAT, @@ -616,6 +626,11 @@ impl SubscriptionData { self.parameters.max_time } + /// Get a reference to the subscription's max elements. + pub fn max_elements(&self) -> Option { + self.parameters.max_elements + } + /// Get a reference to the subscription's max envelope size. pub fn max_envelope_size(&self) -> u32 { self.parameters.max_envelope_size @@ -668,6 +683,13 @@ impl SubscriptionData { self } + /// Set the subscription's max elements. + pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + self.parameters.max_elements = max_elements; + self.update_internal_version(); + self + } + /// Set the subscription's max envelope size. pub fn set_max_envelope_size(&mut self, max_envelope_size: u32) -> &mut Self { self.parameters.max_envelope_size = max_envelope_size; diff --git a/doc/subscription.md b/doc/subscription.md index 235cd95..dec0077 100644 --- a/doc/subscription.md +++ b/doc/subscription.md @@ -38,6 +38,7 @@ Subscriptions and their parameters are not defined in OpenWEC configuration file | `connection_retry_count` | No | 5 | Number of times the client will attempt to connect if the subscriber is unreachable. | | `connection_retry_interval` | No | 60 | Interval observed between each connection attempt if the subscriber is unreachable. | | `max_time` | No | 30 | The maximum time, in seconds, that the client should aggregate new events before sending them. | +| `max_elements` | No | *Undefined* | The maximum number of events that the client should aggregate before sending a batch. Defaults to unset, meaning that only max_time and max_envelope_size will limit the aggregation. | | `max_envelope_size` | No | 512000 | The maximum number of bytes in the SOAP envelope used to deliver the events. | | `enabled` | No | `False` | Whether the subscription is enabled or not. Not that a new subscription is **disabled** by default, and **can not** be enabled unless you configure at least one output. As a safe guard, subscriptions without outputs are ignored by openwec server. | | `read_existing_events` | No | `False` | If `True`, the event source should replay all possible events that match the filter and any events that subsequently occur for that event source. | diff --git a/server/src/logic.rs b/server/src/logic.rs index b268206..7a15076 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -200,6 +200,7 @@ async fn handle_enumerate( connection_retry_count: subscription_data.connection_retry_count(), connection_retry_interval: subscription_data.connection_retry_interval(), max_time: subscription_data.max_time(), + max_elements: subscription_data.max_elements(), max_envelope_size: subscription_data.max_envelope_size(), thumbprint: match auth_ctx { AuthenticationContext::Tls(_, thumbprint) => Some(thumbprint.clone()), diff --git a/server/src/soap.rs b/server/src/soap.rs index e7c3db1..8f78679 100644 --- a/server/src/soap.rs +++ b/server/src/soap.rs @@ -96,6 +96,7 @@ pub struct SubscriptionBody { pub connection_retry_interval: u32, pub connection_retry_count: u16, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub thumbprint: Option, pub public_version: String, @@ -243,6 +244,11 @@ impl Serializable for SubscriptionBody { .write_text_content(BytesText::new( format!("PT{}.0S", self.connection_retry_interval).as_str(), ))?; + if let Some(max_elements) = &self.max_elements { + writer.create_element("w:MaxElements").write_text_content( + BytesText::new(format!("{}", max_elements).as_str()), + )?; + } writer.create_element("w:MaxTime").write_text_content( BytesText::new(format!("PT{}.000S", self.max_time).as_str()), )?; diff --git a/subscription.sample.toml b/subscription.sample.toml index 350d3c7..6e82630 100644 --- a/subscription.sample.toml +++ b/subscription.sample.toml @@ -40,6 +40,12 @@ query = """ # events before sending them. # max_time = 30 +# The maximum number of events that the client should aggregate before +# sending a batch. +# Defaults to unset, meaning that only max_time and max_envelope_size will +# limit the aggregation. +# max_elements = + # The maximum number of bytes in the SOAP envelope used to deliver # the events. # max_envelope_size = 512000