From 8ff3ef54358137376b11ae47bc357ec0c9a421e1 Mon Sep 17 00:00:00 2001 From: "xander.z" <162873981+xander42280@users.noreply.github.com> Date: Tue, 10 Sep 2024 22:32:37 +0800 Subject: [PATCH] Feature/update sqlx (#45) * Update sqlx version * save Stage.step * fmt * cargo sqlx prepare --- ...8b7c750eebe2149c355059e1ccd8e3cb2fd1.json} | 19 ++++++++++++------- ...1377eecfdaa03d0db142bacb2e6a4507c857d.json | 1 - ...a1a88f815bb80a06b2dc0fcf6cc30e402cb42.json | 8 -------- ...524d87c79b9e2c1f67477670edf4bea9147ce.json | 12 ------------ ...1dfc0007758938f92a26601b8c7331a95817e.json | 12 ------------ ...2634102c948a7ef56ad769ca43602a9d4c1ff.json | 12 ------------ ...2812673cf078c8ee6c984c6256fdc2b20b8a.json} | 19 ++++++++++++------- ...3bebdb3431ab64d3c4e8a5dbfc09d2d72f021.json | 12 ++++++++++++ .../20240910062459_update_stage_task.sql | 2 ++ service/proto/src/proto/stage/v1/stage.proto | 11 +++++++++++ service/src/database.rs | 9 ++++++--- service/src/stage_service.rs | 1 + service/src/stage_worker.rs | 11 +++++++++-- stage/src/stage.rs | 16 +++++++++++++++- 14 files changed, 80 insertions(+), 65 deletions(-) rename service/.sqlx/{query-f122855dd9a5b3cd6f09651c7ca3a802d1f195e5443075dae91c63b060510a85.json => query-17d5a4d511ecd38bd04426da89778b7c750eebe2149c355059e1ccd8e3cb2fd1.json} (75%) delete mode 100644 service/.sqlx/query-72c9449fa38f73e61b9811722fb524d87c79b9e2c1f67477670edf4bea9147ce.json delete mode 100644 service/.sqlx/query-8b7eec3975fa959571e127dc3f41dfc0007758938f92a26601b8c7331a95817e.json delete mode 100644 service/.sqlx/query-91c09a7ed9ef5194edd4072ca0d2634102c948a7ef56ad769ca43602a9d4c1ff.json rename service/.sqlx/{query-15acc9e35a09d7d61d74d2aad4b19962598d5c2d9568aef6d559170ffff8ef74.json => query-aebb2877a4e58d7676b62d731f412812673cf078c8ee6c984c6256fdc2b20b8a.json} (74%) create mode 100644 service/.sqlx/query-e74ded30df05a3e6c9fb23bb77f3bebdb3431ab64d3c4e8a5dbfc09d2d72f021.json create mode 100644 service/migrations/20240910062459_update_stage_task.sql diff --git a/service/.sqlx/query-f122855dd9a5b3cd6f09651c7ca3a802d1f195e5443075dae91c63b060510a85.json b/service/.sqlx/query-17d5a4d511ecd38bd04426da89778b7c750eebe2149c355059e1ccd8e3cb2fd1.json similarity index 75% rename from service/.sqlx/query-f122855dd9a5b3cd6f09651c7ca3a802d1f195e5443075dae91c63b060510a85.json rename to service/.sqlx/query-17d5a4d511ecd38bd04426da89778b7c750eebe2149c355059e1ccd8e3cb2fd1.json index 9bc4927..9366f48 100644 --- a/service/.sqlx/query-f122855dd9a5b3cd6f09651c7ca3a802d1f195e5443075dae91c63b060510a85.json +++ b/service/.sqlx/query-17d5a4d511ecd38bd04426da89778b7c750eebe2149c355059e1ccd8e3cb2fd1.json @@ -1,6 +1,6 @@ { "db_name": "MySQL", - "query": "SELECT id, status, context, result, check_at from stage_task where id = ?", + "query": "SELECT id, status, context, result, check_at, step from stage_task where id = ?", "describe": { "columns": [ { @@ -9,7 +9,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL | PRIMARY_KEY | NO_DEFAULT_VALUE", - "char_set": 224, "max_size": 1020 } }, @@ -19,7 +18,6 @@ "type_info": { "type": "Long", "flags": "NOT_NULL", - "char_set": 63, "max_size": 11 } }, @@ -29,7 +27,6 @@ "type_info": { "type": "Blob", "flags": "BLOB", - "char_set": 224, "max_size": 262140 } }, @@ -39,7 +36,6 @@ "type_info": { "type": "Blob", "flags": "BLOB", - "char_set": 224, "max_size": 262140 } }, @@ -49,9 +45,17 @@ "type_info": { "type": "LongLong", "flags": "NOT_NULL", - "char_set": 63, "max_size": 20 } + }, + { + "ordinal": 5, + "name": "step", + "type_info": { + "type": "Long", + "flags": "NOT_NULL", + "max_size": 11 + } } ], "parameters": { @@ -62,8 +66,9 @@ false, true, true, + false, false ] }, - "hash": "f122855dd9a5b3cd6f09651c7ca3a802d1f195e5443075dae91c63b060510a85" + "hash": "17d5a4d511ecd38bd04426da89778b7c750eebe2149c355059e1ccd8e3cb2fd1" } diff --git a/service/.sqlx/query-4f109e0faa8b4395efdd1580ef21377eecfdaa03d0db142bacb2e6a4507c857d.json b/service/.sqlx/query-4f109e0faa8b4395efdd1580ef21377eecfdaa03d0db142bacb2e6a4507c857d.json index c119347..13aad42 100644 --- a/service/.sqlx/query-4f109e0faa8b4395efdd1580ef21377eecfdaa03d0db142bacb2e6a4507c857d.json +++ b/service/.sqlx/query-4f109e0faa8b4395efdd1580ef21377eecfdaa03d0db142bacb2e6a4507c857d.json @@ -9,7 +9,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL | PRIMARY_KEY | NO_DEFAULT_VALUE", - "char_set": 224, "max_size": 256 } } diff --git a/service/.sqlx/query-6ee88920e5f030ac336bc779cf0a1a88f815bb80a06b2dc0fcf6cc30e402cb42.json b/service/.sqlx/query-6ee88920e5f030ac336bc779cf0a1a88f815bb80a06b2dc0fcf6cc30e402cb42.json index 8096727..e012212 100644 --- a/service/.sqlx/query-6ee88920e5f030ac336bc779cf0a1a88f815bb80a06b2dc0fcf6cc30e402cb42.json +++ b/service/.sqlx/query-6ee88920e5f030ac336bc779cf0a1a88f815bb80a06b2dc0fcf6cc30e402cb42.json @@ -9,7 +9,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL | PRIMARY_KEY | NO_DEFAULT_VALUE", - "char_set": 224, "max_size": 1020 } }, @@ -19,7 +18,6 @@ "type_info": { "type": "Long", "flags": "NOT_NULL", - "char_set": 63, "max_size": 11 } }, @@ -29,7 +27,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL", - "char_set": 224, "max_size": 1020 } }, @@ -39,7 +36,6 @@ "type_info": { "type": "Long", "flags": "NOT_NULL", - "char_set": 63, "max_size": 11 } }, @@ -49,7 +45,6 @@ "type_info": { "type": "Long", "flags": "NOT_NULL", - "char_set": 63, "max_size": 11 } }, @@ -59,7 +54,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL", - "char_set": 224, "max_size": 1020 } }, @@ -69,7 +63,6 @@ "type_info": { "type": "Blob", "flags": "BLOB", - "char_set": 224, "max_size": 262140 } }, @@ -79,7 +72,6 @@ "type_info": { "type": "LongLong", "flags": "NOT_NULL", - "char_set": 63, "max_size": 20 } } diff --git a/service/.sqlx/query-72c9449fa38f73e61b9811722fb524d87c79b9e2c1f67477670edf4bea9147ce.json b/service/.sqlx/query-72c9449fa38f73e61b9811722fb524d87c79b9e2c1f67477670edf4bea9147ce.json deleted file mode 100644 index a654272..0000000 --- a/service/.sqlx/query-72c9449fa38f73e61b9811722fb524d87c79b9e2c1f67477670edf4bea9147ce.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "MySQL", - "query": "UPDATE stage_task set check_at = ? where id = ? and check_at = ?", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "72c9449fa38f73e61b9811722fb524d87c79b9e2c1f67477670edf4bea9147ce" -} diff --git a/service/.sqlx/query-8b7eec3975fa959571e127dc3f41dfc0007758938f92a26601b8c7331a95817e.json b/service/.sqlx/query-8b7eec3975fa959571e127dc3f41dfc0007758938f92a26601b8c7331a95817e.json deleted file mode 100644 index c158d5a..0000000 --- a/service/.sqlx/query-8b7eec3975fa959571e127dc3f41dfc0007758938f92a26601b8c7331a95817e.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "MySQL", - "query": "UPDATE stage_task set check_at = ? where id = ?", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "8b7eec3975fa959571e127dc3f41dfc0007758938f92a26601b8c7331a95817e" -} diff --git a/service/.sqlx/query-91c09a7ed9ef5194edd4072ca0d2634102c948a7ef56ad769ca43602a9d4c1ff.json b/service/.sqlx/query-91c09a7ed9ef5194edd4072ca0d2634102c948a7ef56ad769ca43602a9d4c1ff.json deleted file mode 100644 index f0b3145..0000000 --- a/service/.sqlx/query-91c09a7ed9ef5194edd4072ca0d2634102c948a7ef56ad769ca43602a9d4c1ff.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "MySQL", - "query": "INSERT INTO stage_task (id, status, context) values (?,?,?)", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "91c09a7ed9ef5194edd4072ca0d2634102c948a7ef56ad769ca43602a9d4c1ff" -} diff --git a/service/.sqlx/query-15acc9e35a09d7d61d74d2aad4b19962598d5c2d9568aef6d559170ffff8ef74.json b/service/.sqlx/query-aebb2877a4e58d7676b62d731f412812673cf078c8ee6c984c6256fdc2b20b8a.json similarity index 74% rename from service/.sqlx/query-15acc9e35a09d7d61d74d2aad4b19962598d5c2d9568aef6d559170ffff8ef74.json rename to service/.sqlx/query-aebb2877a4e58d7676b62d731f412812673cf078c8ee6c984c6256fdc2b20b8a.json index 9447c83..8598eee 100644 --- a/service/.sqlx/query-15acc9e35a09d7d61d74d2aad4b19962598d5c2d9568aef6d559170ffff8ef74.json +++ b/service/.sqlx/query-aebb2877a4e58d7676b62d731f412812673cf078c8ee6c984c6256fdc2b20b8a.json @@ -1,6 +1,6 @@ { "db_name": "MySQL", - "query": "SELECT id, status, context, result, check_at from stage_task where status = ? and check_at < ? limit ?", + "query": "SELECT id, status, context, result, check_at, step from stage_task where status = ? and check_at < ? limit ?", "describe": { "columns": [ { @@ -9,7 +9,6 @@ "type_info": { "type": "VarString", "flags": "NOT_NULL | PRIMARY_KEY | NO_DEFAULT_VALUE", - "char_set": 224, "max_size": 1020 } }, @@ -19,7 +18,6 @@ "type_info": { "type": "Long", "flags": "NOT_NULL", - "char_set": 63, "max_size": 11 } }, @@ -29,7 +27,6 @@ "type_info": { "type": "Blob", "flags": "BLOB", - "char_set": 224, "max_size": 262140 } }, @@ -39,7 +36,6 @@ "type_info": { "type": "Blob", "flags": "BLOB", - "char_set": 224, "max_size": 262140 } }, @@ -49,9 +45,17 @@ "type_info": { "type": "LongLong", "flags": "NOT_NULL", - "char_set": 63, "max_size": 20 } + }, + { + "ordinal": 5, + "name": "step", + "type_info": { + "type": "Long", + "flags": "NOT_NULL", + "max_size": 11 + } } ], "parameters": { @@ -62,8 +66,9 @@ false, true, true, + false, false ] }, - "hash": "15acc9e35a09d7d61d74d2aad4b19962598d5c2d9568aef6d559170ffff8ef74" + "hash": "aebb2877a4e58d7676b62d731f412812673cf078c8ee6c984c6256fdc2b20b8a" } diff --git a/service/.sqlx/query-e74ded30df05a3e6c9fb23bb77f3bebdb3431ab64d3c4e8a5dbfc09d2d72f021.json b/service/.sqlx/query-e74ded30df05a3e6c9fb23bb77f3bebdb3431ab64d3c4e8a5dbfc09d2d72f021.json new file mode 100644 index 0000000..f6f3d0d --- /dev/null +++ b/service/.sqlx/query-e74ded30df05a3e6c9fb23bb77f3bebdb3431ab64d3c4e8a5dbfc09d2d72f021.json @@ -0,0 +1,12 @@ +{ + "db_name": "MySQL", + "query": "UPDATE stage_task set check_at = ?, step = ? where id = ? and check_at = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "e74ded30df05a3e6c9fb23bb77f3bebdb3431ab64d3c4e8a5dbfc09d2d72f021" +} diff --git a/service/migrations/20240910062459_update_stage_task.sql b/service/migrations/20240910062459_update_stage_task.sql new file mode 100644 index 0000000..4951f0b --- /dev/null +++ b/service/migrations/20240910062459_update_stage_task.sql @@ -0,0 +1,2 @@ +-- Add migration script here +ALTER TABLE stage_task ADD COLUMN `step` int not null default 0 AFTER `status`; \ No newline at end of file diff --git a/service/proto/src/proto/stage/v1/stage.proto b/service/proto/src/proto/stage/v1/stage.proto index c82eda1..1262239 100644 --- a/service/proto/src/proto/stage/v1/stage.proto +++ b/service/proto/src/proto/stage/v1/stage.proto @@ -20,6 +20,16 @@ enum Status { FINAL_ERROR = 8; } +enum Step { + Init = 0; + InSplit = 1; + InProve = 2; + InAgg = 3; + InAggAll = 4; + InFinal = 5; + End = 6; +} + message BlockFileItem { string file_name = 1; bytes file_content = 2; @@ -60,4 +70,5 @@ message GetStatusResponse { string stark_proof_url = 5; string solidity_verifier_url = 6; bytes output_stream = 7; + int32 step = 8; // Step } \ No newline at end of file diff --git a/service/src/database.rs b/service/src/database.rs index c4af67e..9dd9d2b 100644 --- a/service/src/database.rs +++ b/service/src/database.rs @@ -6,6 +6,7 @@ pub struct StageTask { pub context: Option, pub result: Option, pub check_at: i64, + pub step: i32, } #[warn(unused_macros)] @@ -47,7 +48,7 @@ impl Database { ) -> anyhow::Result> { let rows = sqlx::query_as!( StageTask, - "SELECT id, status, context, result, check_at from stage_task where status = ? and check_at < ? limit ?", + "SELECT id, status, context, result, check_at, step from stage_task where status = ? and check_at < ? limit ?", status, check_at, limit, @@ -61,7 +62,7 @@ impl Database { pub async fn get_stage_task(&self, proof_id: &str) -> anyhow::Result { let row = sqlx::query_as!( StageTask, - "SELECT id, status, context, result, check_at from stage_task where id = ?", + "SELECT id, status, context, result, check_at, step from stage_task where id = ?", proof_id, ) .fetch_one(&self.db_pool) @@ -113,10 +114,12 @@ impl Database { proof_id: &str, old_check_at: u64, check_at: u64, + step: i32, ) -> anyhow::Result { let rows_affected = sqlx::query!( - "UPDATE stage_task set check_at = ? where id = ? and check_at = ?", + "UPDATE stage_task set check_at = ?, step = ? where id = ? and check_at = ?", check_at, + step, proof_id, old_check_at ) diff --git a/service/src/stage_service.rs b/service/src/stage_service.rs index 31dbb8f..e58c4e1 100644 --- a/service/src/stage_service.rs +++ b/service/src/stage_service.rs @@ -94,6 +94,7 @@ impl StageService for StageServiceSVC { }; if let Ok(task) = task { response.status = task.status as u32; + response.step = task.step; let execute_only = if let Some(context) = task.context { match serde_json::from_str::(&context) { Ok(context) => { diff --git a/service/src/stage_worker.rs b/service/src/stage_worker.rs index 7c46330..ddebcea 100644 --- a/service/src/stage_worker.rs +++ b/service/src/stage_worker.rs @@ -55,6 +55,7 @@ async fn run_stage_task( let (tx, mut rx) = tokio::sync::mpsc::channel(128); stage.dispatch(); loop { + let current_step = stage.step.clone(); match stage.step { Step::InSplit => { let split_task = stage.get_split_task(); @@ -164,10 +165,15 @@ async fn run_stage_task( } stage.dispatch(); let ts_now = now_timestamp(); - if check_at + 10 < ts_now { + if check_at + 10 < ts_now || current_step != stage.step { check_at = ts_now; let rows_affected = db - .update_stage_task_check_at(&task.id, task.check_at as u64, check_at) + .update_stage_task_check_at( + &task.id, + task.check_at as u64, + check_at, + stage.step.clone().into(), + ) .await; if let Ok(rows_affected) = rows_affected { if rows_affected == 1 { @@ -239,6 +245,7 @@ async fn load_stage_task(tls_config: Option, db: database::Database) &task.id, task.check_at as u64, check_at, + task.step, ) .await; if let Ok(rows_affected) = rows_affected { diff --git a/stage/src/stage.rs b/stage/src/stage.rs index 137353e..3ee7c6b 100644 --- a/stage/src/stage.rs +++ b/stage/src/stage.rs @@ -14,7 +14,7 @@ pub fn get_timestamp() -> u64 { let duration_since_epoch = now.duration_since(UNIX_EPOCH).unwrap(); duration_since_epoch.as_secs() } -#[derive(Default, PartialEq)] +#[derive(Default, PartialEq, Clone)] pub enum Step { #[default] Init, @@ -26,6 +26,20 @@ pub enum Step { End, } +impl From for i32 { + fn from(item: Step) -> i32 { + match item { + Step::Init => 0, + Step::InSplit => 1, + Step::InProve => 2, + Step::InAgg => 3, + Step::InAggAll => 4, + Step::InFinal => 5, + Step::End => 6, + } + } +} + #[derive(Default)] pub struct Stage { pub generate_context: GenerateContext,