Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add routes endpoint #314

Open
wants to merge 9 commits into
base: tauri-main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "0.1.0"
anyhow = { version = "1.0" }
axum = { version = "0.7", default-features = false }
clap = { version = "4.0", features = ["derive", "env"] }
schemars = "0.8.21"
schemars = { version = "0.8.21", features = ["url"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
tokio = { version = "1.40", features = ["rt-multi-thread", "signal"] }
Expand Down
12 changes: 10 additions & 2 deletions fpx-app/src/api_manager/fpx_api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use fpx::api;
use fpx::api::ApiConfig;
use fpx::config::FpxConfig;
use fpx::data::libsql_store::LibsqlStore;
use fpx::events::memory::InMemoryEvents;
use fpx::service::Service;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use tauri::async_runtime::spawn;
use tokio::sync::broadcast::error::RecvError;
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -64,10 +65,17 @@ impl ApiManager {

let service = Service::new(store.clone(), events.clone());

let config = ApiConfig {
base_url: Arc::new(RwLock::new(fpx_config.app_endpoint.map_or_else(
|| "http://localhost:8787".to_string(),
|url| url.to_string(),
))),
};

let app = api::Builder::new()
.enable_compression()
.allow_origin_any()
.build(service.clone(), store.clone());
.build(service.clone(), store.clone(), config);

let listener = tokio::net::TcpListener::from_std(listener).unwrap();
let api_server = axum::serve(listener, app).with_graceful_shutdown(async {
Expand Down
3 changes: 3 additions & 0 deletions fpx-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct Args {
#[clap(long, env, default_value = "http://localhost:4317")]
pub otlp_endpoint: Url,

#[clap(global = true, long, env, default_value = None)]
pub app_endpoint: Option<Url>,

/// Change the fpx directory.
///
/// By default fpx will search for a `.fpx` directory in the current
Expand Down
21 changes: 17 additions & 4 deletions fpx-cli/src/commands/dev.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::grpc::GrpcService;
use crate::initialize_fpx_dir;
use anyhow::{Context, Result};
use fpx::api::ApiConfig;
use fpx::data::libsql_store::LibsqlStore;
use fpx::events::memory::InMemoryEvents;
use fpx::{api, service};
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use std::future::IntoFuture;
use std::path::PathBuf;
use std::process::exit;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use tokio::select;
use tracing::{error, info, warn};
use url::Url;

#[derive(clap::Args, Debug)]
pub struct Args {
Expand All @@ -30,6 +32,9 @@ pub struct Args {
#[clap(long, env, hide = true)]
pub in_memory_database: bool,

#[arg(from_global)]
pub app_endpoint: Option<Url>,

/// fpx directory
#[arg(from_global)]
pub fpx_directory: Option<PathBuf>,
Expand All @@ -55,9 +60,17 @@ pub async fn handle_command(args: Args) -> Result<()> {

let service = service::Service::new(store.clone(), events.clone());

let app = api::Builder::new()
.enable_compression()
.build(service.clone(), store.clone());
let config = ApiConfig {
base_url: Arc::new(RwLock::new(args.app_endpoint.map_or_else(
|| "http://localhost:8787".to_string(),
|url| url.to_string(),
))),
};

let app =
api::Builder::new()
.enable_compression()
.build(service.clone(), store.clone(), config);
let grpc_service = GrpcService::new(service);

let listener = tokio::net::TcpListener::bind(&args.listen_address)
Expand Down
16 changes: 16 additions & 0 deletions fpx-cli/src/data/migrations/20240724_create_routes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS app_routes (
id INTEGER PRIMARY KEY,
path TEXT,
method TEXT,
handler TEXT,
handlerType TEXT,
currentlyRegistered BOOLEAN DEFAULT FALSE,
registrationOrder INTEGER DEFAULT -1,
routeOrigin TEXT DEFAULT 'discovered',
openapiSpec TEXT,
requestType TEXT DEFAULT 'http',

-- there are no enums in sqlite so we use check statements
CONSTRAINT route_origin_check CHECK (routeOrigin IN ('discovered', 'custom', 'open_api')),
CONSTRAINT request_type_check CHECK (requestType IN ('http', 'websocket'))
);
96 changes: 93 additions & 3 deletions fpx-workers/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use axum::async_trait;
use fpx::api::models::settings::Settings;
use fpx::data::models::HexEncodedId;
use fpx::data::models::{HexEncodedId, ProbedRoutes, Route};
use fpx::data::sql::SqlBuilder;
use fpx::data::{models, DbError, Result, Store, Transaction};
use serde::Deserialize;
Expand All @@ -25,6 +25,19 @@ impl D1Store {
}

async fn fetch_one<T>(&self, query: impl Into<String>, values: &[JsValue]) -> Result<T>
where
T: for<'a> Deserialize<'a>,
{
self.fetch_optional(query, values)
.await?
.ok_or(DbError::NotFound)
}

async fn fetch_optional<T>(
mellowagain marked this conversation as resolved.
Show resolved Hide resolved
&self,
query: impl Into<String>,
values: &[JsValue],
) -> Result<Option<T>>
where
T: for<'a> Deserialize<'a>,
{
Expand All @@ -37,8 +50,7 @@ impl D1Store {
let result = prepared_statement
.first(None)
.await
.map_err(|err| DbError::InternalError(err.to_string()))? // TODO: Correct error;
.ok_or(DbError::NotFound)?;
.map_err(|err| DbError::InternalError(err.to_string()))?; // TODO: Correct error;

Ok(result)
}
Expand Down Expand Up @@ -281,4 +293,82 @@ impl Store for D1Store {
})
.await
}

async fn routes_get(&self, _tx: &Transaction) -> Result<Vec<Route>> {
SendFuture::new(async {
let routes = self.fetch_all(&self.sql_builder.routes_get(), &[]).await?;

Ok(routes)
})
.await
}

async fn route_insert(&self, _tx: &Transaction, route: Route) -> Result<Route> {
SendFuture::new(async {
self.fetch_one(
&self.sql_builder.routes_insert(),
&[
route.id.into(),
route.path.into(),
route.method.into(),
route.handler.into(),
route.handler_type.into(),
route.currently_registered.into(),
route.registration_order.into(),
route.route_origin.to_string().into(),
route.openapi_spec.into(),
route.request_type.to_string().into(),
],
)
.await
})
.await
}

async fn route_delete(
&self,
_tx: &Transaction,
method: &str,
path: &str,
) -> Result<Option<Route>> {
SendFuture::new(async {
self.fetch_optional(
&self.sql_builder.routes_delete(),
&[method.into(), path.into()],
)
.await
})
.await
}

async fn probed_route_upsert(
&self,
_tx: &Transaction,
routes: ProbedRoutes,
) -> Result<ProbedRoutes> {
SendFuture::new(async {
let mut results = Vec::with_capacity(routes.routes.len());

for route in routes.routes {
// this gets coerced into a `ProbedRoute` instead of a `Route`
// but it should be fine as serde just ignores not-found fields
// methinks at least
results.push(
self.fetch_one(
&self.sql_builder.probed_route_upsert(),
&[
route.path.into(),
route.method.into(),
route.handler.into(),
route.handler_type.into(),
],
)
.await?,
);
}

Ok(ProbedRoutes { routes: results })
})
.await
}
}
10 changes: 8 additions & 2 deletions fpx-workers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use axum::async_trait;
use axum::routing::get;
use data::D1Store;
use fpx::api::models::ServerMessage;
use fpx::api::ApiConfig;
use fpx::events::ServerEvents;
use fpx::{api, service};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use tower_service::Service;
use tracing_subscriber::fmt::format::Pretty;
use tracing_subscriber::fmt::time::UtcTime;
Expand Down Expand Up @@ -53,7 +54,12 @@ async fn fetch(
let boxed_store = Arc::new(store);

let service = service::Service::new(boxed_store.clone(), boxed_events.clone());
let api_router = api::Builder::new().build(service, boxed_store);

let config = ApiConfig {
base_url: Arc::new(RwLock::new("http://localhost:8787".to_string())), // todo: find out a way to get the app url in workers
};

let api_router = api::Builder::new().build(service, boxed_store, config);

let mut router: axum::Router = axum::Router::new()
.route("/api/ws", get(ws_connect))
Expand Down
2 changes: 1 addition & 1 deletion fpx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ opentelemetry-proto = { version = "0.7", default-features = false, features = [
"with-schemars",
] }
prost = { version = "0.13", default-features = false }
schemars = { workspace = true, default-features = false, features = ["derive"] }
schemars = { workspace = true, default-features = false, features = ["derive", "url"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false }
strum = { version = "0.26", default-features = false, features = ["derive"] }
Expand Down
27 changes: 24 additions & 3 deletions fpx/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::api::handlers::routes::{route_create, route_delete, route_get, route_probe};
use crate::data::BoxedStore;
use crate::otel::OtelTraceLayer;
use crate::service::Service;
use axum::extract::FromRef;
use axum::routing::{get, post};
use axum::routing::{delete, get, post};
use http::StatusCode;
use std::sync::{Arc, RwLock};
use tower_http::compression::CompressionLayer;
use tower_http::cors::{Any, CorsLayer};
use tower_http::decompression::RequestDecompressionLayer;
Expand All @@ -12,10 +14,16 @@ pub mod errors;
pub mod handlers;
pub mod models;

#[derive(Clone, Debug)]
pub struct ApiConfig {
pub base_url: Arc<RwLock<String>>,
}

#[derive(Clone)]
pub struct ApiState {
service: Service,
store: BoxedStore,
config: ApiConfig,
}

impl FromRef<ApiState> for BoxedStore {
Expand All @@ -30,6 +38,12 @@ impl FromRef<ApiState> for Service {
}
}

impl FromRef<ApiState> for ApiConfig {
fn from_ref(state: &ApiState) -> Self {
state.config.clone()
}
}

#[derive(Default)]
pub struct Builder {
allow_origin_any: bool,
Expand All @@ -56,8 +70,12 @@ impl Builder {
}

/// Create an API and expose it through an axum router.
pub fn build(self, service: Service, store: BoxedStore) -> axum::Router {
let api_state = ApiState { service, store };
pub fn build(self, service: Service, store: BoxedStore, config: ApiConfig) -> axum::Router {
let api_state = ApiState {
service,
store,
config,
};

let mut router = axum::Router::new()
.route("/v1/traces", post(handlers::otel::trace_collector_handler))
Expand All @@ -79,6 +97,9 @@ impl Builder {
"/v0/settings",
get(handlers::settings::settings_get).post(handlers::settings::settings_upsert),
)
.route("/v0/probed-routes", post(route_probe))
.route("/v0/app-routes", get(route_get).post(route_create))
.route("/v0/app-routes/:method/:path", delete(route_delete))
.with_state(api_state)
.fallback(StatusCode::NOT_FOUND)
.layer(OtelTraceLayer::default())
Expand Down
1 change: 1 addition & 0 deletions fpx/src/api/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod otel;
pub mod routes;
pub mod settings;
pub mod spans;
pub mod traces;
Loading
Loading