Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement API manager #263

Merged
merged 11 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion fpx-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,36 @@ repository = { workspace = true }
tauri-build = { version = "2.0.0-rc.11", features = [] }

[dependencies]
fpx = { version = "0.1.0", path = "../fpx", features = ["config"] }
anyhow = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"query",
"tokio",
"tracing",
"ws",
], optional = true }
fpx = { version = "0.1.0", path = "../fpx", features = ["config", "libsql"] }
nix = { version = "0.29", default-features = false, features = ["signal"] }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tauri = { version = "2.0.0-rc.14", features = ["config-toml"] }
tauri-plugin-dialog = "2.0.0-rc"
tauri-plugin-store = { version = "2.0.0-rc" }
tokio = { version = "1", default-features = false, features = [
"sync",
"process",
"macros",
] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[features]
# This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!!
custom-protocol = ["tauri/custom-protocol"]

# This feature enables the new Rust API. If not enabled it will use the TS API.
fpx-api = ["dep:axum"]

[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]
tauri-plugin-window-state = "2.0.0-rc"
2 changes: 1 addition & 1 deletion fpx-app/Tauri.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ identifier = "com.fiberplane.fpx"
[build]
devUrl = "http://localhost:5173"
beforeBuildCommand = "pnpm build:fpx-studio"
beforeDevCommand = "pnpm dev:api & pnpm dev:frontend"
beforeDevCommand = "pnpm dev:frontend"
frontendDist = "../studio/dist"

[bundle]
Expand Down
11 changes: 11 additions & 0 deletions fpx-app/src/api_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[cfg(feature = "fpx-api")]
mod fpx_api;

#[cfg(feature = "fpx-api")]
pub use fpx_api::*;

#[cfg(not(feature = "fpx-api"))]
mod legacy;

#[cfg(not(feature = "fpx-api"))]
pub use legacy::*;
94 changes: 94 additions & 0 deletions fpx-app/src/api_manager/fpx_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use fpx::api;
use fpx::config::FpxConfig;
use fpx::data::libsql_store::LibsqlStore;
use fpx::events::memory::InMemoryEvents;
use fpx::service::Service;
use std::sync::{Arc, Mutex};
use tauri::async_runtime::spawn;
use tokio::sync::broadcast::error::RecvError;
use tracing::{error, info, trace, warn};

#[derive(Debug, Default)]
pub struct ApiManager {
// Sending a message on this channel will shutdown the axum server.
shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}

impl ApiManager {
pub fn start_api(&self, fpx_config: FpxConfig) {
let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned");
if let Some(shutdown_tx) = shutdown_tx.take() {
// shutdown any existing api server
let _ = shutdown_tx.send(());
}

// Start a listener early, so that we can handle the issue where another
// process is listening already on that specific port.
let listen_port = fpx_config.listen_port.unwrap_or(6767);
let listener = std::net::TcpListener::bind(format!("127.0.0.1:{listen_port}")).unwrap();
listener.set_nonblocking(true).unwrap();

let (shutdown, on_shutdown) = tokio::sync::oneshot::channel::<()>();
*shutdown_tx = Some(shutdown);

spawn(async move {
let store = LibsqlStore::in_memory().await.unwrap();
LibsqlStore::migrate(&store).await.unwrap();
let store = Arc::new(store);

// Create a event sink that simply logs
let events = InMemoryEvents::new();
let events = Arc::new(events);

// Our current implementation simply logs the events.
let mut reader = events.subscribe();
spawn(async move {
loop {
match reader.recv().await {
Ok(message) => {
// Here we can do something with events, like
// emitting them to the frontend:
// window.emit("api_message", message).expect("emit failed");
info!("Received message: {:?}", message);
}
Err(RecvError::Lagged(i)) => {
warn!(lagged = i, "Event reader lagged behind");
}
Err(RecvError::Closed) => {
trace!("Event reader loop stopped");
break;
}
}
}
});

let service = Service::new(store.clone(), events.clone());

let app = api::Builder::new()
.enable_compression()
.build(service.clone(), store.clone());

let listener = tokio::net::TcpListener::from_std(listener).unwrap();
let api_server = axum::serve(listener, app).with_graceful_shutdown(async {
// Once we receive something on the [`on_shutdown`] channel,
// we'll resolve this future, and thus axum will shutdown.
// We are wrapping this in another future because of the
// incompatible return type of the oneshot channel.
let _ = on_shutdown.await;
trace!("Received API shutdown signal");
});

if let Err(err) = api_server.await {
error!(?err, "API server returned an error");
};
});
}

pub fn stop_api(&self) {
let mut shutdown_tx = self.shutdown_tx.lock().expect("lock is poisoned");
if let Some(shutdown_tx) = shutdown_tx.take() {
// shutdown any existing api servers
let _ = shutdown_tx.send(());
}
}
}
88 changes: 88 additions & 0 deletions fpx-app/src/api_manager/legacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use fpx::config::FpxConfig;
use nix::sys::signal::{killpg, Signal};
use nix::unistd::Pid;
use std::os::unix::process::CommandExt;
use std::process;
use std::sync::Mutex;
use tauri::async_runtime::spawn;
use tracing::{error, trace, warn};

#[derive(Debug, Default)]
pub struct ApiManager {
api_pid: Mutex<Option<Pid>>,
}

impl ApiManager {
/// Start a API server. If a API pid is already set, then that will first be
/// shutdown.
pub fn start_api(&self, fpx_config: FpxConfig) {
// Get a lock for the duration of this function
let mut api_pid = self.api_pid.lock().expect("lock is poisoned");

// If there is a API pid already there, then first send the SIGTERM
// signal to that process group.
if let Some(api_pid) = api_pid.take() {
// shutdown any existing api server
Self::send_sigterm_signal(api_pid);
}

// Create some environment variables overrides based on the fpx.toml
let mut envs: Vec<(&str, String)> = vec![];
if let Some(listen_port) = fpx_config.listen_port {
envs.push(("FPX_PORT", listen_port.to_string()));
}

// Start the process using pnpm. The process_group=0 will ensure that
// the process group ID is the same as the root process ID.
let mut child_process = process::Command::new("pnpm")
.arg("dev:api")
.process_group(0) //
.envs(envs)
.spawn()
.expect("failed to execute pnpm dev:api");

// Once the process is running, get the pid and store it in the mutex,
// so that we can send signals to it later.
let pid = child_process.id();
*api_pid = Some(Pid::from_raw(pid as i32));

// Spawn a task to wait for the child process to exit, and potentially
// log an error
spawn(async move {
let result = child_process.wait();
if let Err(err) = result {
error!(?err, api_pid=?pid, "child process exited with error");
} else {
trace!(api_pid=?pid, "API server exited successfully");
}
});
}

/// Sends the SIGTERM signal to the API process group. If no API pid was set
/// then this function will do nothing.
pub fn stop_api(&self) {
let Some(api_pid) = self.api_pid.lock().expect("lock is poisoned").take() else {
trace!("No API running");
return;
};

Self::send_sigterm_signal(api_pid)
}

/// Send the SIGTERM signal to the specified process group.
///
/// This uses a Process ID type instead of a specific process group ID as
/// that does not exist.
fn send_sigterm_signal(api_pid: Pid) {
trace!(?api_pid, "sending SIGTERM signal to API process group");

let result = killpg(api_pid, Signal::SIGTERM);
if let Err(errno) = result {
warn!(
?errno,
?api_pid,
"failed to send SIGNTERM signal to API process group"
);
}
}
}
29 changes: 17 additions & 12 deletions fpx-app/src/commands/workspace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::models::workspace::{OpenWorkspaceByPathError, Workspace};
use crate::api_manager::ApiManager;
use crate::models::workspace::{OpenWorkspaceError, Workspace};
use crate::state::AppState;
use crate::STORE_PATH;
use fpx::config::{FpxConfig, FpxConfigError};
Expand Down Expand Up @@ -37,22 +38,23 @@ pub fn get_current_workspace(state: State<'_, AppState>) -> Option<Workspace> {
#[tauri::command]
pub fn open_workspace_by_path<R: Runtime>(
path: String,
state: State<'_, AppState>,
app_state: State<'_, AppState>,
api_manager: State<'_, ApiManager>,
app: AppHandle<R>,
stores: State<'_, StoreCollection<R>>,
) -> Result<Workspace, OpenWorkspaceByPathError> {
) -> Result<Workspace, OpenWorkspaceError> {
api_manager.stop_api();

let path_buf = PathBuf::from(path.clone());
let config = match FpxConfig::load(Some(path_buf)) {
Ok((config, _config_path)) => config,
Err(err) => {
return Err(match err {
FpxConfigError::FileNotFound(path_buf) => {
OpenWorkspaceByPathError::ConfigFileMissing {
path: path_buf.to_string_lossy().to_string(),
}
}
FpxConfigError::FileNotFound(path_buf) => OpenWorkspaceError::ConfigFileMissing {
path: path_buf.to_string_lossy().to_string(),
},
FpxConfigError::InvalidFpxConfig { message, .. } => {
OpenWorkspaceByPathError::InvalidConfiguration { message }
OpenWorkspaceError::InvalidConfiguration { message }
}
FpxConfigError::RootDirectoryNotFound => {
unreachable!("FpxConfig::load takes a path, so this cannot occur")
Expand All @@ -61,16 +63,18 @@ pub fn open_workspace_by_path<R: Runtime>(
}
};

api_manager.start_api(config.clone());

let workspace = Workspace::new(path.clone(), config);
state.set_workspace(workspace.clone());
app_state.set_workspace(workspace.clone());

with_store(app, stores, STORE_PATH, |store| {
let mut recents: Vec<String> = store
.get(RECENT_WORKSPACES_STORE_KEY)
.and_then(|value| value.as_array())
.map(|arr| {
arr.iter()
.filter_map(|item| item.as_str().filter(|s| s == &path).map(|s| s.to_string()))
.filter_map(|item| item.as_str().filter(|s| s != &path).map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
Expand All @@ -89,6 +93,7 @@ pub fn open_workspace_by_path<R: Runtime>(
}

#[tauri::command]
pub fn close_workspace(state: State<'_, AppState>) {
pub fn close_workspace(state: State<'_, AppState>, api_manager: State<'_, ApiManager>) {
api_manager.stop_api();
state.close_workspace();
}
Loading
Loading