diff --git a/service/Cargo.toml b/service/Cargo.toml index 89885a9..94f77a1 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -15,7 +15,7 @@ prost = "0.11.0" tokio = { version = "1.21.0", features = ["macros", "rt-multi-thread", "signal"] } once_cell = "1.8" uuid = { version = "1.2", features = ["v4", "fast-rng", "macro-diagnostics"] } -mysql_async = { version = "*", default-features = false, features = ["minimal"]} +sqlx = { version = "0.7", features = ["mysql", "runtime-tokio" ] } serde = "1.0.92" serde_json = "1.0" serde_derive = "1.0.92" diff --git a/service/src/database.rs b/service/src/database.rs index 62aeffa..fd08ca4 100644 --- a/service/src/database.rs +++ b/service/src/database.rs @@ -1,74 +1,42 @@ -// use mysql_async::prelude::*; - pub struct Database { - // db_pool: mysql_async::Pool, + db_pool: sqlx::mysql::MySqlPool, } impl Database { - pub fn new(_database_url: &str) -> Self { - // let db_pool = mysql_async::Pool::new(database_url); - Database {} + pub fn new(database_url: &str) -> Self { + let db_pool = sqlx::mysql::MySqlPool::connect_lazy(database_url).unwrap(); + Database { db_pool } } - // #[allow(dead_code)] - // pub async fn insert_stage_task( - // &self, - // proof_id: &str, - // status: i32, - // context: &str, - // ) -> std::result::Result { - // let mut conn = self.db_pool.get_conn().await.map_err(|e| (e.to_string()))?; - // let stmt = conn - // .prep("INSERT INTO stage_task (id, status, context) values (:id, :status, :context)") - // .await - // .map_err(|e| (e.to_string()))?; - // let params = mysql_async::Params::from(vec![ - // ( - // String::from("id"), - // mysql_async::Value::Bytes(proof_id.as_bytes().to_vec()), - // ), - // ( - // String::from("status"), - // mysql_async::Value::Int(status as i64), - // ), - // ( - // String::from("context"), - // mysql_async::Value::Bytes(context.as_bytes().to_vec()), - // ), - // ]); - // let _: std::result::Result, String> = - // conn.exec(stmt, params).await.map_err(|e| (e.to_string())); - // Ok(true) - // } + #[allow(dead_code)] + pub async fn insert_stage_task( + &self, + proof_id: &str, + status: i32, + context: &str, + ) -> anyhow::Result { + sqlx::query("INSERT INTO stage_task (id, status, context) values (?,?,?)") + .bind(proof_id) + .bind(status) + .bind(context) + .execute(&self.db_pool) + .await?; + Ok(true) + } - // #[allow(dead_code)] - // pub async fn update_stage_task( - // &mut self, - // proof_id: &str, - // status: i32, - // result: &str, - // ) -> std::result::Result { - // let mut conn = self.db_pool.get_conn().await.map_err(|e| (e.to_string()))?; - // let stmt = conn - // .prep("UPDATE stage_task set status = :status, result = :result where id = :id)") - // .await - // .map_err(|e| (e.to_string()))?; - // let params = mysql_async::Params::from(vec![ - // ( - // String::from("id"), - // mysql_async::Value::Bytes(proof_id.as_bytes().to_vec()), - // ), - // ( - // String::from("status"), - // mysql_async::Value::Int(status as i64), - // ), - // ( - // String::from("result"), - // mysql_async::Value::Bytes(result.as_bytes().to_vec()), - // ), - // ]); - // let _: std::result::Result, String> = - // conn.exec(stmt, params).await.map_err(|e| (e.to_string())); - // Ok(true) - // } + #[allow(dead_code)] + pub async fn update_stage_task( + &mut self, + proof_id: &str, + status: i32, + result: &str, + ) -> anyhow::Result { + sqlx::query("UPDATE stage_task set status = ?, result = ? where id = ?)") + .bind(status) + .bind(result) + .bind(proof_id) + .execute(&self.db_pool) + .await?; + Ok(true) + } } diff --git a/service/src/stage_service.rs b/service/src/stage_service.rs index ae7639b..7a4370a 100644 --- a/service/src/stage_service.rs +++ b/service/src/stage_service.rs @@ -30,7 +30,7 @@ lazy_static! { pub struct StageServiceSVC { tls_config: Option, - _storage: database::Database, + _db: database::Database, } impl StageServiceSVC { @@ -48,10 +48,10 @@ impl StageServiceSVC { None }; let database_url = config.database_url.as_str(); - let storage = database::Database::new(database_url); + let db = database::Database::new(database_url); Ok(StageServiceSVC { tls_config, - _storage: storage, + _db: db, }) } } @@ -149,7 +149,7 @@ impl StageService for StageServiceSVC { ); // let _ = self - // .storage + // .db // .insert_stage_task( // &request.get_ref().proof_id, // stage_service::ExecutorError::Unspecified as i32,