Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable batch sizes with the max_elements subscription option #185

Merged
merged 6 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `max_elements` subscription parameter (#185)

## [v0.3.0]

### Added
Expand Down
6 changes: 6 additions & 0 deletions cli/src/skell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ fn get_options() -> String {
# events before sending them.
# max_time = {}

# The maximum number of events that the client should aggregate before
# sending a batch.
# Defaults to unset, meaning that only max_time and max_envelope_size will
# limit the aggregation.
# max_elements =

# The maximum number of bytes in the SOAP envelope used to deliver
# the events.
# max_envelope_size = {}
Expand Down
5 changes: 4 additions & 1 deletion common/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub mod tests {
assert_eq!(toto.revision(), None);
assert_eq!(toto.data_locale(), None);
assert_eq!(toto.locale(), None);
assert_eq!(toto.max_elements(), None);

let toto2 = db.get_subscription_by_identifier("toto").await?.unwrap();
assert_eq!(toto, &toto2);
Expand Down Expand Up @@ -211,7 +212,8 @@ pub mod tests {
])
.set_revision(Some("1472".to_string()))
.set_locale(Some("fr-FR".to_string()))
.set_data_locale(Some("en-US".to_string()));
.set_data_locale(Some("en-US".to_string()))
.set_max_elements(Some(10));
db.store_subscription(&subscription2).await?;

assert!(db.get_subscriptions().await?.len() == 2);
Expand Down Expand Up @@ -256,6 +258,7 @@ pub mod tests {
assert_eq!(tata.revision(), Some("1472".to_string()).as_ref());
assert_eq!(tata.locale(), Some("fr-FR".to_string()).as_ref());
assert_eq!(tata.data_locale(), Some("en-US".to_string()).as_ref());
assert_eq!(tata.max_elements(), Some(10));

let tata_save = tata.clone();
tata.set_name("titi".to_string())
Expand Down
30 changes: 21 additions & 9 deletions common/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ impl PostgresDatabase {
client
.query(
format!(
r#"SELECT *
FROM heartbeats
r#"SELECT *
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = $1
AND subscription = $2"#,
Expand All @@ -170,8 +170,8 @@ impl PostgresDatabase {
client
.query(
format!(
r#"SELECT *
FROM heartbeats
r#"SELECT *
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = $1"#,
field
Expand Down Expand Up @@ -211,6 +211,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
let connection_retry_interval: i32 = row.try_get("connection_retry_interval")?;
let max_envelope_size: i32 = row.try_get("max_envelope_size")?;
let max_time: i32 = row.try_get("max_time")?;
let max_elements: Option<i32> = row.try_get("max_elements")?;

let princs_filter = PrincsFilter::from(
row.try_get("princs_filter_op")?,
Expand All @@ -226,6 +227,10 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(connection_retry_count.try_into()?)
.set_connection_retry_interval(connection_retry_interval.try_into()?)
.set_max_time(max_time.try_into()?)
.set_max_elements(match max_elements {
Some(x) => Some(x.try_into()?),
None => None,
})
.set_max_envelope_size(max_envelope_size.try_into()?)
.set_enabled(row.try_get("enabled")?)
.set_read_existing_events(row.try_get("read_existing_events")?)
Expand Down Expand Up @@ -572,7 +577,7 @@ impl Database for PostgresDatabase {
.await?
.query(
r#"
SELECT *
SELECT *
FROM subscriptions
"#,
&[],
Expand All @@ -596,7 +601,7 @@ impl Database for PostgresDatabase {
.get()
.await?
.query_opt(
r#"SELECT *
r#"SELECT *
FROM subscriptions
WHERE uuid = $1 OR name = $1"#,
&[&identifier],
Expand All @@ -614,6 +619,11 @@ impl Database for PostgresDatabase {
let connection_retry_count: i32 = subscription.connection_retry_count().into();
let connection_retry_interval: i32 = subscription.connection_retry_interval().try_into()?;
let max_time: i32 = subscription.max_time().try_into()?;
let max_elements: Option<i32> = match subscription.max_elements() {
Some(x) => Some(x.try_into()?),
None => None,
};

let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?;
let count = self
.pool
Expand All @@ -622,11 +632,11 @@ impl Database for PostgresDatabase {
.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (uuid) DO UPDATE SET
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
name = excluded.name,
Expand All @@ -636,6 +646,7 @@ impl Database for PostgresDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -657,6 +668,7 @@ impl Database for PostgresDatabase {
&connection_retry_count,
&connection_retry_interval,
&max_time,
&max_elements,
&max_envelope_size,
&subscription.enabled(),
&subscription.read_existing_events(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use anyhow::Result;
use async_trait::async_trait;
use deadpool_postgres::Transaction;

use crate::{database::postgres::PostgresMigration, migration};

pub(super) struct AddMaxElementsFieldInSubscriptionsTable;
migration!(
AddMaxElementsFieldInSubscriptionsTable,
13,
"add max_elements field in subscriptions table"
);

#[async_trait]
impl PostgresMigration for AddMaxElementsFieldInSubscriptionsTable {
async fn up(&self, tx: &mut Transaction) -> Result<()> {
tx.execute(
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS max_elements INT4;",
&[],
)
.await?;
Ok(())
}

async fn down(&self, tx: &mut Transaction) -> Result<()> {
tx.execute(
"ALTER TABLE subscriptions DROP COLUMN IF EXISTS max_elements",
&[],
)
.await?;
Ok(())
}
}
5 changes: 4 additions & 1 deletion common/src/database/schema/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use self::{
_009_alter_outputs_format::AlterOutputsFormat,
_010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable,
_011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable,
_012_alter_outputs_files_config::AlterOutputsFilesConfig
_012_alter_outputs_files_config::AlterOutputsFilesConfig,
_013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable,
};

mod _001_create_subscriptions_table;
Expand All @@ -29,6 +30,7 @@ mod _009_alter_outputs_format;
mod _010_add_revision_field_in_subscriptions_table;
mod _011_add_locale_fields_in_subscriptions_table;
mod _012_alter_outputs_files_config;
mod _013_add_max_elements_field_in_subscriptions_table;

pub fn register_migrations(postgres_db: &mut PostgresDatabase) {
postgres_db.register_migration(Arc::new(CreateSubscriptionsTable));
Expand All @@ -43,4 +45,5 @@ pub fn register_migrations(postgres_db: &mut PostgresDatabase) {
postgres_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable));
postgres_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable));
postgres_db.register_migration(Arc::new(AlterOutputsFilesConfig));
postgres_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::{anyhow, Result};
use rusqlite::Connection;

use crate::database::sqlite::SQLiteMigration;
use crate::migration;

pub(super) struct AddMaxElementsFieldInSubscriptionsTable;
migration!(
AddMaxElementsFieldInSubscriptionsTable,
13,
"add max_elements field in subscriptions table"
);

impl SQLiteMigration for AddMaxElementsFieldInSubscriptionsTable {
fn up(&self, conn: &Connection) -> Result<()> {
conn.execute("ALTER TABLE subscriptions ADD COLUMN max_elements INTEGER", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}

fn down(&self, conn: &Connection) -> Result<()> {
conn.execute("ALTER TABLE subscriptions DROP COLUMN max_elements", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}
}
5 changes: 4 additions & 1 deletion common/src/database/schema/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use self::{
_009_alter_outputs_format::AlterOutputsFormat,
_010_add_revision_field_in_subscriptions_table::AddRevisionFieldInSubscriptionsTable,
_011_add_locale_fields_in_subscriptions_table::AddLocaleFieldsInSubscriptionsTable,
_012_alter_outputs_files_config::AlterOutputsFilesConfig
_012_alter_outputs_files_config::AlterOutputsFilesConfig,
_013_add_max_elements_field_in_subscriptions_table::AddMaxElementsFieldInSubscriptionsTable,
};

mod _001_create_subscriptions_table;
Expand All @@ -29,6 +30,7 @@ mod _009_alter_outputs_format;
mod _010_add_revision_field_in_subscriptions_table;
mod _011_add_locale_fields_in_subscriptions_table;
mod _012_alter_outputs_files_config;
mod _013_add_max_elements_field_in_subscriptions_table;

pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) {
sqlite_db.register_migration(Arc::new(CreateSubscriptionsTable));
Expand All @@ -43,4 +45,5 @@ pub fn register_migrations(sqlite_db: &mut SQLiteDatabase) {
sqlite_db.register_migration(Arc::new(AddRevisionFieldInSubscriptionsTable));
sqlite_db.register_migration(Arc::new(AddLocaleFieldsInSubscriptionsTable));
sqlite_db.register_migration(Arc::new(AlterOutputsFilesConfig));
sqlite_db.register_migration(Arc::new(AddMaxElementsFieldInSubscriptionsTable));
}
31 changes: 17 additions & 14 deletions common/src/database/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
// license (MIT), we include below its copyright notice and permission notice:
//
// The MIT License (MIT)
//
//
// Copyright (c) 2015 Skyler Lipthay
//
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand Down Expand Up @@ -125,7 +125,7 @@ impl SQLiteDatabase {
)?;
let rows = statement.query_and_then(&[(":field_value", &field_value), (":subscription", &value)], row_to_heartbeat)?;

let mut heartbeats = Vec::new();
let mut heartbeats = Vec::new();
for heartbeat in rows {
heartbeats.push(heartbeat?);
}
Expand All @@ -134,15 +134,15 @@ impl SQLiteDatabase {
let mut statement = conn.prepare(
format!(
r#"SELECT *
FROM heartbeats
FROM heartbeats
JOIN subscriptions ON subscriptions.uuid = heartbeats.subscription
WHERE {} = :field_value"#,
field
)
.as_str()
)?;
let rows = statement.query_and_then(&[(":field_value", &field_value)], row_to_heartbeat)?;
let mut heartbeats = Vec::new();
let mut heartbeats = Vec::new();
for heartbeat in rows {
heartbeats.push(heartbeat?);
}
Expand Down Expand Up @@ -175,6 +175,7 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
.set_connection_retry_count(row.get("connection_retry_count")?)
.set_connection_retry_interval(row.get("connection_retry_interval")?)
.set_max_time(row.get("max_time")?)
.set_max_elements(row.get("max_elements")?)
.set_max_envelope_size(row.get("max_envelope_size")?)
.set_enabled(row.get("enabled")?)
.set_read_existing_events(row.get("read_existing_events")?)
Expand Down Expand Up @@ -321,12 +322,12 @@ impl Database for SQLiteDatabase {
(None, None) => {
client.interact(move |conn| {
conn.execute("DELETE FROM bookmarks", [])
}).await
}).await
}
};
future.map_err(|err| anyhow!(format!("{}", err)))??;
Ok(())

}

async fn get_heartbeats_by_machine(
Expand Down Expand Up @@ -469,7 +470,7 @@ impl Database for SQLiteDatabase {
for (key, value) in heartbeats_cloned {
match value.last_event_seen {
Some(last_event_seen) => {
query_with_event
query_with_event
.execute(
params![
&key.machine,
Expand Down Expand Up @@ -553,15 +554,15 @@ impl Database for SQLiteDatabase {
conn.execute(
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_envelope_size, enabled, read_existing_events, content_format,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, princs_filter_op, princs_filter_value, outputs, locale,
data_locale)
VALUES (:uuid, :version, :revision, :name, :uri, :query,
:heartbeat_interval, :connection_retry_count, :connection_retry_interval,
:max_time, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:ignore_channel_error, :princs_filter_op, :princs_filter_value, :outputs,
:locale, :data_locale)
ON CONFLICT (uuid) DO UPDATE SET
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
name = excluded.name,
Expand All @@ -571,6 +572,7 @@ impl Database for SQLiteDatabase {
connection_retry_count = excluded.connection_retry_count,
connection_retry_interval = excluded.connection_retry_interval,
max_time = excluded.max_time,
max_elements = excluded.max_elements,
max_envelope_size = excluded.max_envelope_size,
enabled = excluded.enabled,
read_existing_events = excluded.read_existing_events,
Expand All @@ -592,6 +594,7 @@ impl Database for SQLiteDatabase {
":connection_retry_count": subscription.connection_retry_count(),
":connection_retry_interval": subscription.connection_retry_interval(),
":max_time": subscription.max_time(),
":max_elements": subscription.max_elements(),
":max_envelope_size": subscription.max_envelope_size(),
":enabled": subscription.enabled(),
":read_existing_events": subscription.read_existing_events(),
Expand Down Expand Up @@ -776,7 +779,7 @@ impl Database for SQLiteDatabase {
.interact(move |conn| {
conn.query_row(
r#"SELECT COUNT(machine)
FROM heartbeats
FROM heartbeats
WHERE subscription = :subscription"#,
&[(":subscription", &subscription_owned)],
|row| row.get(0),
Expand Down
Loading