Skip to content

Commit

Permalink
Feature/update sqlx (#45)
Browse files Browse the repository at this point in the history
* Update sqlx version

* save Stage.step

* fmt

* cargo sqlx prepare
  • Loading branch information
xander42280 authored Sep 10, 2024
1 parent a410c74 commit 8ff3ef5
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 65 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions service/migrations/20240910062459_update_stage_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add migration script here
ALTER TABLE stage_task ADD COLUMN `step` int not null default 0 AFTER `status`;
11 changes: 11 additions & 0 deletions service/proto/src/proto/stage/v1/stage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ enum Status {
FINAL_ERROR = 8;
}

enum Step {
Init = 0;
InSplit = 1;
InProve = 2;
InAgg = 3;
InAggAll = 4;
InFinal = 5;
End = 6;
}

message BlockFileItem {
string file_name = 1;
bytes file_content = 2;
Expand Down Expand Up @@ -60,4 +70,5 @@ message GetStatusResponse {
string stark_proof_url = 5;
string solidity_verifier_url = 6;
bytes output_stream = 7;
int32 step = 8; // Step
}
9 changes: 6 additions & 3 deletions service/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct StageTask {
pub context: Option<String>,
pub result: Option<String>,
pub check_at: i64,
pub step: i32,
}

#[warn(unused_macros)]
Expand Down Expand Up @@ -47,7 +48,7 @@ impl Database {
) -> anyhow::Result<Vec<StageTask>> {
let rows = sqlx::query_as!(
StageTask,
"SELECT id, status, context, result, check_at from stage_task where status = ? and check_at < ? limit ?",
"SELECT id, status, context, result, check_at, step from stage_task where status = ? and check_at < ? limit ?",
status,
check_at,
limit,
Expand All @@ -61,7 +62,7 @@ impl Database {
pub async fn get_stage_task(&self, proof_id: &str) -> anyhow::Result<StageTask> {
let row = sqlx::query_as!(
StageTask,
"SELECT id, status, context, result, check_at from stage_task where id = ?",
"SELECT id, status, context, result, check_at, step from stage_task where id = ?",
proof_id,
)
.fetch_one(&self.db_pool)
Expand Down Expand Up @@ -113,10 +114,12 @@ impl Database {
proof_id: &str,
old_check_at: u64,
check_at: u64,
step: i32,
) -> anyhow::Result<u64> {
let rows_affected = sqlx::query!(
"UPDATE stage_task set check_at = ? where id = ? and check_at = ?",
"UPDATE stage_task set check_at = ?, step = ? where id = ? and check_at = ?",
check_at,
step,
proof_id,
old_check_at
)
Expand Down
1 change: 1 addition & 0 deletions service/src/stage_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl StageService for StageServiceSVC {
};
if let Ok(task) = task {
response.status = task.status as u32;
response.step = task.step;
let execute_only = if let Some(context) = task.context {
match serde_json::from_str::<stage::contexts::GenerateContext>(&context) {
Ok(context) => {
Expand Down
11 changes: 9 additions & 2 deletions service/src/stage_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async fn run_stage_task(
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
stage.dispatch();
loop {
let current_step = stage.step.clone();
match stage.step {
Step::InSplit => {
let split_task = stage.get_split_task();
Expand Down Expand Up @@ -164,10 +165,15 @@ async fn run_stage_task(
}
stage.dispatch();
let ts_now = now_timestamp();
if check_at + 10 < ts_now {
if check_at + 10 < ts_now || current_step != stage.step {
check_at = ts_now;
let rows_affected = db
.update_stage_task_check_at(&task.id, task.check_at as u64, check_at)
.update_stage_task_check_at(
&task.id,
task.check_at as u64,
check_at,
stage.step.clone().into(),
)
.await;
if let Ok(rows_affected) = rows_affected {
if rows_affected == 1 {
Expand Down Expand Up @@ -239,6 +245,7 @@ async fn load_stage_task(tls_config: Option<TlsConfig>, db: database::Database)
&task.id,
task.check_at as u64,
check_at,
task.step,
)
.await;
if let Ok(rows_affected) = rows_affected {
Expand Down
Loading

0 comments on commit 8ff3ef5

Please sign in to comment.