From 16305e8c3cb25656349705d0b87a7a16a6a106d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Wed, 23 Oct 2024 19:52:10 +0200 Subject: [PATCH 1/6] common/doc: remove trailing whitespaces --- common/src/database/postgres.rs | 14 +++++++------- common/src/database/sqlite.rs | 24 ++++++++++++------------ common/src/models/config.rs | 2 +- doc/subscription.md | 28 ++++++++++++++-------------- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/common/src/database/postgres.rs b/common/src/database/postgres.rs index 2238091..9868850 100644 --- a/common/src/database/postgres.rs +++ b/common/src/database/postgres.rs @@ -155,8 +155,8 @@ impl PostgresDatabase { client .query( format!( - r#"SELECT * - FROM heartbeats + r#"SELECT * + FROM heartbeats JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription WHERE {} = $1 AND subscription = $2"#, @@ -170,8 +170,8 @@ impl PostgresDatabase { client .query( format!( - r#"SELECT * - FROM heartbeats + r#"SELECT * + FROM heartbeats JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription WHERE {} = $1"#, field @@ -572,7 +572,7 @@ impl Database for PostgresDatabase { .await? .query( r#" - SELECT * + SELECT * FROM subscriptions "#, &[], @@ -596,7 +596,7 @@ impl Database for PostgresDatabase { .get() .await? .query_opt( - r#"SELECT * + r#"SELECT * FROM subscriptions WHERE uuid = $1 OR name = $1"#, &[&identifier], @@ -626,7 +626,7 @@ impl Database for PostgresDatabase { ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale, data_locale) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) - ON CONFLICT (uuid) DO UPDATE SET + ON CONFLICT (uuid) DO UPDATE SET version = excluded.version, revision = excluded.revision, name = excluded.name, diff --git a/common/src/database/sqlite.rs b/common/src/database/sqlite.rs index 74bd6f3..e105b66 100644 --- a/common/src/database/sqlite.rs +++ b/common/src/database/sqlite.rs @@ -3,19 +3,19 @@ // license (MIT), we include below its copyright notice and permission notice: // // The MIT License (MIT) -// +// // Copyright (c) 2015 Skyler Lipthay -// +// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -125,7 +125,7 @@ impl SQLiteDatabase { )?; let rows = statement.query_and_then(&[(":field_value", &field_value), (":subscription", &value)], row_to_heartbeat)?; - let mut heartbeats = Vec::new(); + let mut heartbeats = Vec::new(); for heartbeat in rows { heartbeats.push(heartbeat?); } @@ -134,7 +134,7 @@ impl SQLiteDatabase { let mut statement = conn.prepare( format!( r#"SELECT * - FROM heartbeats + FROM heartbeats JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription WHERE {} = :field_value"#, field @@ -142,7 +142,7 @@ impl SQLiteDatabase { .as_str() )?; let rows = statement.query_and_then(&[(":field_value", &field_value)], row_to_heartbeat)?; - let mut heartbeats = Vec::new(); + let mut heartbeats = Vec::new(); for heartbeat in rows { heartbeats.push(heartbeat?); } @@ -321,12 +321,12 @@ impl Database for SQLiteDatabase { (None, None) => { client.interact(move |conn| { conn.execute("DELETE FROM bookmarks", []) - }).await + }).await } }; future.map_err(|err| anyhow!(format!("{}", err)))??; Ok(()) - + } async fn get_heartbeats_by_machine( @@ -469,7 +469,7 @@ impl Database for SQLiteDatabase { for (key, value) in heartbeats_cloned { match value.last_event_seen { Some(last_event_seen) => { - query_with_event + query_with_event .execute( params![ &key.machine, @@ -561,7 +561,7 @@ impl Database for SQLiteDatabase { :max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format, :ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs, :locale, :data_locale) - ON CONFLICT (uuid) DO UPDATE SET + ON CONFLICT (uuid) DO UPDATE SET version = excluded.version, revision = excluded.revision, name = excluded.name, @@ -776,7 +776,7 @@ impl Database for SQLiteDatabase { .interact(move |conn| { conn.query_row( r#"SELECT COUNT(machine) - FROM heartbeats + FROM heartbeats WHERE subscription = :subscription"#, &[(":subscription", &subscription_owned)], |row| row.get(0), diff --git a/common/src/models/config.rs b/common/src/models/config.rs index 52a3e3c..2c44018 100644 --- a/common/src/models/config.rs +++ b/common/src/models/config.rs @@ -361,7 +361,7 @@ enabled = true [outputs.config] base = "/tmp/" -split_on_addr_index = 2 +split_on_addr_index = 2 append_node_name = true filename = "courgette" diff --git a/doc/subscription.md b/doc/subscription.md index 192c7fe..235cd95 100644 --- a/doc/subscription.md +++ b/doc/subscription.md @@ -1,6 +1,6 @@ # Subscription -A subscription enables a Windows Event Collector to retrieve a set of events from a set of machines using a dedicated configuration. +A subscription enables a Windows Event Collector to retrieve a set of events from a set of machines using a dedicated configuration. The set of events is defined by a list of XPath filter queries. For example, here is a query list composed of a single query which retrieves all event logs within channels `Application`, `Security`, `Setup` and `System`: ```xml @@ -24,7 +24,7 @@ In addition, each subscription is identified by a GUID called `uuid`, which is n Each Windows machine configured to contact a Windows Event Collector server will send an `Enumerate` request to get a list of subscriptions. It will then create locally these subscriptions and fullfill them. -## Parameters +## Parameters Subscriptions and their parameters are not defined in OpenWEC configuration file but in OpenWEC database. Therefore, you **must** use `openwec` cli to edit them. You should **never update subscription parameters directly in database**. @@ -123,7 +123,7 @@ To use configuration files, edit them and then run `openwec subscriptions load`. ### Revisions -When using the `openwec subscriptions load` command, you can use the `--revision` flag to specify a revision string that represents the configuration version. For example, you can use the output of `git rev-parse --short HEAD` if your configuration files are versioned using `git`. +When using the `openwec subscriptions load` command, you can use the `--revision` flag to specify a revision string that represents the configuration version. For example, you can use the output of `git rev-parse --short HEAD` if your configuration files are versioned using `git`. When a client retrieves its subscriptions, it also receives the associated revision strings. Later, when pushing events or sending heartbeats, the revision string is included as metadata. The revision string received by OpenWEC within events is called `ClientRevision` because it represents the revision "used" by the client at that time. The revision string is not used to compute the subscription version that clients use to determine whether the subscription has been updated since their last `Refresh`. This is because some configuration updates may only affect "server" parameters (i.e. outputs), and we do not want all clients to refresh the subscription unnecessarily. However, if the configuration update affects "client" parameters (such as query), the subscription version is updated and clients will retrieve the new version of the subscription configuration with the new revision string on the next `Refresh`. @@ -147,7 +147,7 @@ You can disable all cli commands that edit subscriptions using the OpenWEC setti List subscriptions in a "short" format. Each line represents a subscription, with its status (enabled or not), its name and its URI. -#### Usage +#### Usage ``` $ openwec subscriptions @@ -156,7 +156,7 @@ $ openwec subscriptions [+] Subscription-toto (/toto) ``` -There are 3 subscriptions: +There are 3 subscriptions: - A subscription named `Old subscription`, disabled with no URI defined. - A subscription named `My-new-subscription`, enabled with no URI defined. - A subscription named `Subscription-toto`, enabled with a URI set to `/toto`. @@ -167,7 +167,7 @@ Otherwise, if a Windows machine sends an Enumerate request using URI `/toto`, it ### `openwec subscriptions new` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command enables you to create a new subscription. @@ -191,7 +191,7 @@ You may add some using `openwec subscriptions output`, which is detailed in [Out ### `openwec subscriptions edit` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command enables you to edit an already existing subscription. @@ -238,7 +238,7 @@ Subscription my-super-subscription ContentFormat: Raw IgnoreChannelError: true Principal filter: Not configured - Outputs: Not configured + Outputs: Not configured Enabled: false Event filter query: @@ -256,7 +256,7 @@ Event filter query: ### `openwec subscriptions duplicate` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command duplicates an existing subscription. @@ -303,7 +303,7 @@ Event filter query: ### `openwec subscriptions export` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command exports the currently configured subscriptions in a `json` format. You may export only one subscription using `--subscription `. @@ -322,7 +322,7 @@ $ openwec subscriptions export ### `openwec subscriptions import` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command imports subscriptions from a file. Two formats are supported: * `openwec`: the format generated by `openwec subscriptions export`. **Importing subscriptions exported from another openwec version might not work.** @@ -340,7 +340,7 @@ $ openwec subscriptions import -f windows windows-subscription.xml ### `openwec subscriptions delete` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command deletes subscriptions, and all associated bookmarks and heartbeats. There is no way to undo this action (unless you backup your database, and **you should definitely do it**). @@ -375,7 +375,7 @@ $ openwec subscriptions machines my-super-subscription ### `openwec subscriptions enable` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command enables one or many subscriptions. You may also want to enable all configured subscriptions without listing them using `--all`. @@ -394,7 +394,7 @@ $ openwec subscriptions enable my-super-subscription this-is-a-clone ### `openwec subscriptions disable` (deprecated) > [!WARNING] -> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. +> Using commands to manage subscriptions and there outputs is **deprecated** and will be removed in future releases. Use subscription configuration files instead. This command disables one or many subscriptions. You may also want to disable all configured subscriptions without listing them using `--all`. From a92984fb0d52f810d2f436218bc19db3dbd02c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Wed, 23 Oct 2024 19:51:32 +0200 Subject: [PATCH 2/6] Add max_elements to subscriptions The maximum number of events that the client should aggregate before sending a batch. --- cli/src/skell.rs | 6 +++++ common/src/database/postgres.rs | 16 ++++++++++++-- .../_001_create_subscriptions_table.rs | 1 + .../sqlite/_001_create_subscriptions_table.rs | 1 + common/src/database/sqlite.rs | 7 ++++-- common/src/models/config.rs | 5 +++++ common/src/models/export.rs | 4 ++++ common/src/subscription.rs | 22 +++++++++++++++++++ doc/subscription.md | 1 + server/src/logic.rs | 1 + server/src/soap.rs | 6 +++++ subscription.sample.toml | 6 +++++ 12 files changed, 72 insertions(+), 4 deletions(-) diff --git a/cli/src/skell.rs b/cli/src/skell.rs index c692081..3d9b5af 100644 --- a/cli/src/skell.rs +++ b/cli/src/skell.rs @@ -69,6 +69,12 @@ fn get_options() -> String { # events before sending them. # max_time = {} +# The maximum number of events that the client should aggregate before +# sending a batch. +# Defaults to unset, meaning that only max_time and max_envelope_size will +# limit the aggregation. +# max_elements = + # The maximum number of bytes in the SOAP envelope used to deliver # the events. # max_envelope_size = {} diff --git a/common/src/database/postgres.rs b/common/src/database/postgres.rs index 9868850..921bbf2 100644 --- a/common/src/database/postgres.rs +++ b/common/src/database/postgres.rs @@ -211,6 +211,7 @@ fn row_to_subscription(row: &Row) -> Result { let connection_retry_interval: i32 = row.try_get("connection_retry_interval")?; let max_envelope_size: i32 = row.try_get("max_envelope_size")?; let max_time: i32 = row.try_get("max_time")?; + let max_elements: Option = row.try_get("max_elements")?; let princs_filter = PrincsFilter::from( row.try_get("princs_filter_op")?, @@ -226,6 +227,10 @@ fn row_to_subscription(row: &Row) -> Result { .set_connection_retry_count(connection_retry_count.try_into()?) .set_connection_retry_interval(connection_retry_interval.try_into()?) .set_max_time(max_time.try_into()?) + .set_max_elements(match max_elements { + Some(x) => Some(x.try_into()?), + None => None, + }) .set_max_envelope_size(max_envelope_size.try_into()?) .set_enabled(row.try_get("enabled")?) .set_read_existing_events(row.try_get("read_existing_events")?) @@ -614,6 +619,11 @@ impl Database for PostgresDatabase { let connection_retry_count: i32 = subscription.connection_retry_count().into(); let connection_retry_interval: i32 = subscription.connection_retry_interval().try_into()?; let max_time: i32 = subscription.max_time().try_into()?; + let max_elements: Option = match subscription.max_elements() { + Some(x) => Some(x.try_into()?), + None => None, + }; + let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?; let count = self .pool @@ -622,10 +632,10 @@ impl Database for PostgresDatabase { .execute( r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query, heartbeat_interval, connection_retry_count, connection_retry_interval, - max_time, max_envelope_size, enabled, read_existing_events, content_format, + max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format, ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale, data_locale) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21) ON CONFLICT (uuid) DO UPDATE SET version = excluded.version, revision = excluded.revision, @@ -636,6 +646,7 @@ impl Database for PostgresDatabase { connection_retry_count = excluded.connection_retry_count, connection_retry_interval = excluded.connection_retry_interval, max_time = excluded.max_time, + max_elements = excluded.max_elements, max_envelope_size = excluded.max_envelope_size, enabled = excluded.enabled, read_existing_events = excluded.read_existing_events, @@ -657,6 +668,7 @@ impl Database for PostgresDatabase { &connection_retry_count, &connection_retry_interval, &max_time, + &max_elements, &max_envelope_size, &subscription.enabled(), &subscription.read_existing_events(), diff --git a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs index ddf2a76..3e58582 100644 --- a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs @@ -20,6 +20,7 @@ impl PostgresMigration for CreateSubscriptionsTable { connection_retry_count INT4, connection_retry_interval INT4, max_time INT4, + max_elements INT4, max_envelope_size INT4, enabled BOOLEAN, read_existing_events BOOLEAN, diff --git a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs index 735ab83..7a5f415 100644 --- a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs @@ -19,6 +19,7 @@ impl SQLiteMigration for CreateSubscriptionsTable { connection_retry_count INTEGER, connection_retry_interval INTEGER, max_time INTEGER, + max_elements INTEGER, max_envelope_size INTEGER, enabled INTEGER, read_existing_events INTEGER, diff --git a/common/src/database/sqlite.rs b/common/src/database/sqlite.rs index e105b66..ee51e54 100644 --- a/common/src/database/sqlite.rs +++ b/common/src/database/sqlite.rs @@ -175,6 +175,7 @@ fn row_to_subscription(row: &Row) -> Result { .set_connection_retry_count(row.get("connection_retry_count")?) .set_connection_retry_interval(row.get("connection_retry_interval")?) .set_max_time(row.get("max_time")?) + .set_max_elements(row.get("max_elements")?) .set_max_envelope_size(row.get("max_envelope_size")?) .set_enabled(row.get("enabled")?) .set_read_existing_events(row.get("read_existing_events")?) @@ -553,12 +554,12 @@ impl Database for SQLiteDatabase { conn.execute( r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query, heartbeat_interval, connection_retry_count, connection_retry_interval, - max_time, max_envelope_size, enabled, read_existing_events, content_format, + max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format, ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale, data_locale) VALUES (:uuid, :version, :revision, :name, :uri, :query, :heartbeat_interval, :connection_retry_count, :connection_retry_interval, - :max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format, + :max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format, :ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs, :locale, :data_locale) ON CONFLICT (uuid) DO UPDATE SET @@ -571,6 +572,7 @@ impl Database for SQLiteDatabase { connection_retry_count = excluded.connection_retry_count, connection_retry_interval = excluded.connection_retry_interval, max_time = excluded.max_time, + max_elements = excluded.max_elements, max_envelope_size = excluded.max_envelope_size, enabled = excluded.enabled, read_existing_events = excluded.read_existing_events, @@ -592,6 +594,7 @@ impl Database for SQLiteDatabase { ":connection_retry_count": subscription.connection_retry_count(), ":connection_retry_interval": subscription.connection_retry_interval(), ":max_time": subscription.max_time(), + ":max_elements": subscription.max_elements(), ":max_envelope_size": subscription.max_envelope_size(), ":enabled": subscription.enabled(), ":read_existing_events": subscription.read_existing_events(), diff --git a/common/src/models/config.rs b/common/src/models/config.rs index 2c44018..79372a3 100644 --- a/common/src/models/config.rs +++ b/common/src/models/config.rs @@ -216,6 +216,7 @@ struct SubscriptionOptions { pub connection_retry_count: Option, pub connection_retry_interval: Option, pub max_time: Option, + pub max_elements: Option, pub max_envelope_size: Option, pub enabled: Option, pub read_existing_events: Option, @@ -245,6 +246,8 @@ impl SubscriptionOptions { data.set_max_time(max_time); } + data.set_max_elements(self.max_elements); + if let Some(max_envelope_size) = self.max_envelope_size { data.set_max_envelope_size(max_envelope_size); } @@ -344,6 +347,7 @@ heartbeat_interval = 32 connection_retry_count = 11 connection_retry_interval = 12 max_time = 13 +max_elements = 15 max_envelope_size = 14 read_existing_events = false content_format = "Raw" # or RenderedText @@ -433,6 +437,7 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end" .set_connection_retry_count(11) .set_connection_retry_interval(12) .set_max_time(13) + .set_max_elements(Some(15)) .set_max_envelope_size(14) .set_read_existing_events(false) .set_content_format(crate::subscription::ContentFormat::Raw) diff --git a/common/src/models/export.rs b/common/src/models/export.rs index 7d0f0de..d466448 100644 --- a/common/src/models/export.rs +++ b/common/src/models/export.rs @@ -589,6 +589,7 @@ pub mod v2 { pub connection_retry_count: u16, pub connection_retry_interval: u32, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub enabled: bool, pub read_existing_events: bool, @@ -609,6 +610,7 @@ pub mod v2 { .set_connection_retry_count(value.connection_retry_count) .set_connection_retry_interval(value.connection_retry_interval) .set_max_time(value.max_time) + .set_max_elements(value.max_elements) .set_max_envelope_size(value.max_envelope_size) .set_enabled(value.enabled) .set_read_existing_events(value.read_existing_events) @@ -637,6 +639,7 @@ pub mod v2 { connection_retry_count: value.connection_retry_count(), connection_retry_interval: value.connection_retry_interval(), max_time: value.max_time(), + max_elements: value.max_elements(), max_envelope_size: value.max_envelope_size(), enabled: value.enabled(), read_existing_events: value.read_existing_events(), @@ -698,6 +701,7 @@ mod tests { .set_ignore_channel_error(false) .set_max_envelope_size(10000) .set_max_time(1) + .set_max_elements(Some(100)) .set_read_existing_events(false) .set_uri(Some("toto".to_string())) .set_princs_filter(crate::subscription::PrincsFilter::new( diff --git a/common/src/subscription.rs b/common/src/subscription.rs index e1ded7a..81d00ad 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -395,6 +395,7 @@ pub struct SubscriptionParameters { pub connection_retry_count: u16, pub connection_retry_interval: u32, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub read_existing_events: bool, pub content_format: ContentFormat, @@ -471,6 +472,14 @@ impl Display for SubscriptionData { "\tMax time without heartbeat/events: {}s", self.max_time() )?; + writeln!( + f, + "\tMax events in a batch: {}", + match self.max_elements() { + Some(max_elements) => max_elements.to_string(), + None => "Not configured".to_string(), + } + )?; writeln!(f, "\tMax envelope size: {} bytes", self.max_envelope_size())?; writeln!(f, "\tRead existing events: {}", self.read_existing_events())?; writeln!(f, "\tContent format: {}", self.content_format())?; @@ -535,6 +544,7 @@ impl SubscriptionData { connection_retry_count: DEFAULT_CONNECTION_RETRY_COUNT, connection_retry_interval: DEFAULT_CONNECTION_RETRY_INTERVAL, max_time: DEFAULT_MAX_TIME, + max_elements: None, max_envelope_size: DEFAULT_MAX_ENVELOPE_SIZE, read_existing_events: DEFAULT_READ_EXISTING_EVENTS, content_format: DEFAULT_CONTENT_FORMAT, @@ -616,6 +626,11 @@ impl SubscriptionData { self.parameters.max_time } + /// Get a reference to the subscription's max elements. + pub fn max_elements(&self) -> Option { + self.parameters.max_elements + } + /// Get a reference to the subscription's max envelope size. pub fn max_envelope_size(&self) -> u32 { self.parameters.max_envelope_size @@ -668,6 +683,13 @@ impl SubscriptionData { self } + /// Set the subscription's max elements. + pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + self.parameters.max_elements = max_elements; + self.update_internal_version(); + self + } + /// Set the subscription's max envelope size. pub fn set_max_envelope_size(&mut self, max_envelope_size: u32) -> &mut Self { self.parameters.max_envelope_size = max_envelope_size; diff --git a/doc/subscription.md b/doc/subscription.md index 235cd95..dec0077 100644 --- a/doc/subscription.md +++ b/doc/subscription.md @@ -38,6 +38,7 @@ Subscriptions and their parameters are not defined in OpenWEC configuration file | `connection_retry_count` | No | 5 | Number of times the client will attempt to connect if the subscriber is unreachable. | | `connection_retry_interval` | No | 60 | Interval observed between each connection attempt if the subscriber is unreachable. | | `max_time` | No | 30 | The maximum time, in seconds, that the client should aggregate new events before sending them. | +| `max_elements` | No | *Undefined* | The maximum number of events that the client should aggregate before sending a batch. Defaults to unset, meaning that only max_time and max_envelope_size will limit the aggregation. | | `max_envelope_size` | No | 512000 | The maximum number of bytes in the SOAP envelope used to deliver the events. | | `enabled` | No | `False` | Whether the subscription is enabled or not. Not that a new subscription is **disabled** by default, and **can not** be enabled unless you configure at least one output. As a safe guard, subscriptions without outputs are ignored by openwec server. | | `read_existing_events` | No | `False` | If `True`, the event source should replay all possible events that match the filter and any events that subsequently occur for that event source. | diff --git a/server/src/logic.rs b/server/src/logic.rs index b268206..7a15076 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -200,6 +200,7 @@ async fn handle_enumerate( connection_retry_count: subscription_data.connection_retry_count(), connection_retry_interval: subscription_data.connection_retry_interval(), max_time: subscription_data.max_time(), + max_elements: subscription_data.max_elements(), max_envelope_size: subscription_data.max_envelope_size(), thumbprint: match auth_ctx { AuthenticationContext::Tls(_, thumbprint) => Some(thumbprint.clone()), diff --git a/server/src/soap.rs b/server/src/soap.rs index e7c3db1..8f78679 100644 --- a/server/src/soap.rs +++ b/server/src/soap.rs @@ -96,6 +96,7 @@ pub struct SubscriptionBody { pub connection_retry_interval: u32, pub connection_retry_count: u16, pub max_time: u32, + pub max_elements: Option, pub max_envelope_size: u32, pub thumbprint: Option, pub public_version: String, @@ -243,6 +244,11 @@ impl Serializable for SubscriptionBody { .write_text_content(BytesText::new( format!("PT{}.0S", self.connection_retry_interval).as_str(), ))?; + if let Some(max_elements) = &self.max_elements { + writer.create_element("w:MaxElements").write_text_content( + BytesText::new(format!("{}", max_elements).as_str()), + )?; + } writer.create_element("w:MaxTime").write_text_content( BytesText::new(format!("PT{}.000S", self.max_time).as_str()), )?; diff --git a/subscription.sample.toml b/subscription.sample.toml index 350d3c7..6e82630 100644 --- a/subscription.sample.toml +++ b/subscription.sample.toml @@ -40,6 +40,12 @@ query = """ # events before sending them. # max_time = 30 +# The maximum number of events that the client should aggregate before +# sending a batch. +# Defaults to unset, meaning that only max_time and max_envelope_size will +# limit the aggregation. +# max_elements = + # The maximum number of bytes in the SOAP envelope used to deliver # the events. # max_envelope_size = 512000 From 7aea0852d6b001876032214a7b34a479605f6615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Wed, 23 Oct 2024 20:22:17 +0200 Subject: [PATCH 3/6] database: implement migration for max_elements --- .../_001_create_subscriptions_table.rs | 1 - ...x_elements_field_in_subscriptions_table.rs | 33 +++++++++++++++++++ common/src/database/schema/postgres/mod.rs | 5 ++- .../sqlite/_001_create_subscriptions_table.rs | 1 - ...x_elements_field_in_subscriptions_table.rs | 26 +++++++++++++++ common/src/database/schema/sqlite/mod.rs | 5 ++- 6 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 common/src/database/schema/postgres/_013_add_max_elements_field_in_subscriptions_table.rs create mode 100644 common/src/database/schema/sqlite/_013_add_max_elements_field_in_subscriptions_table.rs diff --git a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs index 3e58582..ddf2a76 100644 --- a/common/src/database/schema/postgres/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/postgres/_001_create_subscriptions_table.rs @@ -20,7 +20,6 @@ impl PostgresMigration for CreateSubscriptionsTable { connection_retry_count INT4, connection_retry_interval INT4, max_time INT4, - max_elements INT4, max_envelope_size INT4, enabled BOOLEAN, read_existing_events BOOLEAN, diff --git a/common/src/database/schema/postgres/_013_add_max_elements_field_in_subscriptions_table.rs b/common/src/database/schema/postgres/_013_add_max_elements_field_in_subscriptions_table.rs new file mode 100644 index 0000000..2bc321f --- /dev/null +++ b/common/src/database/schema/postgres/_013_add_max_elements_field_in_subscriptions_table.rs @@ -0,0 +1,33 @@ +use anyhow::Result; +use async_trait::async_trait; +use deadpool_postgres::Transaction; + +use crate::{database::postgres::PostgresMigration, migration}; + +pub(super) struct AddMaxElementsFieldInSubscriptionsTable; +migration!( + AddMaxElementsFieldInSubscriptionsTable, + 13, + "add max_elements field in subscriptions table" +); + +#[async_trait] +impl PostgresMigration for AddMaxElementsFieldInSubscriptionsTable { + async fn up(&self, tx: &mut Transaction) -> Result<()> { + tx.execute( + "ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS max_elements INT4;", + &[], + ) + .await?; + Ok(()) + } + + async fn down(&self, tx: &mut Transaction) -> Result<()> { + tx.execute( + "ALTER TABLE subscriptions DROP COLUMN IF EXISTS max_elements", + &[], + ) + .await?; + Ok(()) + } +} diff --git a/common/src/database/schema/postgres/mod.rs b/common/src/database/schema/postgres/mod.rs index 985d626..5b12f95 100644 --- a/common/src/database/schema/postgres/mod.rs +++ b/common/src/database/schema/postgres/mod.rs @@ -14,7 +14,8 @@ use self::{ _009_alter_outputs_format::AlterOutputsFormat, _010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable, _011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable, - _012_alter_outputs_files_config::AlterOutputsFilesConfig + _012_alter_outputs_files_config::AlterOutputsFilesConfig, + _013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable, }; mod _001_create_subscriptions_table; @@ -29,6 +30,7 @@ mod _009_alter_outputs_format; mod _010_add_revision_field_in_subscriptions_table; mod _011_add_locale_fields_in_subscriptions_table; mod _012_alter_outputs_files_config; +mod _013_add_max_elements_field_in_subscriptions_table; pub fn register_migrations(postgres_db: &mut PostgresDatabase) { postgres_db.register_migration(Arc::new(CreateSubscriptionsTable)); @@ -43,4 +45,5 @@ pub fn register_migrations(postgres_db: &mut PostgresDatabase) { postgres_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable)); postgres_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable)); postgres_db.register_migration(Arc::new(AlterOutputsFilesConfig)); + postgres_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable)); } diff --git a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs index 7a5f415..735ab83 100644 --- a/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs +++ b/common/src/database/schema/sqlite/_001_create_subscriptions_table.rs @@ -19,7 +19,6 @@ impl SQLiteMigration for CreateSubscriptionsTable { connection_retry_count INTEGER, connection_retry_interval INTEGER, max_time INTEGER, - max_elements INTEGER, max_envelope_size INTEGER, enabled INTEGER, read_existing_events INTEGER, diff --git a/common/src/database/schema/sqlite/_013_add_max_elements_field_in_subscriptions_table.rs b/common/src/database/schema/sqlite/_013_add_max_elements_field_in_subscriptions_table.rs new file mode 100644 index 0000000..1500c5b --- /dev/null +++ b/common/src/database/schema/sqlite/_013_add_max_elements_field_in_subscriptions_table.rs @@ -0,0 +1,26 @@ +use anyhow::{anyhow, Result}; +use rusqlite::Connection; + +use crate::database::sqlite::SQLiteMigration; +use crate::migration; + +pub(super) struct AddMaxElementsFieldInSubscriptionsTable; +migration!( + AddMaxElementsFieldInSubscriptionsTable, + 13, + "add max_elements field in subscriptions table" +); + +impl SQLiteMigration for AddMaxElementsFieldInSubscriptionsTable { + fn up(&self, conn: &Connection) -> Result<()> { + conn.execute("ALTER TABLE subscriptions ADD COLUMN max_elements INTEGER", []) + .map_err(|err| anyhow!("SQLiteError: {}", err))?; + Ok(()) + } + + fn down(&self, conn: &Connection) -> Result<()> { + conn.execute("ALTER TABLE subscriptions DROP COLUMN max_elements", []) + .map_err(|err| anyhow!("SQLiteError: {}", err))?; + Ok(()) + } +} diff --git a/common/src/database/schema/sqlite/mod.rs b/common/src/database/schema/sqlite/mod.rs index 164ff58..c49935c 100644 --- a/common/src/database/schema/sqlite/mod.rs +++ b/common/src/database/schema/sqlite/mod.rs @@ -14,7 +14,8 @@ use self::{ _009_alter_outputs_format::AlterOutputsFormat, _010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable, _011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable, - _012_alter_outputs_files_config::AlterOutputsFilesConfig + _012_alter_outputs_files_config::AlterOutputsFilesConfig, + _013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable, }; mod _001_create_subscriptions_table; @@ -29,6 +30,7 @@ mod _009_alter_outputs_format; mod _010_add_revision_field_in_subscriptions_table; mod _011_add_locale_fields_in_subscriptions_table; mod _012_alter_outputs_files_config; +mod _013_add_max_elements_field_in_subscriptions_table; pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) { sqlite_db.register_migration(Arc::new(CreateSubscriptionsTable)); @@ -43,4 +45,5 @@ pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) { sqlite_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable)); sqlite_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable)); sqlite_db.register_migration(Arc::new(AlterOutputsFilesConfig)); + sqlite_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable)); } From 780e8d255a44fee015aac0fee0194c5904968582 Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 26 Oct 2024 17:22:28 +0200 Subject: [PATCH 4/6] Use max_elements in database tests --- common/src/database/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/src/database/mod.rs b/common/src/database/mod.rs index a4315de..26a340c 100644 --- a/common/src/database/mod.rs +++ b/common/src/database/mod.rs @@ -174,6 +174,7 @@ pub mod tests { assert_eq!(toto.revision(), None); assert_eq!(toto.data_locale(), None); assert_eq!(toto.locale(), None); + assert_eq!(toto.max_elements(), None); let toto2 = db.get_subscription_by_identifier("toto").await?.unwrap(); assert_eq!(toto, &toto2); @@ -211,7 +212,8 @@ pub mod tests { ]) .set_revision(Some("1472".to_string())) .set_locale(Some("fr-FR".to_string())) - .set_data_locale(Some("en-US".to_string())); + .set_data_locale(Some("en-US".to_string())) + .set_max_elements(Some(10)); db.store_subscription(&subscription2).await?; assert!(db.get_subscriptions().await?.len() == 2); @@ -256,6 +258,7 @@ pub mod tests { assert_eq!(tata.revision(), Some("1472".to_string()).as_ref()); assert_eq!(tata.locale(), Some("fr-FR".to_string()).as_ref()); assert_eq!(tata.data_locale(), Some("en-US".to_string()).as_ref()); + assert_eq!(tata.max_elements(), Some(10)); let tata_save = tata.clone(); tata.set_name("titi".to_string()) From b05a87548ab5a728cf4f922c41720616a62d818e Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 26 Oct 2024 17:39:25 +0200 Subject: [PATCH 5/6] Add the number of events received in debug log --- server/src/logic.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/logic.rs b/server/src/logic.rs index 7a15076..2595c3d 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -355,7 +355,8 @@ async fn handle_events( } debug!( - "Received Events from {}:{} ({}) for subscription {} ({})", + "Received {} events from {}:{} ({}) for subscription {} ({})", + events.len(), request_data.remote_addr().ip(), request_data.remote_addr().port(), request_data.principal(), From 3f985aa2854efe3b7480b01c717c63e2c0e4233c Mon Sep 17 00:00:00 2001 From: Vincent Ruello <5345986+vruello@users.noreply.github.com> Date: Sat, 26 Oct 2024 17:45:16 +0200 Subject: [PATCH 6/6] Update CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 927fd95..e6d8f13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `max_elements` subscription parameter (#185) + ## [v0.3.0] ### Added