Skip to content

Commit

Permalink
Merge pull request cea-sec#185 from MrAnno/max-elements
Browse files Browse the repository at this point in the history
Configurable batch sizes with the `max_elements` subscription option
  • Loading branch information
vruello authored Oct 26, 2024
2 parents c6d4ae7 + 3f985aa commit 53ee64f
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 42 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `max_elements` subscription parameter (#185)

## [v0.3.0]

### Added
Expand Down
6 changes: 6 additions & 0 deletions cli/src/skell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ fn get_options() -> String {
# events before sending them.
# max_time = {}
# The maximum number of events that the client should aggregate before
# sending a batch.
# Defaults to unset, meaning that only max_time and max_envelope_size will
# limit the aggregation.
# max_elements =
# The maximum number of bytes in the SOAP envelope used to deliver
# the events.
# max_envelope_size = {}
Expand Down
5 changes: 4 additions & 1 deletion common/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub mod tests {
assert_eq!(toto.revision(), None);
assert_eq!(toto.data_locale(), None);
assert_eq!(toto.locale(), None);
assert_eq!(toto.max_elements(), None);

let toto2 = db.get_subscription_by_identifier("toto").await?.unwrap();
assert_eq!(toto, &toto2);
Expand Down Expand Up @@ -211,7 +212,8 @@ pub mod tests {
])
.set_revision(Some("1472".to_string()))
.set_locale(Some("fr-FR".to_string()))
.set_data_locale(Some("en-US".to_string()));
.set_data_locale(Some("en-US".to_string()))
.set_max_elements(Some(10));
db.store_subscription(&subscription2).await?;

assert!(db.get_subscriptions().await?.len() == 2);
Expand Down Expand Up @@ -256,6 +258,7 @@ pub mod tests {
assert_eq!(tata.revision(), Some("1472".to_string()).as_ref());
assert_eq!(tata.locale(), Some("fr-FR".to_string()).as_ref());
assert_eq!(tata.data_locale(), Some("en-US".to_string()).as_ref());
assert_eq!(tata.max_elements(), Some(10));

let tata_save = tata.clone();
tata.set_name("titi".to_string())
Expand Down
30 changes: 21 additions & 9 deletions common/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ impl PostgresDatabase {
client
.query(
format!(
r#"SELECT *
FROM heartbeats
r#"SELECT *
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = $1
AND subscription = $2"#,
Expand All @@ -170,8 +170,8 @@ impl PostgresDatabase {
client
.query(
format!(
r#"SELECT *
FROM heartbeats
r#"SELECT *
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = $1"#,
field
Expand Down Expand Up @@ -211,6 +211,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
let connection_retry_interval: i32 = row.try_get("connection_retry_interval")?;
let max_envelope_size: i32 = row.try_get("max_envelope_size")?;
let max_time: i32 = row.try_get("max_time")?;
let max_elements: Option<i32> = row.try_get("max_elements")?;

let princs_filter = PrincsFilter::from(
row.try_get("princs_filter_op")?,
Expand All @@ -226,6 +227,10 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(connection_retry_count.try_into()?)
.set_connection_retry_interval(connection_retry_interval.try_into()?)
.set_max_time(max_time.try_into()?)
.set_max_elements(match max_elements {
Some(x) => Some(x.try_into()?),
None => None,
})
.set_max_envelope_size(max_envelope_size.try_into()?)
.set_enabled(row.try_get("enabled")?)
.set_read_existing_events(row.try_get("read_existing_events")?)
Expand Down Expand Up @@ -572,7 +577,7 @@ impl Database for PostgresDatabase {
.await?
.query(
r#"
SELECT *
SELECT *
FROM subscriptions
"#,
&[],
Expand All @@ -596,7 +601,7 @@ impl Database for PostgresDatabase {
.get()
.await?
.query_opt(
r#"SELECT *
r#"SELECT *
FROM subscriptions
WHERE uuid = $1 OR name = $1"#,
&[&identifier],
Expand All @@ -614,6 +619,11 @@ impl Database for PostgresDatabase {
let connection_retry_count: i32 = subscription.connection_retry_count().into();
let connection_retry_interval: i32 = subscription.connection_retry_interval().try_into()?;
let max_time: i32 = subscription.max_time().try_into()?;
let max_elements: Option<i32> = match subscription.max_elements() {
Some(x) => Some(x.try_into()?),
None => None,
};

let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?;
let count = self
.pool
Expand All @@ -622,11 +632,11 @@ impl Database for PostgresDatabase {
.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (uuid) DO UPDATE SET
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
name = excluded.name,
Expand All @@ -636,6 +646,7 @@ impl Database for PostgresDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -657,6 +668,7 @@ impl Database for PostgresDatabase {
&connection_retry_count,
&connection_retry_interval,
&max_time,
&max_elements,
&max_envelope_size,
&subscription.enabled(),
&subscription.read_existing_events(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use anyhow::Result;
use async_trait::async_trait;
use deadpool_postgres::Transaction;

use crate::{database::postgres::PostgresMigration, migration};

pub(super) struct AddMaxElementsFieldInSubscriptionsTable;
migration!(
AddMaxElementsFieldInSubscriptionsTable,
13,
"add max_elements field in subscriptions table"
);

#[async_trait]
impl PostgresMigration for AddMaxElementsFieldInSubscriptionsTable {
async fn up(&self, tx: &mut Transaction) -> Result<()> {
tx.execute(
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS max_elements INT4;",
&[],
)
.await?;
Ok(())
}

async fn down(&self, tx: &mut Transaction) -> Result<()> {
tx.execute(
"ALTER TABLE subscriptions DROP COLUMN IF EXISTS max_elements",
&[],
)
.await?;
Ok(())
}
}
5 changes: 4 additions & 1 deletion common/src/database/schema/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use self::{
_009_alter_outputs_format::AlterOutputsFormat,
_010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable,
_011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable,
_012_alter_outputs_files_config::AlterOutputsFilesConfig
_012_alter_outputs_files_config::AlterOutputsFilesConfig,
_013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable,
};

mod _001_create_subscriptions_table;
Expand All @@ -29,6 +30,7 @@ mod _009_alter_outputs_format;
mod _010_add_revision_field_in_subscriptions_table;
mod _011_add_locale_fields_in_subscriptions_table;
mod _012_alter_outputs_files_config;
mod _013_add_max_elements_field_in_subscriptions_table;

pub fn register_migrations(postgres_db: &mut PostgresDatabase) {
postgres_db.register_migration(Arc::new(CreateSubscriptionsTable));
Expand All @@ -43,4 +45,5 @@ pub fn register_migrations(postgres_db: &mut PostgresDatabase) {
postgres_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable));
postgres_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable));
postgres_db.register_migration(Arc::new(AlterOutputsFilesConfig));
postgres_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::{anyhow, Result};
use rusqlite::Connection;

use crate::database::sqlite::SQLiteMigration;
use crate::migration;

pub(super) struct AddMaxElementsFieldInSubscriptionsTable;
migration!(
AddMaxElementsFieldInSubscriptionsTable,
13,
"add max_elements field in subscriptions table"
);

impl SQLiteMigration for AddMaxElementsFieldInSubscriptionsTable {
fn up(&self, conn: &Connection) -> Result<()> {
conn.execute("ALTER TABLE subscriptions ADD COLUMN max_elements INTEGER", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}

fn down(&self, conn: &Connection) -> Result<()> {
conn.execute("ALTER TABLE subscriptions DROP COLUMN max_elements", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}
}
5 changes: 4 additions & 1 deletion common/src/database/schema/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use self::{
_009_alter_outputs_format::AlterOutputsFormat,
_010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable,
_011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable,
_012_alter_outputs_files_config::AlterOutputsFilesConfig
_012_alter_outputs_files_config::AlterOutputsFilesConfig,
_013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable,
};

mod _001_create_subscriptions_table;
Expand All @@ -29,6 +30,7 @@ mod _009_alter_outputs_format;
mod _010_add_revision_field_in_subscriptions_table;
mod _011_add_locale_fields_in_subscriptions_table;
mod _012_alter_outputs_files_config;
mod _013_add_max_elements_field_in_subscriptions_table;

pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) {
sqlite_db.register_migration(Arc::new(CreateSubscriptionsTable));
Expand All @@ -43,4 +45,5 @@ pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) {
sqlite_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable));
sqlite_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable));
sqlite_db.register_migration(Arc::new(AlterOutputsFilesConfig));
sqlite_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable));
}
31 changes: 17 additions & 14 deletions common/src/database/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
// license (MIT), we include below its copyright notice and permission notice:
//
// The MIT License (MIT)
//
//
// Copyright (c) 2015 Skyler Lipthay
//
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand Down Expand Up @@ -125,7 +125,7 @@ impl SQLiteDatabase {
)?;
let rows = statement.query_and_then(&[(":field_value", &field_value), (":subscription", &value)], row_to_heartbeat)?;

let mut heartbeats = Vec::new();
let mut heartbeats = Vec::new();
for heartbeat in rows {
heartbeats.push(heartbeat?);
}
Expand All @@ -134,15 +134,15 @@ impl SQLiteDatabase {
let mut statement = conn.prepare(
format!(
r#"SELECT *
FROM heartbeats
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = :field_value"#,
field
)
.as_str()
)?;
let rows = statement.query_and_then(&[(":field_value", &field_value)], row_to_heartbeat)?;
let mut heartbeats = Vec::new();
let mut heartbeats = Vec::new();
for heartbeat in rows {
heartbeats.push(heartbeat?);
}
Expand Down Expand Up @@ -175,6 +175,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(row.get("connection_retry_count")?)
.set_connection_retry_interval(row.get("connection_retry_interval")?)
.set_max_time(row.get("max_time")?)
.set_max_elements(row.get("max_elements")?)
.set_max_envelope_size(row.get("max_envelope_size")?)
.set_enabled(row.get("enabled")?)
.set_read_existing_events(row.get("read_existing_events")?)
Expand Down Expand Up @@ -321,12 +322,12 @@ impl Database for SQLiteDatabase {
(None, None) => {
client.interact(move |conn| {
conn.execute("DELETE FROM bookmarks", [])
}).await
}).await
}
};
future.map_err(|err| anyhow!(format!("{}", err)))??;
Ok(())

}

async fn get_heartbeats_by_machine(
Expand Down Expand Up @@ -469,7 +470,7 @@ impl Database for SQLiteDatabase {
for (key, value) in heartbeats_cloned {
match value.last_event_seen {
Some(last_event_seen) => {
query_with_event
query_with_event
.execute(
params![
&key.machine,
Expand Down Expand Up @@ -553,15 +554,15 @@ impl Database for SQLiteDatabase {
conn.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES (:uuid, :version, :revision, :name, :uri, :query,
:heartbeat_interval, :connection_retry_count, :connection_retry_interval,
:max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs,
:locale, :data_locale)
ON CONFLICT (uuid) DO UPDATE SET
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
name = excluded.name,
Expand All @@ -571,6 +572,7 @@ impl Database for SQLiteDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -592,6 +594,7 @@ impl Database for SQLiteDatabase {
":connection_retry_count": subscription.connection_retry_count(),
":connection_retry_interval": subscription.connection_retry_interval(),
":max_time": subscription.max_time(),
":max_elements": subscription.max_elements(),
":max_envelope_size": subscription.max_envelope_size(),
":enabled": subscription.enabled(),
":read_existing_events": subscription.read_existing_events(),
Expand Down Expand Up @@ -776,7 +779,7 @@ impl Database for SQLiteDatabase {
.interact(move |conn| {
conn.query_row(
r#"SELECT COUNT(machine)
FROM heartbeats
FROM heartbeats
WHERE subscription = :subscription"#,
&[(":subscription", &subscription_owned)],
|row| row.get(0),
Expand Down
Loading

0 comments on commit 53ee64f

Please sign in to comment.