feat: support array of events in frontend (#76)
* feat: support array of events in frontend

* test: cover receiving array of events

* fix: manager doesn't account for all possible results from found_channel

* chore: bump release
mmta authored Apr 2, 2024
1 parent 90945c2 commit abf787e
Showing 4 changed files with 80 additions and 36 deletions.
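In short: the frontend's `/events` endpoint now accepts either a single normalized event object (as before) or a JSON array of them. A minimal sketch of the two request body shapes, built with `serde_json` the way the tests below do — only `event_id` is taken from the test payload; the other required fields of `NormalizedEvent` are omitted here:

```rust
// Sketch only: both of these bodies should be accepted by POST /events
// after this commit. The real NormalizedEvent has more required fields
// than shown; see the crate's event module for the full schema.
use serde_json::json;

fn main() {
    // the pre-existing shape: one event object
    let single = json!({ "event_id": "id1" });

    // newly supported: an array of event objects
    let batch = json!([{ "event_id": "id1" }, { "event_id": "id2" }]);

    println!("{single}\n{batch}");
}
```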
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -9,7 +9,7 @@ default-members = [
 ]

 [workspace.package]
-version = "1.4.3"
+version = "1.5.0"
 authors = ["Dsiem Authors"]
 description = "OSSIM-style event correlation engine for ELK stack"
 documentation = "https://github.com/defenxor/dsiem/blob/master/docs/README.md"
18 changes: 14 additions & 4 deletions server/src/backlog/manager.rs
@@ -294,6 +294,10 @@ impl BacklogManager {
         let mut backlogs = self.backlogs.write().await;

         debug!(directive.id = self.id, event.id, "total backlogs {}", backlogs.len());
+        // all backlogs should report not found before we create a new one.
+        // if one matches, we exit immediately and the not-found count will not
+        // reach backlogs.len(); the same is true when one of them times out.
+        let mut notfound_count = 0;

         tracer::store_parent_into_event(&backlog_mgr_span, &mut event);

@@ -307,11 +311,13 @@
         mgr_report.timedout_backlogs = 0;

+        // check the result, break as soon as there's a match
         for b in backlogs.iter() {
             let mut v = b.found_channel.locked_rx.lock().await;
             if timeout(timeout_duration, v.changed()).await.is_ok() {
                 let val = v.borrow();
+                // found and event.id matches => no new backlog
                 if val.0 && val.1 == event.id {
                     match_found = true;
                     debug!(
@@ -322,6 +328,10 @@
                     );
                     break;
                 }
+                if !val.0 && val.1 == event.id {
+                    debug!("not found count: {}", notfound_count);
+                    notfound_count += 1;
+                }
             } else {
                 mgr_report.timedout_backlogs += 1;
                 debug!(
@@ -349,11 +359,11 @@
             continue;
         }

-        // timeout should not be treated as no match since it could trigger a duplicate backlog
-        if mgr_report.timedout_backlogs > 0 {
+        if notfound_count != backlogs.len() {
             debug!(
-                directive.id = self.id, event.id,
-                "{} backlog timeouts, skip creating new backlog based on this event",
+                directive.id = self.id,
+                event.id,
+                "not all backlogs reported not found (timed-out: {}), skip creating new backlog",
                 mgr_report.timedout_backlogs
             );
             continue;
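The gist of the manager fix, restated as a standalone sketch: a new backlog is created only when every existing backlog has explicitly reported "not found" for the event; a timed-out backlog is not counted as "not found", since it might still match and spawning a new backlog would then create a duplicate. The `(bool, u64)` verdict shape mirrors the `found_channel` payload in the diff above; everything else here is simplified and assumed:

```rust
// Minimal sketch, not the actual manager code. `None` stands in for a
// backlog whose found_channel timed out before reporting a verdict.
fn should_create_new_backlog(verdicts: &[Option<(bool, u64)>], event_id: u64) -> bool {
    let mut notfound_count = 0;
    for v in verdicts {
        match v {
            // found, and the event id matches => never create a new backlog
            Some((true, id)) if *id == event_id => return false,
            // an explicit "not found" for this event
            Some((false, id)) if *id == event_id => notfound_count += 1,
            // timed out, or a verdict for some other event: don't count it
            _ => {}
        }
    }
    // all backlogs must have said "not found"; a timeout keeps the count short
    notfound_count == verdicts.len()
}

fn main() {
    let timed_out = [Some((false, 7)), None];
    assert!(!should_create_new_backlog(&timed_out, 7)); // a timeout blocks creation
    assert!(should_create_new_backlog(&[Some((false, 7)), Some((false, 7))], 7));
}
```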
92 changes: 63 additions & 29 deletions server/src/server.rs
@@ -14,7 +14,7 @@
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tokio::sync::broadcast::Sender;
 use tower_http::{services::ServeDir, timeout::TimeoutLayer};
-use tracing::{debug, info, info_span, trace, warn};
+use tracing::{debug, error, info, info_span, trace, warn};

 use crate::{eps_limiter::EpsLimiter, event::NormalizedEvent, tracer, utils};

@@ -177,39 +177,59 @@ pub async fn config_delete_handler(
 pub async fn events_handler(
     State(state): State<AppState>,
     ConnectInfo(addr): ConnectInfo<SocketAddr>,
-    JsonExtractor(mut event): JsonExtractor<NormalizedEvent>,
+    JsonExtractor(value): JsonExtractor<Value>,
 ) -> Result<(), AppError> {
-    if let Some(limiter) = &state.eps_limiter.as_ref().limiter {
-        let limiter = limiter.read().await;
-        debug!("max_tokens: {}, available: {}", limiter.max_tokens(), limiter.available());
-        if limiter.try_wait().is_err() {
-            return Err(AppError::new(StatusCode::TOO_MANY_REQUESTS, "EPS rate limit reached\n"));
-        }
-    }
-
-    state.conn_counter.as_ref().inc();
-    let conn_id = state.conn_counter.as_ref().get();
-    event.conn_id = conn_id as u64;
-
-    trace!("event received: {:?}", event);
-    let span = info_span!("frontend handler", conn.id = conn_id, event.id);
-    tracer::store_parent_into_event(&span, &mut event);
-
-    if !event.valid() {
-        warn!(event.id, "l337 or epic fail attempt from {} detected, discarding event", addr.to_string());
-        return Err(AppError::new(StatusCode::IM_A_TEAPOT, "Invalid event\n"));
-    }
-    let now = Utc::now();
-    if let Some(n) = now.timestamp_nanos_opt() {
-        event.rcvd_time = n;
-    }
-
-    debug!(event.id, conn.id = event.conn_id, "sending event to nats");
-
-    state
-        .event_tx
-        .send(event)
-        .map_err(|e| AppError::new(StatusCode::INTERNAL_SERVER_ERROR, &format!("error sending to NATS: {}", e)))?;
+    let events = if !value.is_array() {
+        let event: NormalizedEvent = serde_json::from_value(value).map_err(|e| {
+            let s = e.to_string();
+            error!("cannot read event, json parse error: {}", s);
+            AppError::new(StatusCode::BAD_REQUEST, &s)
+        })?;
+        Vec::from([event])
+    } else {
+        serde_json::from_value(value).map_err(|e| {
+            let s = e.to_string();
+            error!("cannot read events, json parse error: {}", s);
+            AppError::new(StatusCode::BAD_REQUEST, &s)
+        })?
+    };
+
+    // println!("value: {}", serde_json::to_string_pretty(&value).unwrap());
+    debug!("received {} events from {}", events.len(), addr.to_string());
+
+    for mut event in events {
+        if let Some(limiter) = &state.eps_limiter.as_ref().limiter {
+            let limiter = limiter.read().await;
+            debug!("max_tokens: {}, available: {}", limiter.max_tokens(), limiter.available());
+            if limiter.try_wait().is_err() {
+                return Err(AppError::new(StatusCode::TOO_MANY_REQUESTS, "EPS rate limit reached\n"));
+            }
+        }
+
+        state.conn_counter.as_ref().inc();
+        let conn_id = state.conn_counter.as_ref().get();
+        event.conn_id = conn_id as u64;
+
+        trace!("event received: {:?}", event);
+        let span = info_span!("frontend handler", conn.id = conn_id, event.id);
+        tracer::store_parent_into_event(&span, &mut event);
+
+        if !event.valid() {
+            warn!(event.id, "l337 or epic fail attempt from {} detected, discarding event", addr.to_string());
+            return Err(AppError::new(StatusCode::IM_A_TEAPOT, "Invalid event\n"));
+        }
+        let now = Utc::now();
+        if let Some(n) = now.timestamp_nanos_opt() {
+            event.rcvd_time = n;
+        }
+
+        debug!(event.id, conn.id = event.conn_id, "sending event to nats");
+
+        state
+            .event_tx
+            .send(event)
+            .map_err(|e| AppError::new(StatusCode::INTERNAL_SERVER_ERROR, &format!("error sending to NATS: {}", e)))?;
+    }
     Ok(())
 }
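The parsing strategy above, reduced to a self-contained sketch that compiles in isolation — `Event` is a stand-in for `NormalizedEvent`, which is the only assumption here:

```rust
// Sketch of the handler's single-or-array parsing: probe the raw Value,
// deserialize an array directly, or wrap a lone object in a one-element Vec.
use serde::Deserialize;
use serde_json::{json, Value};

#[derive(Debug, Deserialize)]
struct Event {
    event_id: String,
}

fn parse_events(value: Value) -> Result<Vec<Event>, serde_json::Error> {
    if value.is_array() {
        serde_json::from_value(value)
    } else {
        serde_json::from_value(value).map(|e| vec![e])
    }
}

fn main() -> Result<(), serde_json::Error> {
    assert_eq!(parse_events(json!({ "event_id": "a" }))?.len(), 1);
    assert_eq!(parse_events(json!([{ "event_id": "a" }, { "event_id": "b" }]))?.len(), 2);
    Ok(())
}
```

An alternative would be a `#[serde(untagged)]` enum over `Event` and `Vec<Event>`, but probing with `is_array()` as the handler does keeps the two error messages ("cannot read event" vs "cannot read events") distinct.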

@@ -232,7 +252,7 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_event_handler() {
-        let eps_limiter = Arc::new(EpsLimiter::new(2, 2).unwrap());
+        let eps_limiter = Arc::new(EpsLimiter::new(4, 4).unwrap());
         let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(5);
         let mut app = app(true, true, eps_limiter, event_tx)
             .unwrap()
@@ -248,7 +268,7 @@
             .body(b)
             .unwrap();
         let response = app.ready().await.unwrap().call(request).await.expect("request failed");
-        assert!(response.status() == StatusCode::INTERNAL_SERVER_ERROR); // content accepted
+        assert!(response.status() == StatusCode::BAD_REQUEST);

         let evt = json!({
             "event_id": "id1",
@@ -293,6 +313,20 @@
         let response = app.ready().await.unwrap().call(request).await.expect("request failed");
         assert!(response.status() == StatusCode::OK); // event accepted

+        // test multiple events
+        let events = Vec::from([evt.clone(), evt.clone()]);
+        let b = Body::from(serde_json::to_vec(&events).unwrap());
+        let request = Request::builder()
+            .uri("/events")
+            .header(http::header::CONTENT_TYPE, "Application/Json")
+            .method(http::Method::POST)
+            .body(b)
+            .unwrap();
+        let response = app.ready().await.unwrap().call(request).await.expect("request failed");
+        assert!(response.status() == StatusCode::OK); // event accepted
+
+        // test eps limiter
+
         let b = Body::from(serde_json::to_vec(&evt).unwrap());
         let request = Request::builder()
             .uri("/events")
