From e93ed08696f53c7c565a1e1995b1c0cd75724778 Mon Sep 17 00:00:00 2001 From: Benno van den Berg Date: Tue, 1 Oct 2024 15:26:24 +0200 Subject: [PATCH] Implement API manager (#263) * Rename OpenWorkspaceByPathError to OpenWorkspaceError Use Option functions instead of doing it manually * Initial implementation of the ApiManager Move LibSqlStore and InMemoryEvents to fpx * Add fpx.toml * Resolve warnings * Fix tests * Add simple logging with Tracing Rename CI from build to build_cli * Add legacy and fpx api manager Fix issue with duplicate recent workspace entries * Ensure that the lock stays locked long enough (Rust API) Previously there was a possibility that start would be called twice and two processes would be started, but only one PID would be stored Add more docs, logs, etc * Remove unsused file * Fix build issue Use 8788 as api port * remove todo's, they are captured by issues --- .../workflows/{build.yaml => build_cli.yaml} | 0 Cargo.lock | 9 +- fpx-app/Cargo.toml | 21 ++++- fpx-app/Tauri.toml | 2 +- fpx-app/src/api_manager.rs | 11 +++ fpx-app/src/api_manager/fpx_api.rs | 94 +++++++++++++++++++ fpx-app/src/api_manager/legacy.rs | 88 +++++++++++++++++ fpx-app/src/commands/workspace.rs | 29 +++--- fpx-app/src/main.rs | 30 ++++++ fpx-app/src/models/workspace.rs | 2 +- fpx-app/src/state.rs | 6 +- fpx-cli/Cargo.toml | 7 -- fpx-cli/src/commands/dev.rs | 4 +- fpx-cli/src/data/util.rs | 38 -------- fpx-cli/src/main.rs | 2 - fpx.toml | 1 + fpx/Cargo.toml | 8 +- fpx/src/data.rs | 5 +- .../data.rs => fpx/src/data/libsql_store.rs | 52 ++++++++-- .../src/data/libsql_store}/migrations.rs | 3 +- .../migrations/20240708_create_spans.sql | 0 .../migrations/20240723_create_responses.sql | 0 .../migrations/20240918_create_settings.sql | 0 .../src/data/libsql_store}/tests.rs | 10 +- fpx/src/data/util.rs | 2 +- fpx/src/events.rs | 5 + .../src/events.rs => fpx/src/events/memory.rs | 6 +- fpx/src/lib.rs | 1 + fpx/src/service.rs | 3 +- packages/types/src/schemas.ts | 7 +- studio/src/tauri/RuntimeProvider.tsx | 8 +- .../WorkspaceOpenError/WorkspaceOpenError.tsx | 4 +- xtask/src/commands/schemas.rs | 2 +- 33 files changed, 357 insertions(+), 103 deletions(-) rename .github/workflows/{build.yaml => build_cli.yaml} (100%) create mode 100644 fpx-app/src/api_manager.rs create mode 100644 fpx-app/src/api_manager/fpx_api.rs create mode 100644 fpx-app/src/api_manager/legacy.rs delete mode 100644 fpx-cli/src/data/util.rs create mode 100644 fpx.toml rename fpx-cli/src/data.rs => fpx/src/data/libsql_store.rs (85%) rename {fpx-cli/src/data => fpx/src/data/libsql_store}/migrations.rs (97%) rename {fpx-cli/src/data => fpx/src/data/libsql_store}/migrations/20240708_create_spans.sql (100%) rename {fpx-cli/src/data => fpx/src/data/libsql_store}/migrations/20240723_create_responses.sql (100%) rename {fpx-cli/src/data => fpx/src/data/libsql_store}/migrations/20240918_create_settings.sql (100%) rename {fpx-cli/src/data => fpx/src/data/libsql_store}/tests.rs (92%) rename fpx-cli/src/events.rs => fpx/src/events/memory.rs (86%) diff --git a/.github/workflows/build.yaml b/.github/workflows/build_cli.yaml similarity index 100% rename from .github/workflows/build.yaml rename to .github/workflows/build_cli.yaml diff --git a/Cargo.lock b/Cargo.lock index fe190acd2..eba401119 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,6 +1467,7 @@ dependencies = [ "hex", "http 1.1.0", "http-body-util", + "include_dir", "libsql", "opentelemetry", "opentelemetry-proto", @@ -1476,6 +1477,7 @@ dependencies = [ "serde", "serde_json", "strum", + "test-log", "thiserror", "time", "tokio", @@ -1492,7 +1494,10 @@ dependencies = [ name = "fpx-app" version = "0.1.0" dependencies = [ + "anyhow", + "axum 0.7.5", "fpx", + "nix", "schemars", "serde", "serde_json", @@ -1501,6 +1506,9 @@ dependencies = [ "tauri-plugin-dialog", "tauri-plugin-store", "tauri-plugin-window-state", + "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1533,7 +1541,6 @@ dependencies = [ "serde_json", "serde_with", "strum", - "test-log", "thiserror", "time", "tokio", diff --git a/fpx-app/Cargo.toml b/fpx-app/Cargo.toml index 27efde22b..8b5b8d23e 100644 --- a/fpx-app/Cargo.toml +++ b/fpx-app/Cargo.toml @@ -13,17 +13,36 @@ repository = { workspace = true } tauri-build = { version = "2.0.0-rc.11", features = [] } [dependencies] -fpx = { version = "0.1.0", path = "../fpx", features = ["config"] } +anyhow = { workspace = true } +axum = { workspace = true, default-features = false, features = [ + "http1", + "query", + "tokio", + "tracing", + "ws", +], optional = true } +fpx = { version = "0.1.0", path = "../fpx", features = ["config", "libsql"] } +nix = { version = "0.29", default-features = false, features = ["signal"] } schemars = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tauri = { version = "2.0.0-rc.14", features = ["config-toml"] } tauri-plugin-dialog = "2.0.0-rc" tauri-plugin-store = { version = "2.0.0-rc" } +tokio = { version = "1", default-features = false, features = [ + "sync", + "process", + "macros", +] } +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] # This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!! custom-protocol = ["tauri/custom-protocol"] +# This feature enables the new Rust API. If not enabled it will use the TS API. +fpx-api = ["dep:axum"] + [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] tauri-plugin-window-state = "2.0.0-rc" diff --git a/fpx-app/Tauri.toml b/fpx-app/Tauri.toml index a71d4ebda..9762595c6 100644 --- a/fpx-app/Tauri.toml +++ b/fpx-app/Tauri.toml @@ -3,7 +3,7 @@ identifier = "com.fiberplane.fpx" [build] devUrl = "http://localhost:5173" beforeBuildCommand = "pnpm build:fpx-studio" -beforeDevCommand = "pnpm dev:api & pnpm dev:frontend" +beforeDevCommand = "pnpm dev:frontend" frontendDist = "../studio/dist" [bundle] diff --git a/fpx-app/src/api_manager.rs b/fpx-app/src/api_manager.rs new file mode 100644 index 000000000..7cd5da756 --- /dev/null +++ b/fpx-app/src/api_manager.rs @@ -0,0 +1,11 @@ +#[cfg(feature = "fpx-api")] +mod fpx_api; + +#[cfg(feature = "fpx-api")] +pub use fpx_api::*; + +#[cfg(not(feature = "fpx-api"))] +mod legacy; + +#[cfg(not(feature = "fpx-api"))] +pub use legacy::*; diff --git a/fpx-app/src/api_manager/fpx_api.rs b/fpx-app/src/api_manager/fpx_api.rs new file mode 100644 index 000000000..e069ed529 --- /dev/null +++ b/fpx-app/src/api_manager/fpx_api.rs @@ -0,0 +1,94 @@ +use fpx::api; +use fpx::config::FpxConfig; +use fpx::data::libsql_store::LibsqlStore; +use fpx::events::memory::InMemoryEvents; +use fpx::service::Service; +use std::sync::{Arc, Mutex}; +use tauri::async_runtime::spawn; +use tokio::sync::broadcast::error::RecvError; +use tracing::{error, info, trace, warn}; + +#[derive(Debug, Default)] +pub struct ApiManager { + // Sending a message on this channel will shutdown the axum server. + shutdown_tx: Mutex>>, +} + +impl ApiManager { + pub fn start_api(&self, fpx_config: FpxConfig) { + let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned"); + if let Some(shutdown_tx) = shutdown_tx.take() { + // shutdown any existing api server + let _ = shutdown_tx.send(()); + } + + // Start a listener early, so that we can handle the issue where another + // process is listening already on that specific port. + let listen_port = fpx_config.listen_port.unwrap_or(6767); + let listener = std::net::TcpListener::bind(format!("127.0.0.1:{listen_port}")).unwrap(); + listener.set_nonblocking(true).unwrap(); + + let (shutdown, on_shutdown) = tokio::sync::oneshot::channel::<()>(); + *shutdown_tx = Some(shutdown); + + spawn(async move { + let store = LibsqlStore::in_memory().await.unwrap(); + LibsqlStore::migrate(&store).await.unwrap(); + let store = Arc::new(store); + + // Create a event sink that simply logs + let events = InMemoryEvents::new(); + let events = Arc::new(events); + + // Our current implementation simply logs the events. + let mut reader = events.subscribe(); + spawn(async move { + loop { + match reader.recv().await { + Ok(message) => { + // Here we can do something with events, like + // emitting them to the frontend: + // window.emit("api_message", message).expect("emit failed"); + info!("Received message: {:?}", message); + } + Err(RecvError::Lagged(i)) => { + warn!(lagged = i, "Event reader lagged behind"); + } + Err(RecvError::Closed) => { + trace!("Event reader loop stopped"); + break; + } + } + } + }); + + let service = Service::new(store.clone(), events.clone()); + + let app = api::Builder::new() + .enable_compression() + .build(service.clone(), store.clone()); + + let listener = tokio::net::TcpListener::from_std(listener).unwrap(); + let api_server = axum::serve(listener, app).with_graceful_shutdown(async { + // Once we receive something on the [`on_shutdown`] channel, + // we'll resolve this future, and thus axum will shutdown. + // We are wrapping this in another future because of the + // incompatible return type of the oneshot channel. + let _ = on_shutdown.await; + trace!("Received API shutdown signal"); + }); + + if let Err(err) = api_server.await { + error!(?err, "API server returned an error"); + }; + }); + } + + pub fn stop_api(&self) { + let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned"); + if let Some(shutdown_tx) = shutdown_tx.take() { + // shutdown any existing api servers + let _ = shutdown_tx.send(()); + } + } +} diff --git a/fpx-app/src/api_manager/legacy.rs b/fpx-app/src/api_manager/legacy.rs new file mode 100644 index 000000000..a3376f5a3 --- /dev/null +++ b/fpx-app/src/api_manager/legacy.rs @@ -0,0 +1,88 @@ +use fpx::config::FpxConfig; +use nix::sys::signal::{killpg, Signal}; +use nix::unistd::Pid; +use std::os::unix::process::CommandExt; +use std::process; +use std::sync::Mutex; +use tauri::async_runtime::spawn; +use tracing::{error, trace, warn}; + +#[derive(Debug, Default)] +pub struct ApiManager { + api_pid: Mutex>, +} + +impl ApiManager { + /// Start a API server. If a API pid is already set, then that will first be + /// shutdown. + pub fn start_api(&self, fpx_config: FpxConfig) { + // Get a lock for the duration of this function + let mut api_pid = self.api_pid.lock().expect("lock is poisoned"); + + // If there is a API pid already there, then first send the SIGTERM + // signal to that process group. + if let Some(api_pid) = api_pid.take() { + // shutdown any existing api server + Self::send_sigterm_signal(api_pid); + } + + // Create some environment variables overrides based on the fpx.toml + let mut envs: Vec<(&str, String)> = vec![]; + if let Some(listen_port) = fpx_config.listen_port { + envs.push(("FPX_PORT", listen_port.to_string())); + } + + // Start the process using pnpm. The process_group=0 will ensure that + // the process group ID is the same as the root process ID. + let mut child_process = process::Command::new("pnpm") + .arg("dev:api") + .process_group(0) // + .envs(envs) + .spawn() + .expect("failed to execute pnpm dev:api"); + + // Once the process is running, get the pid and store it in the mutex, + // so that we can send signals to it later. + let pid = child_process.id(); + *api_pid = Some(Pid::from_raw(pid as i32)); + + // Spawn a task to wait for the child process to exit, and potentially + // log an error + spawn(async move { + let result = child_process.wait(); + if let Err(err) = result { + error!(?err, api_pid=?pid, "child process exited with error"); + } else { + trace!(api_pid=?pid, "API server exited successfully"); + } + }); + } + + /// Sends the SIGTERM signal to the API process group. If no API pid was set + /// then this function will do nothing. + pub fn stop_api(&self) { + let Some(api_pid) = self.api_pid.lock().expect("lock is poisoned").take() else { + trace!("No API running"); + return; + }; + + Self::send_sigterm_signal(api_pid) + } + + /// Send the SIGTERM signal to the specified process group. + /// + /// This uses a Process ID type instead of a specific process group ID as + /// that does not exist. + fn send_sigterm_signal(api_pid: Pid) { + trace!(?api_pid, "sending SIGTERM signal to API process group"); + + let result = killpg(api_pid, Signal::SIGTERM); + if let Err(errno) = result { + warn!( + ?errno, + ?api_pid, + "failed to send SIGNTERM signal to API process group" + ); + } + } +} diff --git a/fpx-app/src/commands/workspace.rs b/fpx-app/src/commands/workspace.rs index 94b2f8e3d..46a8896d4 100644 --- a/fpx-app/src/commands/workspace.rs +++ b/fpx-app/src/commands/workspace.rs @@ -1,4 +1,5 @@ -use crate::models::workspace::{OpenWorkspaceByPathError, Workspace}; +use crate::api_manager::ApiManager; +use crate::models::workspace::{OpenWorkspaceError, Workspace}; use crate::state::AppState; use crate::STORE_PATH; use fpx::config::{FpxConfig, FpxConfigError}; @@ -37,22 +38,23 @@ pub fn get_current_workspace(state: State<'_, AppState>) -> Option { #[tauri::command] pub fn open_workspace_by_path( path: String, - state: State<'_, AppState>, + app_state: State<'_, AppState>, + api_manager: State<'_, ApiManager>, app: AppHandle, stores: State<'_, StoreCollection>, -) -> Result { +) -> Result { + api_manager.stop_api(); + let path_buf = PathBuf::from(path.clone()); let config = match FpxConfig::load(Some(path_buf)) { Ok((config, _config_path)) => config, Err(err) => { return Err(match err { - FpxConfigError::FileNotFound(path_buf) => { - OpenWorkspaceByPathError::ConfigFileMissing { - path: path_buf.to_string_lossy().to_string(), - } - } + FpxConfigError::FileNotFound(path_buf) => OpenWorkspaceError::ConfigFileMissing { + path: path_buf.to_string_lossy().to_string(), + }, FpxConfigError::InvalidFpxConfig { message, .. } => { - OpenWorkspaceByPathError::InvalidConfiguration { message } + OpenWorkspaceError::InvalidConfiguration { message } } FpxConfigError::RootDirectoryNotFound => { unreachable!("FpxConfig::load takes a path, so this cannot occur") @@ -61,8 +63,10 @@ pub fn open_workspace_by_path( } }; + api_manager.start_api(config.clone()); + let workspace = Workspace::new(path.clone(), config); - state.set_workspace(workspace.clone()); + app_state.set_workspace(workspace.clone()); with_store(app, stores, STORE_PATH, |store| { let mut recents: Vec = store @@ -70,7 +74,7 @@ pub fn open_workspace_by_path( .and_then(|value| value.as_array()) .map(|arr| { arr.iter() - .filter_map(|item| item.as_str().filter(|s| s == &path).map(|s| s.to_string())) + .filter_map(|item| item.as_str().filter(|s| s != &path).map(|s| s.to_string())) .collect() }) .unwrap_or_default(); @@ -89,6 +93,7 @@ pub fn open_workspace_by_path( } #[tauri::command] -pub fn close_workspace(state: State<'_, AppState>) { +pub fn close_workspace(state: State<'_, AppState>, api_manager: State<'_, ApiManager>) { + api_manager.stop_api(); state.close_workspace(); } diff --git a/fpx-app/src/main.rs b/fpx-app/src/main.rs index 669c2fb6b..831c9acf5 100644 --- a/fpx-app/src/main.rs +++ b/fpx-app/src/main.rs @@ -1,12 +1,19 @@ // Prevents additional console window on Windows in release, DO NOT REMOVE!! #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] +use anyhow::{Context, Result}; +use api_manager::ApiManager; use state::AppState; +use std::env; use tauri::menu::{MenuBuilder, MenuId, MenuItemBuilder, SubmenuBuilder}; use tauri::{Emitter, WebviewWindowBuilder}; use tauri::{Manager, Wry}; use tauri_plugin_store::StoreCollection; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Registry}; +mod api_manager; mod commands; mod models; mod state; @@ -15,11 +22,17 @@ const MAIN_WINDOW_ID: &str = "main-window"; const STORE_PATH: &str = "fpx.bin"; fn main() { + if let Err(err) = setup_tracing() { + eprintln!("error while setting up tracing: {:?}", err); + std::process::exit(1); + } + tauri::Builder::default() .plugin(tauri_plugin_window_state::Builder::new().build()) .plugin(tauri_plugin_store::Builder::new().build()) .plugin(tauri_plugin_dialog::init()) .manage(AppState::default()) + .manage(ApiManager::default()) .setup(|app| { app.handle() .try_state::>() @@ -109,3 +122,20 @@ fn main() { .run(tauri::generate_context!()) .expect("error while running tauri application"); } + +fn setup_tracing() -> Result<()> { + let filter_layer = { + let directives = env::var("RUST_LOG").unwrap_or_else(|_| "warn,fpx=info".to_string()); + EnvFilter::builder().parse(directives)? + }; + + let log_layer = tracing_subscriber::fmt::layer(); + + Registry::default() + .with(filter_layer) + .with(log_layer) + .try_init() + .context("unable to initialize logger")?; + + Ok(()) +} diff --git a/fpx-app/src/models/workspace.rs b/fpx-app/src/models/workspace.rs index 2a8432e17..f46b30ab6 100644 --- a/fpx-app/src/models/workspace.rs +++ b/fpx-app/src/models/workspace.rs @@ -16,7 +16,7 @@ impl Workspace { #[derive(JsonSchema, Deserialize, Serialize)] #[serde(tag = "type")] -pub enum OpenWorkspaceByPathError { +pub enum OpenWorkspaceError { ConfigFileMissing { path: String }, InvalidConfiguration { message: String }, } diff --git a/fpx-app/src/state.rs b/fpx-app/src/state.rs index e78bd4da7..5227aa8c7 100644 --- a/fpx-app/src/state.rs +++ b/fpx-app/src/state.rs @@ -10,16 +10,16 @@ pub struct AppState { impl AppState { pub fn set_workspace(&self, workspace: Workspace) { let mut workspace_lock = self.workspace.lock().unwrap(); - *workspace_lock = Some(workspace); + workspace_lock.replace(workspace); } pub fn close_workspace(&self) { let mut workspace_lock = self.workspace.lock().unwrap(); - *workspace_lock = None; + workspace_lock.take(); } pub fn get_workspace(&self) -> Option { let workspace_lock = self.workspace.lock().unwrap(); - workspace_lock.as_ref().map(|workspace| workspace.clone()) + workspace_lock.as_ref().cloned() } } diff --git a/fpx-cli/Cargo.toml b/fpx-cli/Cargo.toml index 86415a13c..b4217818e 100644 --- a/fpx-cli/Cargo.toml +++ b/fpx-cli/Cargo.toml @@ -28,10 +28,6 @@ hex = { version = "0.4" } http = { version = "1.1" } http-body-util = { version = "0.1" } include_dir = { version = "0.7.3" } -libsql = { version = "0.5", default-features = false, features = [ - "core", - "serde", -] } once_cell = { version = "1.19" } opentelemetry = { version = "0.24" } opentelemetry_sdk = { version = "0.24", features = ["rt-tokio"] } @@ -78,6 +74,3 @@ libsql = { version = "0.5", default-features = false, features = [ "serde", "replication", ] } - -[dev-dependencies] -test-log = { version = "0.2", default-features = false, features = ["trace"] } diff --git a/fpx-cli/src/commands/dev.rs b/fpx-cli/src/commands/dev.rs index bdba55d22..3563341ff 100644 --- a/fpx-cli/src/commands/dev.rs +++ b/fpx-cli/src/commands/dev.rs @@ -1,8 +1,8 @@ -use crate::data::LibsqlStore; -use crate::events::InMemoryEvents; use crate::grpc::GrpcService; use crate::initialize_fpx_dir; use anyhow::{Context, Result}; +use fpx::data::libsql_store::LibsqlStore; +use fpx::events::memory::InMemoryEvents; use fpx::{api, service}; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use std::future::IntoFuture; diff --git a/fpx-cli/src/data/util.rs b/fpx-cli/src/data/util.rs deleted file mode 100644 index 6edb159f3..000000000 --- a/fpx-cli/src/data/util.rs +++ /dev/null @@ -1,38 +0,0 @@ -use fpx::data::{DbError, Result}; -use libsql::{de, Rows}; -use serde::de::DeserializeOwned; - -#[allow(dead_code)] -pub(crate) trait RowsExt { - /// `T` must be a `struct` - async fn fetch_one(&mut self) -> Result; - - /// `T` must be a `struct` - async fn fetch_optional(&mut self) -> Result, DbError>; - - /// `T` must be a `struct` - async fn fetch_all(&mut self) -> Result, DbError>; -} - -impl RowsExt for Rows { - async fn fetch_one(&mut self) -> Result { - self.fetch_optional().await?.ok_or(DbError::NotFound) - } - - async fn fetch_optional(&mut self) -> Result, DbError> { - match self.next().await? { - Some(row) => Ok(Some(de::from_row(&row)?)), - None => Ok(None), - } - } - - async fn fetch_all(&mut self) -> Result, DbError> { - let mut results = Vec::new(); - - while let Some(row) = self.next().await? { - results.push(de::from_row(&row)?); - } - - Ok(results) - } -} diff --git a/fpx-cli/src/main.rs b/fpx-cli/src/main.rs index 7e80f330c..2a941bb98 100644 --- a/fpx-cli/src/main.rs +++ b/fpx-cli/src/main.rs @@ -15,8 +15,6 @@ use tracing_subscriber::{EnvFilter, Registry}; mod api; mod commands; -pub mod data; -pub mod events; pub mod grpc; #[tokio::main] diff --git a/fpx.toml b/fpx.toml new file mode 100644 index 000000000..c96c33b68 --- /dev/null +++ b/fpx.toml @@ -0,0 +1 @@ +listen_port = 8788 diff --git a/fpx/Cargo.toml b/fpx/Cargo.toml index 954d3fed2..5cf5d4b27 100644 --- a/fpx/Cargo.toml +++ b/fpx/Cargo.toml @@ -10,6 +10,7 @@ repository = { workspace = true } [features] config = ["toml"] +libsql = ["dep:libsql", "dep:include_dir"] [dependencies] anyhow = { version = "1.0", default-features = false } @@ -23,7 +24,11 @@ fpx-macros = { version = "0.1.0", path = "../fpx-macros" } futures-util = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false, features = ["alloc"] } http = { version = "1.1", default-features = false } -libsql = { version = "0.5", default-features = false, optional = true } +include_dir = { version = "0.7.3", optional = true } +libsql = { version = "0.5", default-features = false, optional = true, features = [ + "core", + "serde", +] } opentelemetry = { version = "0.24", default-features = false } opentelemetry_sdk = { version = "0.24", default-features = false } opentelemetry-proto = { version = "0.7", default-features = false, features = [ @@ -59,6 +64,7 @@ wasm-bindgen = { version = "0.2", default-features = false, optional = true } [dev-dependencies] http-body-util = { version = "0.1", default-features = false } +test-log = { version = "0.2", default-features = false, features = ["trace"] } tokio = { version = "1.40", default-features = false, features = [ "macros", "test-util", diff --git a/fpx/src/data.rs b/fpx/src/data.rs index 9cf5637e4..0057ec86c 100644 --- a/fpx/src/data.rs +++ b/fpx/src/data.rs @@ -1,17 +1,16 @@ use crate::api::models::settings::Settings; use crate::data::models::HexEncodedId; -use crate::events::ServerEvents; use async_trait::async_trait; use std::sync::Arc; use thiserror::Error; +#[cfg(feature = "libsql")] +pub mod libsql_store; pub mod models; pub mod sql; pub mod util; pub type Result = anyhow::Result; - -pub type BoxedEvents = Arc; pub type BoxedStore = Arc; #[derive(Clone, Default, Debug)] diff --git a/fpx-cli/src/data.rs b/fpx/src/data/libsql_store.rs similarity index 85% rename from fpx-cli/src/data.rs rename to fpx/src/data/libsql_store.rs index d3beadd99..2e97b01bc 100644 --- a/fpx-cli/src/data.rs +++ b/fpx/src/data/libsql_store.rs @@ -1,20 +1,19 @@ +use crate::api::models::settings::Settings; +use crate::data::models::{HexEncodedId, Span}; +use crate::data::sql::SqlBuilder; +use crate::data::{DbError, Result, Store, Transaction}; use anyhow::Context; use async_trait::async_trait; -use fpx::api::models::settings::Settings; -use fpx::data::models::{HexEncodedId, Span}; -use fpx::data::sql::SqlBuilder; -use fpx::data::{DbError, Result, Store, Transaction}; +use libsql::{de, Rows}; use libsql::{params, Builder, Connection}; +use serde::de::DeserializeOwned; use serde_json::Map; use std::fmt::Display; use std::path::Path; use std::sync::Arc; use tracing::{error, trace}; -use util::RowsExt; - -mod migrations; -mod util; +pub mod migrations; #[cfg(test)] mod tests; @@ -181,7 +180,7 @@ impl Store for LibsqlStore { &self, _tx: &Transaction, // Future improvement could hold sort fields, limits, etc - ) -> Result> { + ) -> Result> { let traces = self .connection .query(&self.sql_builder.traces_list(None), ()) @@ -279,3 +278,38 @@ impl Store for LibsqlStore { }) } } + +#[allow(dead_code)] +pub(crate) trait RowsExt { + /// `T` must be a `struct` + async fn fetch_one(&mut self) -> Result; + + /// `T` must be a `struct` + async fn fetch_optional(&mut self) -> Result>; + + /// `T` must be a `struct` + async fn fetch_all(&mut self) -> Result>; +} + +impl RowsExt for Rows { + async fn fetch_one(&mut self) -> Result { + self.fetch_optional().await?.ok_or(DbError::NotFound) + } + + async fn fetch_optional(&mut self) -> Result> { + match self.next().await? { + Some(row) => Ok(Some(de::from_row(&row)?)), + None => Ok(None), + } + } + + async fn fetch_all(&mut self) -> Result> { + let mut results = Vec::new(); + + while let Some(row) = self.next().await? { + results.push(de::from_row(&row)?); + } + + Ok(results) + } +} diff --git a/fpx-cli/src/data/migrations.rs b/fpx/src/data/libsql_store/migrations.rs similarity index 97% rename from fpx-cli/src/data/migrations.rs rename to fpx/src/data/libsql_store/migrations.rs index 2e8ead00c..446ddb30c 100644 --- a/fpx-cli/src/data/migrations.rs +++ b/fpx/src/data/libsql_store/migrations.rs @@ -6,7 +6,8 @@ use tracing::{debug, trace}; // NOTE: We should probably create our own include, which will store it sorted, // as an array, and with just the name and sql as the expected types. -static MIGRATIONS: Dir<'_> = include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/data/migrations"); +static MIGRATIONS: Dir<'_> = + include_dir::include_dir!("$CARGO_MANIFEST_DIR/src/data/libsql_store/migrations"); static MIGRATIONS_BOOTSTRAP: &str = " CREATE TABLE _fpx_migrations ( diff --git a/fpx-cli/src/data/migrations/20240708_create_spans.sql b/fpx/src/data/libsql_store/migrations/20240708_create_spans.sql similarity index 100% rename from fpx-cli/src/data/migrations/20240708_create_spans.sql rename to fpx/src/data/libsql_store/migrations/20240708_create_spans.sql diff --git a/fpx-cli/src/data/migrations/20240723_create_responses.sql b/fpx/src/data/libsql_store/migrations/20240723_create_responses.sql similarity index 100% rename from fpx-cli/src/data/migrations/20240723_create_responses.sql rename to fpx/src/data/libsql_store/migrations/20240723_create_responses.sql diff --git a/fpx-cli/src/data/migrations/20240918_create_settings.sql b/fpx/src/data/libsql_store/migrations/20240918_create_settings.sql similarity index 100% rename from fpx-cli/src/data/migrations/20240918_create_settings.sql rename to fpx/src/data/libsql_store/migrations/20240918_create_settings.sql diff --git a/fpx-cli/src/data/tests.rs b/fpx/src/data/libsql_store/tests.rs similarity index 92% rename from fpx-cli/src/data/tests.rs rename to fpx/src/data/libsql_store/tests.rs index 91e897d12..b652491be 100644 --- a/fpx-cli/src/data/tests.rs +++ b/fpx/src/data/libsql_store/tests.rs @@ -1,7 +1,7 @@ -use crate::data::LibsqlStore; -use fpx::api::models::{AttributeMap, SpanKind}; -use fpx::data::models::{HexEncodedId, Span}; -use fpx::data::Store; +use crate::api::models::{AttributeMap, SpanKind}; +use crate::data::libsql_store::LibsqlStore; +use crate::data::models::{HexEncodedId, Span}; +use crate::data::Store; use test_log::test; /// Tests creating a span and then retrieving it using the various methods. @@ -25,7 +25,7 @@ async fn span_successful() { let span_id = HexEncodedId::new("a6c0ed7c2f81e7c8").unwrap(); let now = time::OffsetDateTime::now_utc(); - let inner_span: fpx::api::models::Span = fpx::api::models::Span { + let inner_span = crate::api::models::Span { trace_id: trace_id.clone(), span_id: span_id.clone(), parent_span_id: None, diff --git a/fpx/src/data/util.rs b/fpx/src/data/util.rs index aa4d24c34..fa2a07517 100644 --- a/fpx/src/data/util.rs +++ b/fpx/src/data/util.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use crate::data::Result; use serde::de::DeserializeOwned; use serde::{Deserialize, Deserializer, Serialize}; use std::ops::{Deref, DerefMut}; diff --git a/fpx/src/events.rs b/fpx/src/events.rs index 2904b0ed0..8bf81bd01 100644 --- a/fpx/src/events.rs +++ b/fpx/src/events.rs @@ -1,5 +1,10 @@ use crate::api::models::ServerMessage; use axum::async_trait; +use std::sync::Arc; + +pub mod memory; + +pub type BoxedEvents = Arc; #[async_trait] pub trait ServerEvents: Sync + Send { diff --git a/fpx-cli/src/events.rs b/fpx/src/events/memory.rs similarity index 86% rename from fpx-cli/src/events.rs rename to fpx/src/events/memory.rs index 8dfde060c..0ab4bd2e3 100644 --- a/fpx-cli/src/events.rs +++ b/fpx/src/events/memory.rs @@ -1,6 +1,6 @@ +use crate::api::models::ServerMessage; +use crate::events::ServerEvents; use async_trait::async_trait; -use fpx::api::models::ServerMessage; -use fpx::events::ServerEvents; use tokio::sync::broadcast; use tracing::trace; @@ -15,7 +15,7 @@ impl InMemoryEvents { Self { sender } } - pub async fn subscribe(&self) -> broadcast::Receiver { + pub fn subscribe(&self) -> broadcast::Receiver { self.sender.subscribe() } } diff --git a/fpx/src/lib.rs b/fpx/src/lib.rs index e0e2a4d3c..dfa2b76c2 100644 --- a/fpx/src/lib.rs +++ b/fpx/src/lib.rs @@ -1,4 +1,5 @@ pub mod api; +#[cfg(feature = "config")] pub mod config; pub mod data; pub mod events; diff --git a/fpx/src/service.rs b/fpx/src/service.rs index 05d9e59a5..2a20cdf20 100644 --- a/fpx/src/service.rs +++ b/fpx/src/service.rs @@ -1,5 +1,6 @@ use crate::api::models::{Span, SpanAdded}; -use crate::data::{BoxedEvents, BoxedStore, DbError}; +use crate::data::{BoxedStore, DbError}; +use crate::events::BoxedEvents; use anyhow::Result; use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, diff --git a/packages/types/src/schemas.ts b/packages/types/src/schemas.ts index 86a5ea1be..416f0ce3e 100644 --- a/packages/types/src/schemas.ts +++ b/packages/types/src/schemas.ts @@ -84,7 +84,7 @@ export const AppStateSchema = z.object({ export type AppState = z.infer; -export const OpenWorkspaceByPathErrorSchema = z.any().superRefine((x, ctx) => { +export const OpenWorkspaceErrorSchema = z.any().superRefine((x, ctx) => { const schemas = [ z.object({ path: z.string(), type: z.literal("ConfigFileMissing") }), z.object({ message: z.string(), type: z.literal("InvalidConfiguration") }), @@ -106,9 +106,7 @@ export const OpenWorkspaceByPathErrorSchema = z.any().superRefine((x, ctx) => { } }); -export type OpenWorkspaceByPathError = z.infer< - typeof OpenWorkspaceByPathErrorSchema ->; +export type OpenWorkspaceError = z.infer; export const WorkspaceSchema = z.object({ config: z.any(), path: z.string() }); @@ -132,6 +130,7 @@ export type FpxConfig = z.infer; export const FpxConfigErrorSchema = z.any().superRefine((x, ctx) => { const schemas = [ + z.literal("RootDirectoryNotFound"), z.object({ FileNotFound: z.string() }).strict(), z .object({ diff --git a/studio/src/tauri/RuntimeProvider.tsx b/studio/src/tauri/RuntimeProvider.tsx index 51908224d..3b2c4b814 100644 --- a/studio/src/tauri/RuntimeProvider.tsx +++ b/studio/src/tauri/RuntimeProvider.tsx @@ -1,7 +1,7 @@ import { type AppState, - type OpenWorkspaceByPathError, - OpenWorkspaceByPathErrorSchema, + type OpenWorkspaceError, + OpenWorkspaceErrorSchema, type Workspace, } from "@fiberplane/fpx-types"; import { useHandler } from "@fiberplane/hooks"; @@ -45,7 +45,7 @@ export function RuntimeProvider({ children }: RuntimeProviderProps) { function TauriRuntime({ children }: RuntimeProviderProps) { const [workspace, setWorkspace] = useState(); - const [error, setError] = useState(); + const [error, setError] = useState(); const handleOpenWorkspaceByPath = useHandler(async (path: string) => { const workspace = await openWorkspace(path); @@ -60,7 +60,7 @@ function TauriRuntime({ children }: RuntimeProviderProps) { } }) .catch((error) => { - const parsed = OpenWorkspaceByPathErrorSchema.safeParse(error); + const parsed = OpenWorkspaceErrorSchema.safeParse(error); if (parsed.success) { return setError(parsed.data); } diff --git a/studio/src/tauri/WorkspaceOpenError/WorkspaceOpenError.tsx b/studio/src/tauri/WorkspaceOpenError/WorkspaceOpenError.tsx index c52efb0ce..10ec49e7e 100644 --- a/studio/src/tauri/WorkspaceOpenError/WorkspaceOpenError.tsx +++ b/studio/src/tauri/WorkspaceOpenError/WorkspaceOpenError.tsx @@ -1,10 +1,10 @@ import { Button } from "@/components/ui/button"; import { Card } from "@/components/ui/card"; -import type { OpenWorkspaceByPathError } from "@fiberplane/fpx-types"; +import type { OpenWorkspaceError } from "@fiberplane/fpx-types"; import type { ReactNode } from "react"; type WorkspaceOpenErrorProps = { - error: OpenWorkspaceByPathError; + error: OpenWorkspaceError; reset: () => void; }; diff --git a/xtask/src/commands/schemas.rs b/xtask/src/commands/schemas.rs index d4a566c3f..347dd6988 100644 --- a/xtask/src/commands/schemas.rs +++ b/xtask/src/commands/schemas.rs @@ -21,7 +21,7 @@ pub async fn handle_command(args: Args) -> Result<()> { schema_for!(ClientMessage), schema_for!(ServerMessage), schema_for!(fpx_app::state::AppState), - schema_for!(fpx_app::models::workspace::OpenWorkspaceByPathError), + schema_for!(fpx_app::models::workspace::OpenWorkspaceError), schema_for!(fpx_app::models::workspace::Workspace), schema_for!(fpx::config::FpxConfig), schema_for!(fpx::config::FpxConfigError),