Skip to content

Commit

Permalink
WatchImporter: Periodically scan for changes
Browse files Browse the repository at this point in the history
When pywb closes a WARC file we don't get any notification, so periodically scan to ensure we archive any closed WARCs.
  • Loading branch information
ato committed Oct 10, 2023
1 parent 68df153 commit 02b9fa3
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions ui/src/bamboo/task/WatchImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.nio.file.StandardWatchEventKinds.*;

Expand All @@ -21,40 +25,49 @@
*/
public class WatchImporter implements Runnable {
final static Logger log = LoggerFactory.getLogger(WatchImporter.class);
final Map<Path,Config.Watch> watches = new HashMap<>();
final Map<Path, Config.Watch> watches = new HashMap<>();
final Warcs warcs;
final Crawls crawls;
final Collections collections;
final CdxIndexer cdxIndexer;
/**
* Lock to prevent timer-initiated scans from running concurrently with the handling of filesystem event notifications.
*/
private final Lock scanLock = new ReentrantLock();

public WatchImporter(Collections collections, Crawls crawls, CdxIndexer cdxIndexer, Warcs warcs, List<Config.Watch> watches) {
this.collections = collections;
this.crawls = crawls;
this.cdxIndexer = cdxIndexer;
this.warcs = warcs;
for (Config.Watch watch: watches) {
for (Config.Watch watch : watches) {
this.watches.put(watch.dir, watch);
}
}

public void run() {
ScheduledExecutorService scanExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
var thread = new Thread(runnable, "WatchImporter periodic scanner");
thread.setDaemon(true);
return thread;
});
try (WatchService watcher = FileSystems.getDefault().newWatchService()) {


for (Config.Watch watch : watches.values()) {
log.info("Watching " + watch.dir + " for modified WARCs");
watch.dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
}

scanForChanges();
scanExecutor.scheduleWithFixedDelay(this::scanForChanges, 60, 60, java.util.concurrent.TimeUnit.SECONDS);

for (WatchKey key = watcher.take(); key.isValid(); key = watcher.take()) {
Config.Watch watch = watches.get(key.watchable());
Config.Watch watch = watches.get((Path) key.watchable());
if (watch == null) {
log.warn("Ignoring unexpected watch key " + key.watchable());
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
scanLock.lock();
try {
if (event.kind() == OVERFLOW) {
scanForChanges();
Expand All @@ -78,17 +91,21 @@ public void run() {
handleClosedWarc(watch, path);
}
} catch (IOException | UncheckedIOException e) {
e.printStackTrace();
log.error("Error handling watch event on {}", event.context());
} finally {
scanLock.unlock();
}
}

key.reset();
}
} catch (IOException e) {
e.printStackTrace();
log.error("Error watching for WARC changes", e);
throw new RuntimeException(e);
} catch (InterruptedException e) {
e.printStackTrace();
log.warn("Interrupted while watching for WARC changes", e);
} finally {
scanExecutor.shutdown();
}
}

Expand Down Expand Up @@ -171,18 +188,25 @@ private Path moveWarcToCrawlDir(Path path, Crawl crawl) throws IOException {
* Scan every watched directory for any changes we might have missed. We do this during startup, on a periodic
* timer, and whenever the fs notify event queue overflows.
*/
private void scanForChanges() throws IOException {
for (Config.Watch watch : watches.values()) {
log.info("Scanning for changes: {}", watch.dir);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(watch.dir)) {
for (Path entry : stream) {
if (entry.toString().endsWith(".warc.gz.open")) {
handleOpenWarc(watch, entry);
} else if (entry.toString().endsWith(".warc.gz")) {
handleClosedWarc(watch, entry);
private void scanForChanges() {
scanLock.lock();
try {
for (Config.Watch watch : watches.values()) {
log.info("Scanning for changes: {}", watch.dir);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(watch.dir)) {
for (Path entry : stream) {
if (entry.toString().endsWith(".warc.gz.open")) {
handleOpenWarc(watch, entry);
} else if (entry.toString().endsWith(".warc.gz")) {
handleClosedWarc(watch, entry);
}
}
} catch (IOException e) {
log.error("Error scanning for WARC changes in {}", watch.dir, e);
}
}
} finally {
scanLock.unlock();
}
}
}

0 comments on commit 02b9fa3

Please sign in to comment.