Skip to content

Commit

Permalink
Implement API manager (#263)
Browse files Browse the repository at this point in the history
* Rename OpenWorkspaceByPathError to OpenWorkspaceError

Use Option<T> functions instead of doing it manually

* Initial implementation of the ApiManager

Move LibSqlStore and InMemoryEvents to fpx

* Add fpx.toml

* Resolve warnings

* Fix tests

* Add simple logging with Tracing

Rename CI from build to build_cli

* Add legacy and fpx api manager

Fix issue with duplicate recent workspace entries

* Ensure that the lock stays locked long enough (Rust API)

Previously there was a possibility that start would be called twice and two processes would be started, but only one PID would be stored

Add more docs, logs, etc

* Remove unsused file

* Fix build issue

Use 8788 as api port

* remove todo's, they are captured by issues
  • Loading branch information
hatchan authored Oct 1, 2024
1 parent 79608c0 commit e93ed08
Show file tree
Hide file tree
Showing 33 changed files with 357 additions and 103 deletions.
File renamed without changes.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion fpx-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,36 @@ repository = { workspace = true }
tauri-build = { version = "2.0.0-rc.11", features = [] }

[dependencies]
fpx = { version = "0.1.0", path = "../fpx", features = ["config"] }
anyhow = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"query",
"tokio",
"tracing",
"ws",
], optional = true }
fpx = { version = "0.1.0", path = "../fpx", features = ["config", "libsql"] }
nix = { version = "0.29", default-features = false, features = ["signal"] }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tauri = { version = "2.0.0-rc.14", features = ["config-toml"] }
tauri-plugin-dialog = "2.0.0-rc"
tauri-plugin-store = { version = "2.0.0-rc" }
tokio = { version = "1", default-features = false, features = [
"sync",
"process",
"macros",
] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[features]
# This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!!
custom-protocol = ["tauri/custom-protocol"]

# This feature enables the new Rust API. If not enabled it will use the TS API.
fpx-api = ["dep:axum"]

[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]
tauri-plugin-window-state = "2.0.0-rc"
2 changes: 1 addition & 1 deletion fpx-app/Tauri.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ identifier = "com.fiberplane.fpx"
[build]
devUrl = "http://localhost:5173"
beforeBuildCommand = "pnpm build:fpx-studio"
beforeDevCommand = "pnpm dev:api & pnpm dev:frontend"
beforeDevCommand = "pnpm dev:frontend"
frontendDist = "../studio/dist"

[bundle]
Expand Down
11 changes: 11 additions & 0 deletions fpx-app/src/api_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[cfg(feature = "fpx-api")]
mod fpx_api;

#[cfg(feature = "fpx-api")]
pub use fpx_api::*;

#[cfg(not(feature = "fpx-api"))]
mod legacy;

#[cfg(not(feature = "fpx-api"))]
pub use legacy::*;
94 changes: 94 additions & 0 deletions fpx-app/src/api_manager/fpx_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use fpx::api;
use fpx::config::FpxConfig;
use fpx::data::libsql_store::LibsqlStore;
use fpx::events::memory::InMemoryEvents;
use fpx::service::Service;
use std::sync::{Arc, Mutex};
use tauri::async_runtime::spawn;
use tokio::sync::broadcast::error::RecvError;
use tracing::{error, info, trace, warn};

#[derive(Debug, Default)]
pub struct ApiManager {
// Sending a message on this channel will shutdown the axum server.
shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}

impl ApiManager {
pub fn start_api(&self, fpx_config: FpxConfig) {
let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned");
if let Some(shutdown_tx) = shutdown_tx.take() {
// shutdown any existing api server
let _ = shutdown_tx.send(());
}

// Start a listener early, so that we can handle the issue where another
// process is listening already on that specific port.
let listen_port = fpx_config.listen_port.unwrap_or(6767);
let listener = std::net::TcpListener::bind(format!("127.0.0.1:{listen_port}")).unwrap();
listener.set_nonblocking(true).unwrap();

let (shutdown, on_shutdown) = tokio::sync::oneshot::channel::<()>();
*shutdown_tx = Some(shutdown);

spawn(async move {
let store = LibsqlStore::in_memory().await.unwrap();
LibsqlStore::migrate(&store).await.unwrap();
let store = Arc::new(store);

// Create a event sink that simply logs
let events = InMemoryEvents::new();
let events = Arc::new(events);

// Our current implementation simply logs the events.
let mut reader = events.subscribe();
spawn(async move {
loop {
match reader.recv().await {
Ok(message) => {
// Here we can do something with events, like
// emitting them to the frontend:
// window.emit("api_message", message).expect("emit failed");
info!("Received message: {:?}", message);
}
Err(RecvError::Lagged(i)) => {
warn!(lagged = i, "Event reader lagged behind");
}
Err(RecvError::Closed) => {
trace!("Event reader loop stopped");
break;
}
}
}
});

let service = Service::new(store.clone(), events.clone());

let app = api::Builder::new()
.enable_compression()
.build(service.clone(), store.clone());

let listener = tokio::net::TcpListener::from_std(listener).unwrap();
let api_server = axum::serve(listener, app).with_graceful_shutdown(async {
// Once we receive something on the [`on_shutdown`] channel,
// we'll resolve this future, and thus axum will shutdown.
// We are wrapping this in another future because of the
// incompatible return type of the oneshot channel.
let _ = on_shutdown.await;
trace!("Received API shutdown signal");
});

if let Err(err) = api_server.await {
error!(?err, "API server returned an error");
};
});
}

pub fn stop_api(&self) {
let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned");
if let Some(shutdown_tx) = shutdown_tx.take() {
// shutdown any existing api servers
let _ = shutdown_tx.send(());
}
}
}
88 changes: 88 additions & 0 deletions fpx-app/src/api_manager/legacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use fpx::config::FpxConfig;
use nix::sys::signal::{killpg, Signal};
use nix::unistd::Pid;
use std::os::unix::process::CommandExt;
use std::process;
use std::sync::Mutex;
use tauri::async_runtime::spawn;
use tracing::{error, trace, warn};

#[derive(Debug, Default)]
pub struct ApiManager {
api_pid: Mutex<Option<Pid>>,
}

impl ApiManager {
/// Start a API server. If a API pid is already set, then that will first be
/// shutdown.
pub fn start_api(&self, fpx_config: FpxConfig) {
// Get a lock for the duration of this function
let mut api_pid = self.api_pid.lock().expect("lock is poisoned");

// If there is a API pid already there, then first send the SIGTERM
// signal to that process group.
if let Some(api_pid) = api_pid.take() {
// shutdown any existing api server
Self::send_sigterm_signal(api_pid);
}

// Create some environment variables overrides based on the fpx.toml
let mut envs: Vec<(&str, String)> = vec![];
if let Some(listen_port) = fpx_config.listen_port {
envs.push(("FPX_PORT", listen_port.to_string()));
}

// Start the process using pnpm. The process_group=0 will ensure that
// the process group ID is the same as the root process ID.
let mut child_process = process::Command::new("pnpm")
.arg("dev:api")
.process_group(0) //
.envs(envs)
.spawn()
.expect("failed to execute pnpm dev:api");

// Once the process is running, get the pid and store it in the mutex,
// so that we can send signals to it later.
let pid = child_process.id();
*api_pid = Some(Pid::from_raw(pid as i32));

// Spawn a task to wait for the child process to exit, and potentially
// log an error
spawn(async move {
let result = child_process.wait();
if let Err(err) = result {
error!(?err, api_pid=?pid, "child process exited with error");
} else {
trace!(api_pid=?pid, "API server exited successfully");
}
});
}

/// Sends the SIGTERM signal to the API process group. If no API pid was set
/// then this function will do nothing.
pub fn stop_api(&self) {
let Some(api_pid) = self.api_pid.lock().expect("lock is poisoned").take() else {
trace!("No API running");
return;
};

Self::send_sigterm_signal(api_pid)
}

/// Send the SIGTERM signal to the specified process group.
///
/// This uses a Process ID type instead of a specific process group ID as
/// that does not exist.
fn send_sigterm_signal(api_pid: Pid) {
trace!(?api_pid, "sending SIGTERM signal to API process group");

let result = killpg(api_pid, Signal::SIGTERM);
if let Err(errno) = result {
warn!(
?errno,
?api_pid,
"failed to send SIGNTERM signal to API process group"
);
}
}
}
29 changes: 17 additions & 12 deletions fpx-app/src/commands/workspace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::models::workspace::{OpenWorkspaceByPathError, Workspace};
use crate::api_manager::ApiManager;
use crate::models::workspace::{OpenWorkspaceError, Workspace};
use crate::state::AppState;
use crate::STORE_PATH;
use fpx::config::{FpxConfig, FpxConfigError};
Expand Down Expand Up @@ -37,22 +38,23 @@ pub fn get_current_workspace(state: State<'_, AppState>) -> Option<Workspace> {
#[tauri::command]
pub fn open_workspace_by_path<R: Runtime>(
path: String,
state: State<'_, AppState>,
app_state: State<'_, AppState>,
api_manager: State<'_, ApiManager>,
app: AppHandle<R>,
stores: State<'_, StoreCollection<R>>,
) -> Result<Workspace, OpenWorkspaceByPathError> {
) -> Result<Workspace, OpenWorkspaceError> {
api_manager.stop_api();

let path_buf = PathBuf::from(path.clone());
let config = match FpxConfig::load(Some(path_buf)) {
Ok((config, _config_path)) => config,
Err(err) => {
return Err(match err {
FpxConfigError::FileNotFound(path_buf) => {
OpenWorkspaceByPathError::ConfigFileMissing {
path: path_buf.to_string_lossy().to_string(),
}
}
FpxConfigError::FileNotFound(path_buf) => OpenWorkspaceError::ConfigFileMissing {
path: path_buf.to_string_lossy().to_string(),
},
FpxConfigError::InvalidFpxConfig { message, .. } => {
OpenWorkspaceByPathError::InvalidConfiguration { message }
OpenWorkspaceError::InvalidConfiguration { message }
}
FpxConfigError::RootDirectoryNotFound => {
unreachable!("FpxConfig::load takes a path, so this cannot occur")
Expand All @@ -61,16 +63,18 @@ pub fn open_workspace_by_path<R: Runtime>(
}
};

api_manager.start_api(config.clone());

let workspace = Workspace::new(path.clone(), config);
state.set_workspace(workspace.clone());
app_state.set_workspace(workspace.clone());

with_store(app, stores, STORE_PATH, |store| {
let mut recents: Vec<String> = store
.get(RECENT_WORKSPACES_STORE_KEY)
.and_then(|value| value.as_array())
.map(|arr| {
arr.iter()
.filter_map(|item| item.as_str().filter(|s| s == &path).map(|s| s.to_string()))
.filter_map(|item| item.as_str().filter(|s| s != &path).map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
Expand All @@ -89,6 +93,7 @@ pub fn open_workspace_by_path<R: Runtime>(
}

#[tauri::command]
pub fn close_workspace(state: State<'_, AppState>) {
pub fn close_workspace(state: State<'_, AppState>, api_manager: State<'_, ApiManager>) {
api_manager.stop_api();
state.close_workspace();
}
Loading

0 comments on commit e93ed08

Please sign in to comment.