Skip to content

Commit

Permalink
Add max_elements to subscriptions
Browse files Browse the repository at this point in the history
The maximum number of events that the client should aggregate before
sending a batch.
  • Loading branch information
MrAnno committed Oct 24, 2024
1 parent 16305e8 commit a92984f
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 4 deletions.
6 changes: 6 additions & 0 deletions cli/src/skell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ fn get_options() -> String {
# events before sending them.
# max_time = {}
# The maximum number of events that the client should aggregate before
# sending a batch.
# Defaults to unset, meaning that only max_time and max_envelope_size will
# limit the aggregation.
# max_elements =
# The maximum number of bytes in the SOAP envelope used to deliver
# the events.
# max_envelope_size = {}
Expand Down
16 changes: 14 additions & 2 deletions common/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
let connection_retry_interval: i32 = row.try_get("connection_retry_interval")?;
let max_envelope_size: i32 = row.try_get("max_envelope_size")?;
let max_time: i32 = row.try_get("max_time")?;
let max_elements: Option<i32> = row.try_get("max_elements")?;

let princs_filter = PrincsFilter::from(
row.try_get("princs_filter_op")?,
Expand All @@ -226,6 +227,10 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(connection_retry_count.try_into()?)
.set_connection_retry_interval(connection_retry_interval.try_into()?)
.set_max_time(max_time.try_into()?)
.set_max_elements(match max_elements {
Some(x) => Some(x.try_into()?),
None => None,
})
.set_max_envelope_size(max_envelope_size.try_into()?)
.set_enabled(row.try_get("enabled")?)
.set_read_existing_events(row.try_get("read_existing_events")?)
Expand Down Expand Up @@ -614,6 +619,11 @@ impl Database for PostgresDatabase {
let connection_retry_count: i32 = subscription.connection_retry_count().into();
let connection_retry_interval: i32 = subscription.connection_retry_interval().try_into()?;
let max_time: i32 = subscription.max_time().try_into()?;
let max_elements: Option<i32> = match subscription.max_elements() {
Some(x) => Some(x.try_into()?),
None => None,
};

let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?;
let count = self
.pool
Expand All @@ -622,10 +632,10 @@ impl Database for PostgresDatabase {
.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
Expand All @@ -636,6 +646,7 @@ impl Database for PostgresDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -657,6 +668,7 @@ impl Database for PostgresDatabase {
&connection_retry_count,
&connection_retry_interval,
&max_time,
&max_elements,
&max_envelope_size,
&subscription.enabled(),
&subscription.read_existing_events(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl PostgresMigration for CreateSubscriptionsTable {
connection_retry_count INT4,
connection_retry_interval INT4,
max_time INT4,
max_elements INT4,
max_envelope_size INT4,
enabled BOOLEAN,
read_existing_events BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl SQLiteMigration for CreateSubscriptionsTable {
connection_retry_count INTEGER,
connection_retry_interval INTEGER,
max_time INTEGER,
max_elements INTEGER,
max_envelope_size INTEGER,
enabled INTEGER,
read_existing_events INTEGER,
Expand Down
7 changes: 5 additions & 2 deletions common/src/database/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(row.get("connection_retry_count")?)
.set_connection_retry_interval(row.get("connection_retry_interval")?)
.set_max_time(row.get("max_time")?)
.set_max_elements(row.get("max_elements")?)
.set_max_envelope_size(row.get("max_envelope_size")?)
.set_enabled(row.get("enabled")?)
.set_read_existing_events(row.get("read_existing_events")?)
Expand Down Expand Up @@ -553,12 +554,12 @@ impl Database for SQLiteDatabase {
conn.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES (:uuid, :version, :revision, :name, :uri, :query,
:heartbeat_interval, :connection_retry_count, :connection_retry_interval,
:max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs,
:locale, :data_locale)
ON CONFLICT (uuid) DO UPDATE SET
Expand All @@ -571,6 +572,7 @@ impl Database for SQLiteDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -592,6 +594,7 @@ impl Database for SQLiteDatabase {
":connection_retry_count": subscription.connection_retry_count(),
":connection_retry_interval": subscription.connection_retry_interval(),
":max_time": subscription.max_time(),
":max_elements": subscription.max_elements(),
":max_envelope_size": subscription.max_envelope_size(),
":enabled": subscription.enabled(),
":read_existing_events": subscription.read_existing_events(),
Expand Down
5 changes: 5 additions & 0 deletions common/src/models/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ struct SubscriptionOptions {
pub connection_retry_count: Option<u16>,
pub connection_retry_interval: Option<u32>,
pub max_time: Option<u32>,
pub max_elements: Option<u32>,
pub max_envelope_size: Option<u32>,
pub enabled: Option<bool>,
pub read_existing_events: Option<bool>,
Expand Down Expand Up @@ -245,6 +246,8 @@ impl SubscriptionOptions {
data.set_max_time(max_time);
}

data.set_max_elements(self.max_elements);

if let Some(max_envelope_size) = self.max_envelope_size {
data.set_max_envelope_size(max_envelope_size);
}
Expand Down Expand Up @@ -344,6 +347,7 @@ heartbeat_interval = 32
connection_retry_count = 11
connection_retry_interval = 12
max_time = 13
max_elements = 15
max_envelope_size = 14
read_existing_events = false
content_format = "Raw" # or RenderedText
Expand Down Expand Up @@ -433,6 +437,7 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end"
.set_connection_retry_count(11)
.set_connection_retry_interval(12)
.set_max_time(13)
.set_max_elements(Some(15))
.set_max_envelope_size(14)
.set_read_existing_events(false)
.set_content_format(crate::subscription::ContentFormat::Raw)
Expand Down
4 changes: 4 additions & 0 deletions common/src/models/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ pub mod v2 {
pub connection_retry_count: u16,
pub connection_retry_interval: u32,
pub max_time: u32,
pub max_elements: Option<u32>,
pub max_envelope_size: u32,
pub enabled: bool,
pub read_existing_events: bool,
Expand All @@ -609,6 +610,7 @@ pub mod v2 {
.set_connection_retry_count(value.connection_retry_count)
.set_connection_retry_interval(value.connection_retry_interval)
.set_max_time(value.max_time)
.set_max_elements(value.max_elements)
.set_max_envelope_size(value.max_envelope_size)
.set_enabled(value.enabled)
.set_read_existing_events(value.read_existing_events)
Expand Down Expand Up @@ -637,6 +639,7 @@ pub mod v2 {
connection_retry_count: value.connection_retry_count(),
connection_retry_interval: value.connection_retry_interval(),
max_time: value.max_time(),
max_elements: value.max_elements(),
max_envelope_size: value.max_envelope_size(),
enabled: value.enabled(),
read_existing_events: value.read_existing_events(),
Expand Down Expand Up @@ -698,6 +701,7 @@ mod tests {
.set_ignore_channel_error(false)
.set_max_envelope_size(10000)
.set_max_time(1)
.set_max_elements(Some(100))
.set_read_existing_events(false)
.set_uri(Some("toto".to_string()))
.set_princs_filter(crate::subscription::PrincsFilter::new(
Expand Down
22 changes: 22 additions & 0 deletions common/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ pub struct SubscriptionParameters {
pub connection_retry_count: u16,
pub connection_retry_interval: u32,
pub max_time: u32,
pub max_elements: Option<u32>,
pub max_envelope_size: u32,
pub read_existing_events: bool,
pub content_format: ContentFormat,
Expand Down Expand Up @@ -471,6 +472,14 @@ impl Display for SubscriptionData {
"\tMax time without heartbeat/events: {}s",
self.max_time()
)?;
writeln!(
f,
"\tMax events in a batch: {}",
match self.max_elements() {
Some(max_elements) => max_elements.to_string(),
None => "Not configured".to_string(),
}
)?;
writeln!(f, "\tMax envelope size: {} bytes", self.max_envelope_size())?;
writeln!(f, "\tRead existing events: {}", self.read_existing_events())?;
writeln!(f, "\tContent format: {}", self.content_format())?;
Expand Down Expand Up @@ -535,6 +544,7 @@ impl SubscriptionData {
connection_retry_count: DEFAULT_CONNECTION_RETRY_COUNT,
connection_retry_interval: DEFAULT_CONNECTION_RETRY_INTERVAL,
max_time: DEFAULT_MAX_TIME,
max_elements: None,
max_envelope_size: DEFAULT_MAX_ENVELOPE_SIZE,
read_existing_events: DEFAULT_READ_EXISTING_EVENTS,
content_format: DEFAULT_CONTENT_FORMAT,
Expand Down Expand Up @@ -616,6 +626,11 @@ impl SubscriptionData {
self.parameters.max_time
}

/// Get a reference to the subscription's max elements.
pub fn max_elements(&self) -> Option<u32> {
self.parameters.max_elements
}

/// Get a reference to the subscription's max envelope size.
pub fn max_envelope_size(&self) -> u32 {
self.parameters.max_envelope_size
Expand Down Expand Up @@ -668,6 +683,13 @@ impl SubscriptionData {
self
}

/// Set the subscription's max elements.
pub fn set_max_elements(&mut self, max_elements: Option<u32>) -> &mut Self {
self.parameters.max_elements = max_elements;
self.update_internal_version();
self
}

/// Set the subscription's max envelope size.
pub fn set_max_envelope_size(&mut self, max_envelope_size: u32) -> &mut Self {
self.parameters.max_envelope_size = max_envelope_size;
Expand Down
1 change: 1 addition & 0 deletions doc/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Subscriptions and their parameters are not defined in OpenWEC configuration file
| `connection_retry_count` | No | 5 | Number of times the client will attempt to connect if the subscriber is unreachable. |
| `connection_retry_interval` | No | 60 | Interval observed between each connection attempt if the subscriber is unreachable. |
| `max_time` | No | 30 | The maximum time, in seconds, that the client should aggregate new events before sending them. |
| `max_elements` | No | *Undefined* | The maximum number of events that the client should aggregate before sending a batch. Defaults to unset, meaning that only max_time and max_envelope_size will limit the aggregation. |
| `max_envelope_size` | No | 512000 | The maximum number of bytes in the SOAP envelope used to deliver the events. |
| `enabled` | No | `False` | Whether the subscription is enabled or not. Not that a new subscription is **disabled** by default, and **can not** be enabled unless you configure at least one output. As a safe guard, subscriptions without outputs are ignored by openwec server. |
| `read_existing_events` | No | `False` | If `True`, the event source should replay all possible events that match the filter and any events that subsequently occur for that event source. |
Expand Down
1 change: 1 addition & 0 deletions server/src/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ async fn handle_enumerate(
connection_retry_count: subscription_data.connection_retry_count(),
connection_retry_interval: subscription_data.connection_retry_interval(),
max_time: subscription_data.max_time(),
max_elements: subscription_data.max_elements(),
max_envelope_size: subscription_data.max_envelope_size(),
thumbprint: match auth_ctx {
AuthenticationContext::Tls(_, thumbprint) => Some(thumbprint.clone()),
Expand Down
6 changes: 6 additions & 0 deletions server/src/soap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub struct SubscriptionBody {
pub connection_retry_interval: u32,
pub connection_retry_count: u16,
pub max_time: u32,
pub max_elements: Option<u32>,
pub max_envelope_size: u32,
pub thumbprint: Option<String>,
pub public_version: String,
Expand Down Expand Up @@ -243,6 +244,11 @@ impl Serializable for SubscriptionBody {
.write_text_content(BytesText::new(
format!("PT{}.0S", self.connection_retry_interval).as_str(),
))?;
if let Some(max_elements) = &self.max_elements {
writer.create_element("w:MaxElements").write_text_content(
BytesText::new(format!("{}", max_elements).as_str()),
)?;
}
writer.create_element("w:MaxTime").write_text_content(
BytesText::new(format!("PT{}.000S", self.max_time).as_str()),
)?;
Expand Down
6 changes: 6 additions & 0 deletions subscription.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ query = """
# events before sending them.
# max_time = 30

# The maximum number of events that the client should aggregate before
# sending a batch.
# Defaults to unset, meaning that only max_time and max_envelope_size will
# limit the aggregation.
# max_elements =

# The maximum number of bytes in the SOAP envelope used to deliver
# the events.
# max_envelope_size = 512000
Expand Down

0 comments on commit a92984f

Please sign in to comment.