Skip to content

Commit

Permalink
On-demand analysis of past recordings
Browse files Browse the repository at this point in the history
* rayhunter-daemon: API for triggering and reading analysis
* rayhunter-daemon: rename readonly mode to debug mode
* rayhunter-daemon: debug mode allows live-loading frontend files
* rayhunter-check: rework to handle directories
* rayhunter-check: better output
* CI: build rayhunter-check
  • Loading branch information
wgreenberg committed Oct 3, 2024
1 parent c59fb7c commit 20909fc
Show file tree
Hide file tree
Showing 13 changed files with 636 additions and 134 deletions.
13 changes: 11 additions & 2 deletions .github/workflows/build-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
CARGO_TERM_COLOR: always

jobs:
build_serial:
build_serial_and_check:
strategy:
matrix:
platform:
Expand All @@ -28,6 +28,15 @@ jobs:
name: serial-${{ matrix.platform.os }}
path: ./target/release/${{ matrix.platform.build_name }}
if-no-files-found: error
steps:
- uses: actions/checkout@v4
- name: Build check
run: cargo build --bin rayhunter-check --release
- uses: actions/upload-artifact@v4
with:
name: rayhunter-check-${{ matrix.platform.os }}
path: ./target/release/${{ matrix.platform.build_name }}
if-no-files-found: error
build_rootshell_and_rayhunter:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -63,7 +72,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/download-artifact@v4
- name: Fix executable permissions on binaries
run: chmod +x serial-*/serial rayhunter-daemon/rayhunter-daemon
run: chmod +x serial-*/serial rayhunter-check-*/rayhunter-check rayhunter-daemon/rayhunter-daemon
- name: Setup release directory
run: mv rayhunter-daemon/rayhunter-daemon rootshell/rootshell serial-* dist
- name: Archive release directory
Expand Down
240 changes: 240 additions & 0 deletions bin/src/analysis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use std::sync::Arc;
use std::{future, pin};

use axum::Json;
use axum::{
extract::{Path, State},
http::StatusCode,
};
use futures::TryStreamExt;
use log::{debug, error, info};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::qmdl::QmdlReader;
use serde::Serialize;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio_util::task::TaskTracker;

use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;

// Streams analysis results for a single QMDL recording to an on-disk NDJSON
// file, keeping a running total of bytes written so callers can report
// progress without re-statting the file.
pub struct AnalysisWriter {
// Buffered handle to the analysis output file.
writer: BufWriter<File>,
// Analyzer harness run over each batch of QMDL messages.
harness: Harness,
// Total bytes written so far; returned from analyze() after each batch.
bytes_written: usize,
}

// We write our analysis results to a file immediately to minimize the amount of
// state Rayhunter has to keep track of in memory. The analysis file's format is
// Newline Delimited JSON
// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
    /// Creates a writer over `file`, builds the full analyzer harness, and
    /// immediately writes the harness metadata as the first NDJSON row.
    ///
    /// # Errors
    /// Returns any I/O error from writing the metadata row.
    pub async fn new(file: File) -> Result<Self, std::io::Error> {
        let mut result = Self {
            writer: BufWriter::new(file),
            harness: Harness::new_with_all_analyzers(),
            bytes_written: 0,
        };
        let metadata = result.harness.get_metadata();
        result.write(&metadata).await?;
        Ok(result)
    }

    /// Runs the analysis harness on the given container, appending any
    /// non-empty result row to the analysis file.
    ///
    /// Returns the file's new total length in bytes.
    pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
        let row = self.harness.analyze_qmdl_messages(container);
        if !row.is_empty() {
            self.write(&row).await?;
        }
        Ok(self.bytes_written)
    }

    /// Serializes `value` as a single NDJSON row and appends it to the file,
    /// flushing so rows are durable as soon as they're produced.
    ///
    /// Fix: serialization previously used `.unwrap()`, which would panic the
    /// whole analysis task on an unserializable value. Serialization failures
    /// are now surfaced as `io::Error` like every other failure here.
    async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
        let mut value_str = serde_json::to_string(value)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        value_str.push('\n');
        self.bytes_written += value_str.len();
        self.writer.write_all(value_str.as_bytes()).await?;
        self.writer.flush().await?;
        Ok(())
    }

    /// Flushes any pending I/O to disk before dropping the writer.
    pub async fn close(mut self) -> Result<(), std::io::Error> {
        self.writer.flush().await?;
        Ok(())
    }
}

