Skip to content

Commit

Permalink
feat(notification): send notification to admin when jobs failed
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <kweizh@tabbyml.com>
  • Loading branch information
zwpaper committed Jan 8, 2025
1 parent 6d46695 commit d994118
Showing 1 changed file with 53 additions and 16 deletions.
69 changes: 53 additions & 16 deletions ee/tabby-webserver/src/service/background_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tabby_schema::{
integration::IntegrationService,
job::JobService,
license::LicenseService,
notification::NotificationService,
notification::{NotificationRecipient, NotificationService},
repository::{GitRepositoryService, RepositoryService, ThirdPartyRepositoryService},
};
use third_party_integration::SchedulerGithubGitlabJob;
Expand Down Expand Up @@ -60,6 +60,15 @@ impl BackgroundJobEvent {
}
}

// marco to create a error with message and warn log and push it to errors
macro_rules! log_error {
($errors:expr, $($arg:tt)*) => {{
let msg = format!($($arg)*);
warn!("{}", msg);
$errors.push(msg);
}};
}

pub async fn start(
db: DbConn,
job_service: Arc<dyn JobService>,
Expand All @@ -81,7 +90,7 @@ pub async fn start(

tokio::spawn(async move {
loop {
tokio::select! {
let result = tokio::select! {
job = db.get_next_job_to_execute() => {
let Some(job) = job else {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
Expand All @@ -100,7 +109,7 @@ pub async fn start(
continue;
};

if let Err(err) = match event {
let result = match event {
BackgroundJobEvent::SchedulerGitRepository(repository_config) => {
let job = SchedulerGitJob::new(repository_config);
job.run(embedding.clone()).await
Expand All @@ -120,46 +129,74 @@ pub async fn start(
let job = IndexGarbageCollection;
job.run(repository_service.clone(), context_service.clone()).await
}
} {
logkit::info!(exit_code = 1; "Job failed {}", err);
} else {
logkit::info!(exit_code = 0; "Job completed successfully");
}
logger.finalize().await;
};
debug!("Background job {} completed", job.id);

match &result {
Err(err) => {
logkit::info!(exit_code = 1; "Job failed {}", err);
logger.finalize().await;
vec![err.to_string()]
},
_ => {
logkit::info!(exit_code = 0; "Job completed successfully");
logger.finalize().await;
vec![]
}
}
},
Some(now) = hourly.next() => {
let mut errors = vec![];
if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone()).await {
warn!("Database maintainance failed: {:?}", err);
log_error!(errors, "Database maintenance failed: {:?}", err);
}

if let Err(err) = SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone()).await {
warn!("Scheduler job failed: {:?}", err);
log_error!(errors, "Scheduler job failed: {:?}", err);
}

if let Err(err) = SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone()).await {
warn!("Sync integration job failed: {:?}", err);
log_error!(errors, "Sync integration job failed: {:?}", err);
}

if let Err(err) = SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone()).await {
warn!("Index issues job failed: {err:?}");
log_error!(errors, "Index issues job failed: {err:?}");
}

if let Err(err) = IndexGarbageCollection.run(repository_service.clone(), context_service.clone()).await {
warn!("Index garbage collection job failed: {err:?}");
log_error!(errors, "Index garbage collection job failed: {err:?}");
}

errors
},
Some(now) = daily.next() => {
let mut errors = vec![];
if let Err(err) = LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone()).await {
warn!("License check job failed: {err:?}");
log_error!(errors, "License check job failed: {err:?}");
}

errors
}
else => {
warn!("Background job channel closed");
break;
return;
}
};

if !result.is_empty() {
notification_service
.create(
NotificationRecipient::Admin,
&format!(
r#"Background job failed
{}"#,
&result.join("\n\n")
),
)
.await
.unwrap();
}
}
});
}

0 comments on commit d994118

Please sign in to comment.