Skip to content

Commit

Permalink
fix: continue on errors during indexing (#72)
Browse files Browse the repository at this point in the history
fixes #71
fixes #66
  • Loading branch information
SimonThormeyer authored Aug 19, 2024
1 parent daf6381 commit bee26d6
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
34 changes: 30 additions & 4 deletions index/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::fs::{create_dir_all, File};
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TantivyDocument};
Expand All @@ -38,6 +39,7 @@ pub enum Index {
schema: SearchSchema,
reader: IndexReader,
documents_path: PathBuf,
failed_documents: Vec<String>,
},
}

Expand Down Expand Up @@ -70,6 +72,7 @@ impl Index {
schema,
reader,
documents_path,
failed_documents: vec![],
})
}

Expand Down Expand Up @@ -100,16 +103,23 @@ impl Index {
let checksum_map = self.open_checksum_map().ok();
let dir_entries = self.collect_document_files();

let new_checksum_map_result: Result<HashMap<_, _>> = dir_entries
let failed_documents: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));

let new_checksum_map: HashMap<_, _> = dir_entries
.par_iter()
.map(|path| {
.filter_map(|path| {
let key = path.path().to_string_lossy().to_string();
let existing_checksum = checksum_map.as_ref().and_then(|map| map.get(&key));
self.process_file(path, existing_checksum)
match self.process_file(path, existing_checksum) {
Ok(success) => Some(success),
Err(e) => failed_documents.lock().ok().and_then(|mut failed_files| {
failed_files.push(format!("path: {}, error: {}", path.path().display(), e));
None
}),
}
})
.collect();

let new_checksum_map = new_checksum_map_result?;
self.store_checksum_map(new_checksum_map)?;

// We need to call .commit() explicitly to force the
Expand All @@ -131,6 +141,10 @@ impl Index {
schema,
reader,
documents_path,
failed_documents: failed_documents
.lock()
.map_err(|e| WriteError(e.to_string()))?
.to_vec(),
};
Ok(self)
} else {
Expand Down Expand Up @@ -159,6 +173,17 @@ impl Index {
}
}

/// Returns the documents that failed to be processed during the last
/// indexing run (as human-readable "path: …, error: …" strings).
///
/// # Errors
///
/// Returns `StateError` when the index is not in the `Reading` state,
/// since failure information is only recorded there.
pub fn failed_documents(&self) -> Result<Vec<String>> {
    match self {
        Index::Reading {
            failed_documents, ..
        } => Ok(failed_documents.clone()),
        _ => Err(StateError("Reading".to_string())),
    }
}

pub fn process_file(
&self,
path: &DirEntry,
Expand Down Expand Up @@ -424,6 +449,7 @@ impl Index {
}
}

/// Calculates the checksum of a file that consists of the metadata length and last modified time
fn calculate_checksum(path: &str) -> Result<(String, (u64, SystemTime))> {
let file = File::open(path).map_err(|e| CreationError(e.to_string()))?;
let metadata = file.metadata().map_err(|e| CreationError(e.to_string()))?;
Expand Down
38 changes: 28 additions & 10 deletions litt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ fn open_std_programm(path: String) -> Result<(), LittError> {
Ok(())
}

/// Prints the list of documents that failed to index, if any.
///
/// When the index reports no failures — or is in a state where failure
/// information is unavailable (`failed_documents()` errors, which we map
/// to an empty list) — nothing is printed.
fn show_failed_documents_error(index: &Index) {
    let failed = index.failed_documents().unwrap_or_default();
    // Guard clause: stay silent when there is nothing to report.
    if failed.is_empty() {
        return;
    }
    println!(
        "{}",
        format!(
            "The following documents failed to process:\n{}",
            failed.join("\n")
        )
    );
}

fn main() -> Result<(), LittError> {
let mut index_tracker = match IndexTracker::create(".litt".into()) {
Ok(index_tracker) => index_tracker,
Expand Down Expand Up @@ -186,6 +197,7 @@ fn main() -> Result<(), LittError> {
searcher.num_docs(),
start.elapsed()
);
show_failed_documents_error(&index);
return Ok(());
}

Expand Down Expand Up @@ -223,7 +235,7 @@ fn main() -> Result<(), LittError> {
let old_num_docs = searcher.num_docs();
let start = Instant::now();
return match index.update() {
Ok(_) => {
Ok(ref updated_index) => {
println!(
"Update done. Successfully indexed {} new document pages in {:?}. Now {} document pages.",
searcher
Expand All @@ -232,6 +244,7 @@ fn main() -> Result<(), LittError> {
searcher
.num_docs(),
);
show_failed_documents_error(updated_index);
Ok(())
}
Err(e) => Err(LittError::General(e.to_string())),
Expand All @@ -242,16 +255,21 @@ fn main() -> Result<(), LittError> {
println!("Reloading index \"{}\".", index_name);
let old_num_docs = searcher.num_docs();
let start = Instant::now();
if let Err(e) = index.reload() {
return Err(LittError::General(e.to_string()));
match index.reload() {
Ok(index) => {
println!(
"Reload done. Successfully indexed {} new document pages in {:?}. Now {} document pages.",
searcher.num_docs()-old_num_docs,
start.elapsed(),
searcher.num_docs(),
);
show_failed_documents_error(&index);
return Ok(());
}
Err(e) => {
return Err(LittError::General(e.to_string()));
}
}
println!(
"Reload done. Successfully indexed {} new document pages in {:?}. Now {} document pages.",
searcher.num_docs()-old_num_docs,
start.elapsed(),
searcher.num_docs(),
);
return Ok(());
}
// do normal search
else if !cli.term.is_empty() {
Expand Down

0 comments on commit bee26d6

Please sign in to comment.