// Snapshot of the analysis queue, exposed (serialized) through the HTTP API:
// the recordings waiting to be analyzed plus the one currently in progress.
#[derive(Debug, Serialize, Clone, Default)]
pub struct AnalysisStatus {
// Names of QMDL store entries waiting for analysis, in FIFO order.
queued: Vec<String>,
// Name of the entry currently being analyzed, if any.
running: Option<String>,
}

// Control messages consumed by the background analysis task spawned in
// run_analysis_thread.
pub enum AnalysisCtrlMessage {
// One or more new files were pushed onto the queue; drain it.
NewFilesQueued,
// Shut the analysis task down.
Exit,
}

/// Returns the current number of recordings waiting in the analysis queue.
async fn queued_len(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> usize {
    let status = analysis_status_lock.read().await;
    status.queued.len()
}

/// Pops the front of the analysis queue and marks it as the running entry,
/// returning its name. Panics (via assert) if an entry is already running —
/// the analysis task processes strictly one file at a time.
async fn dequeue_to_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> String {
    let mut status = analysis_status_lock.write().await;
    let name = status.queued.remove(0);
    assert!(status.running.is_none());
    status.running = Some(name.clone());
    name
}

/// Clears the currently-running entry once its analysis has finished
/// (successfully or not).
async fn clear_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) {
    analysis_status_lock.write().await.running = None;
}

/// Re-runs the full analyzer harness over the QMDL recording named `name`,
/// writing results to the entry's (freshly truncated) analysis file and
/// persisting the growing analysis size so progress is observable.
///
/// All failures are reported as `Err(String)` so the analysis task can log
/// them and keep servicing the queue. Fix: the file-metadata and container
/// reads previously used `.expect(...)`, so a single corrupt/unreadable QMDL
/// file would panic and kill the entire long-lived analysis task instead of
/// being logged like every other error on this path.
async fn perform_analysis(
    name: &str,
    qmdl_store_lock: Arc<RwLock<RecordingStore>>,
) -> Result<(), String> {
    info!("Opening QMDL and analysis file for {}...", name);
    // Hold the store's write lock only long enough to resolve the entry and
    // open both files.
    let (analysis_file, qmdl_file, entry_index) = {
        let mut qmdl_store = qmdl_store_lock.write().await;
        let (entry_index, _) = qmdl_store
            .entry_for_name(name)
            .ok_or(format!("failed to find QMDL store entry for {}", name))?;
        let analysis_file = qmdl_store
            .clear_and_open_entry_analysis(entry_index)
            .await
            .map_err(|e| format!("{:?}", e))?;
        let qmdl_file = qmdl_store
            .open_entry_qmdl(entry_index)
            .await
            .map_err(|e| format!("{:?}", e))?;

        (analysis_file, qmdl_file, entry_index)
    };

    let mut analysis_writer = AnalysisWriter::new(analysis_file)
        .await
        .map_err(|e| format!("{:?}", e))?;
    let file_size = qmdl_file
        .metadata()
        .await
        .map_err(|e| format!("failed to get QMDL file metadata: {:?}", e))?
        .len();
    let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
    // Only user-space messages are relevant to the analyzers.
    let mut qmdl_stream = pin::pin!(qmdl_reader
        .as_stream()
        .try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));

    info!("Starting analysis for {}...", name);
    while let Some(container) = qmdl_stream
        .try_next()
        .await
        .map_err(|e| format!("failed getting QMDL container: {:?}", e))?
    {
        let size_bytes = analysis_writer
            .analyze(container)
            .await
            .map_err(|e| format!("{:?}", e))?;
        debug!("{} analysis: {} bytes written", name, size_bytes);
        // Persist the updated analysis size after every batch so the UI can
        // track progress on long recordings.
        let mut qmdl_store = qmdl_store_lock.write().await;
        qmdl_store
            .update_entry_analysis_size(entry_index, size_bytes)
            .await
            .map_err(|e| format!("{:?}", e))?;
    }

    analysis_writer
        .close()
        .await
        .map_err(|e| format!("{:?}", e))?;
    info!("Analysis for {} complete!", name);

    Ok(())
}

pub fn run_analysis_thread(
task_tracker: &TaskTracker,
mut analysis_rx: Receiver<AnalysisCtrlMessage>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
) {
task_tracker.spawn(async move {
loop {
match analysis_rx.recv().await {
Some(AnalysisCtrlMessage::NewFilesQueued) => {
let count = queued_len(analysis_status_lock.clone()).await;
for _ in 0..count {
let name = dequeue_to_running(analysis_status_lock.clone()).await;
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone()).await {
error!("failed to analyze {}: {}", name, err);
}
clear_running(analysis_status_lock.clone()).await;
}
}
Some(AnalysisCtrlMessage::Exit) | None => return,
}
}
});
}

/// GET handler: returns a snapshot of the current analysis queue state.
pub async fn get_analysis_status(
    State(state): State<Arc<ServerState>>,
) -> Result<Json<AnalysisStatus>, (StatusCode, String)> {
    let status = state.analysis_status_lock.read().await;
    Ok(Json(status.clone()))
}

/// Adds `name` to the analysis queue unless it is already queued or currently
/// being analyzed. Returns whether the entry was actually enqueued.
fn queue_qmdl(name: &str, analysis_status: &mut RwLockWriteGuard<AnalysisStatus>) -> bool {
    let already_queued = analysis_status.queued.iter().any(|n| n == name);
    let currently_running = analysis_status.running.as_deref() == Some(name);
    if already_queued || currently_running {
        false
    } else {
        analysis_status.queued.push(name.to_string());
        true
    }
}

/// POST handler: queues one recording (by name) for analysis, or — when
/// `qmdl_name` is empty — every completed recording in the store. If anything
/// was newly queued, wakes the analysis task via the control channel.
///
/// Bug fix: the "queue everything" path previously used `Iterator::any`,
/// which short-circuits as soon as the closure returns true — so only the
/// FIRST unqueued recording was ever enqueued per request. We now attempt to
/// queue every entry and remember whether any succeeded.
pub async fn start_analysis(
    State(state): State<Arc<ServerState>>,
    Path(qmdl_name): Path<String>,
) -> Result<(StatusCode, Json<AnalysisStatus>), (StatusCode, String)> {
    let mut analysis_status = state.analysis_status_lock.write().await;
    let store = state.qmdl_store_lock.read().await;
    let queued = if qmdl_name.is_empty() {
        let mut entry_names: Vec<&str> = store
            .manifest
            .entries
            .iter()
            .map(|e| e.name.as_str())
            .collect();
        // Skip the recording that's currently being written to, if any.
        if let Some(current_entry) = store.current_entry {
            entry_names.remove(current_entry);
        }
        // Deliberately NOT short-circuiting: every entry must get a chance
        // to be queued, not just the first one that succeeds.
        let mut any_queued = false;
        for name in entry_names {
            any_queued |= queue_qmdl(name, &mut analysis_status);
        }
        any_queued
    } else {
        queue_qmdl(&qmdl_name, &mut analysis_status)
    };
    if queued {
        // Wake the analysis task so it drains the queue.
        state
            .analysis_sender
            .send(AnalysisCtrlMessage::NewFilesQueued)
            .await
            .map_err(|e| {
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("failed to queue new analysis files: {:?}", e),
                )
            })?;
    }
    Ok((StatusCode::ACCEPTED, Json(analysis_status.clone())))
}
51 changes: 35 additions & 16 deletions bin/src/check.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, future, path::PathBuf, pin::pin};
use rayhunter::{analysis::analyzer::Harness, diag::DataType, qmdl::QmdlReader};
use tokio::fs::File;
use tokio::fs::{metadata, read_dir, File};
use clap::Parser;
use futures::TryStreamExt;

Expand All @@ -9,24 +9,17 @@ use futures::TryStreamExt;
struct Args {
#[arg(short, long)]
qmdl_path: PathBuf,
}

#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();

let mut harness = Harness::new_with_all_analyzers();
#[arg(long)]
show_skipped: bool,
}

let qmdl_file = File::open(args.qmdl_path).await.expect("failed to open QMDL file");
async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) {
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
}
let mut skipped_reasons: HashMap<String, i32> = HashMap::new();
let mut total_messages = 0;
let mut warnings = 0;
Expand All @@ -47,11 +40,37 @@ async fn main() {
}
}
}
if skipped > 0 {
println!("Messages skipped:");
if show_skipped && skipped > 0 {
println!("{}: messages skipped:", qmdl_path);
for (reason, count) in skipped_reasons.iter() {
println!(" - {}: \"{}\"", count, reason);
}
}
println!("{} messages analyzed, {} warnings, {} messages skipped", total_messages, warnings, skipped);
println!("{}: {} messages analyzed, {} warnings, {} messages skipped", qmdl_path, total_messages, warnings, skipped);
}

// Entry point for rayhunter-check: runs every analyzer over a single .qmdl
// file, or over every .qmdl file in a directory.
#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();

// Build the harness once and reuse it across every file analyzed below.
let mut harness = Harness::new_with_all_analyzers();
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
}

// qmdl_path may name either a single file or a directory of recordings.
let metadata = metadata(&args.qmdl_path).await.expect("failed to get metadata");
if metadata.is_dir() {
let mut dir = read_dir(&args.qmdl_path).await.expect("failed to read dir");
while let Some(entry) = dir.next_entry().await.expect("failed to get entry") {
let name = entry.file_name();
// NOTE(review): to_str().unwrap() panics on non-UTF-8 file names —
// confirm that's acceptable for the platforms this tool targets.
let name_str = name.to_str().unwrap();
if name_str.ends_with(".qmdl") {
analyze_file(&mut harness, entry.path().to_str().unwrap(), args.show_skipped).await;
}
}
} else {
analyze_file(&mut harness, args.qmdl_path.to_str().unwrap(), args.show_skipped).await;
}
}
8 changes: 4 additions & 4 deletions bin/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use serde::Deserialize;
struct ConfigFile {
qmdl_store_path: Option<String>,
port: Option<u16>,
readonly_mode: Option<bool>,
debug_mode: Option<bool>,
ui_level: Option<u8>,
}

#[derive(Debug)]
pub struct Config {
pub qmdl_store_path: String,
pub port: u16,
pub readonly_mode: bool,
pub debug_mode: bool,
pub ui_level: u8,
}

Expand All @@ -23,7 +23,7 @@ impl Default for Config {
Config {
qmdl_store_path: "/data/rayhunter/qmdl".to_string(),
port: 8080,
readonly_mode: false,
debug_mode: false,
ui_level: 1,
}
}
Expand All @@ -36,7 +36,7 @@ pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError> where P: AsRef
.map_err(RayhunterError::ConfigFileParsingError)?;
if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path }
if let Some(port) = parsed_config.port { config.port = port }
if let Some(readonly_mode) = parsed_config.readonly_mode { config.readonly_mode = readonly_mode }
if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode }
if let Some(ui_level) = parsed_config.ui_level { config.ui_level = ui_level }
}
Ok(config)
Expand Down
Loading

0 comments on commit 20909fc

Please sign in to comment.