Skip to content

Commit

Permalink
feat: task filter with generic field #1624
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Dec 18, 2024
1 parent 6514f73 commit fc8c1e9
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import me.xuender.unidecode.Unidecode;

import java.util.List;
import java.util.Map;

public class StringUtils {
public static String normalize(String unicoded) {
return Unidecode.decode(unicoded).trim().replaceAll("(\\s+)", " ").toLowerCase();
Expand All @@ -10,4 +13,18 @@ public static String normalize(String unicoded) {
public static boolean isEmpty(String str) {
return str == null || str.trim().isEmpty();
}

public static Object getValue(Map<String, Object> map, String dottedKey) {
List<String> jsonKeys = List.of(dottedKey.split("\\."));
Map<String, Object> node = map;
for (String key: jsonKeys) {
Object o = node.get(key);
if (o instanceof Map<?,?>) {
node = (Map<String, Object>) o;
} else if (jsonKeys.indexOf(key) == jsonKeys.size() -1) {
return o;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import net.codestory.http.errors.HttpException;
import net.codestory.http.errors.NotFoundException;
import net.codestory.http.payload.Payload;
import org.apache.commons.lang3.StringUtils;
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.bus.amqp.UriResult;
import org.icij.datashare.batch.BatchDownload;
import org.icij.datashare.batch.BatchSearch;
import org.icij.datashare.batch.BatchSearchRecord;
Expand All @@ -41,7 +41,6 @@
import org.icij.datashare.tasks.IndexTask;
import org.icij.datashare.tasks.ScanIndexTask;
import org.icij.datashare.tasks.ScanTask;
import org.icij.datashare.asynctasks.bus.amqp.UriResult;
import org.icij.datashare.text.Project;
import org.icij.datashare.user.User;

Expand Down Expand Up @@ -69,6 +68,7 @@
import static java.util.Arrays.stream;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static net.codestory.http.errors.NotFoundException.notFoundIfNull;
import static net.codestory.http.payload.Payload.badRequest;
import static net.codestory.http.payload.Payload.forbidden;
Expand Down Expand Up @@ -105,13 +105,15 @@ public TaskResource(final DatashareTaskFactory taskFactory, final TaskManager ta
this.batchSearchRepository = batchSearchRepository;
}
@Operation(description = "Gets all the user tasks.<br>" +
"A filter can be added with a pattern contained in the task name.",
parameters = {@Parameter(name = "filter", description = "pattern contained in the task name", in = ParameterIn.QUERY)})
"Filters can be added with <pre>name=value</pre>. For example if <pre>name=foo</pre> is given in the request url query," +
"the tasks containing the term \"foo\" are going to be returned. It can contain also dotted keys. " +
"For example if <pre>args.dataDir=bar</pre> is provided, tasks with \"dataDir\" containing \"bar\" are going to be selected.",
parameters = {@Parameter(name = "name", description = "pattern contained in the task name", in = ParameterIn.QUERY)})
@ApiResponse(responseCode = "200", description = "returns the list of tasks", useReturnTypeSchema = true)
@Get("/all")
public List<Task<?>> tasks(Context context) throws IOException {
Pattern pattern = Pattern.compile(StringUtils.isEmpty(context.get("filter")) ? ".*": String.format(".*%s.*", context.get("filter")));
return taskManager.getTasks((User) context.currentUser(), pattern);
Map<String, Pattern> filters = context.query().keys().stream().collect(toMap(s -> s, s -> Pattern.compile(String.format(".*%s.*", context.get(s)))));
return taskManager.getTasks((User) context.currentUser(), filters);
}

@Operation(description = "Gets one task with its id.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -74,6 +75,16 @@ public void tearDown() {
taskManager.clear();
}

@Test
public void test_get_tasks_filter() {
post("/api/task/batchUpdate/index/" + getClass().getResource("/docs/doc.txt").getPath().substring(1),
"{\"options\":{\"reportName\": \"foo\"}}").should().haveType("application/json");

get("/api/task/all").should().haveType("application/json").contain("IndexTask").contain("ScanTask");
get("/api/task/all?name=Index").should().contain("IndexTask").not().contain("ScanTask");
get("/api/task/all?args.dataDir=docs").should().contain("ScanTask").not().contain("IndexTask");
}

@Test
public void test_index_file() throws IOException {
RestAssert response = post("/api/task/batchUpdate/index/" + getClass().getResource("/docs/doc.txt").getPath().substring(1), "{}");
Expand All @@ -86,7 +97,6 @@ public void test_index_file() throws IOException {

assertThat(findTask(taskManager, "org.icij.datashare.tasks.IndexTask")).isNotNull();
assertThat(findTask(taskManager, "org.icij.datashare.tasks.IndexTask").get().args).excludes(entry("reportName", "extract:report:map"));

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void test_task_list_with_filter() throws IOException {
setupAppWith(new DummyUserTask<>("bar"), "bar");
String t2Id = taskManager.startTask(DummyUserTask.class, localUser("bar"), new HashMap<>());

get("/api/task/all?filter=DummyUserTask").withPreemptiveAuthentication("bar", "qux").should().
get("/api/task/all?name=DummyUserTask").withPreemptiveAuthentication("bar", "qux").should().
contain(format("{\"id\":\"%s\",\"name\":\"%s\",\"state\":\"DONE\",\"progress\":1.0",t2Id, DummyUserTask.class.getName())).
contain("\"details\":").
contain("\"uid\":\"bar\"").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.icij.datashare.asynctasks.bus.amqp.ProgressEvent;
import org.icij.datashare.asynctasks.bus.amqp.ResultEvent;
import org.icij.datashare.asynctasks.bus.amqp.TaskEvent;
import org.icij.datashare.json.JsonObjectMapper;
import org.icij.datashare.user.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -15,11 +16,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.icij.datashare.text.StringUtils.getValue;

/**
* Task manager interface with default methods common for all managers implementations.
Expand All @@ -33,12 +36,22 @@ public interface TaskManager extends Closeable {
boolean stopTask(String taskId) throws IOException;

List<Task<?>> getTasks() throws IOException;
List<Task<?>> getTasks(User user, Pattern pattern) throws IOException;
List<Task<?>> clearDoneTasks() throws IOException;

boolean shutdown() throws IOException;

void clear() throws IOException;

default List<Task<?>> getTasks(User user, Map<String, Pattern> filters) throws IOException {
Stream<Task<?>> taskStream = getTasks().stream();
for (Map.Entry<String, Pattern> filter : filters.entrySet()) {
taskStream = taskStream.filter(task -> {
Map<String, Object> objectMap = JsonObjectMapper.getJson(task);
return filter.getValue().matcher(String.valueOf(getValue(objectMap, filter.getKey()))).matches();
});
}
return taskStream.filter(t -> user.equals(t.getUser())).collect(toList());
}

default int getTerminationPollingInterval() {return POLLING_INTERVAL;}
default boolean awaitTermination(int timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
return !waitTasksToBeDone(timeout, timeUnit).isEmpty();
Expand Down Expand Up @@ -217,13 +230,6 @@ default void setLatch(String taskId, StateLatch stateLatch) throws IOException {
getTask(taskId).setLatch(stateLatch);
}

static List<Task<?>> getTasks(Stream<Task<?>> stream, User user, Pattern pattern) {
return stream.
filter(t -> user.equals(t.getUser())).
filter(t -> pattern.matcher(t.name).matches()).
collect(toList());
}

class TaskEventHandlingException extends RuntimeException {
public TaskEventHandlingException(Exception cause) {
super(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -100,11 +99,6 @@ public List<Task<?>> getTasks() {
return new LinkedList<>(tasks.values());
}

@Override
public List<Task<?>> getTasks(User user, Pattern pattern) {
return TaskManager.getTasks(tasks.values().stream(), user, pattern);
}

@Override
public List<Task<?>> clearDoneTasks() {
return tasks.values().stream().filter(f -> f.getState() != Task.State.RUNNING).map(t -> tasks.remove(t.id)).collect(toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -56,11 +57,6 @@ public List<Task<?>> getTasks() {
return new LinkedList<>(tasks.values());
}

@Override
public List<Task<?>> getTasks(User user, Pattern pattern) {
return TaskManager.getTasks(tasks.values().stream(), user, pattern);
}

@Override
public Void progress(String taskId, double rate) {
Task<?> taskView = tasks.get(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -72,11 +72,6 @@ public List<Task<?>> getTasks() {
return new LinkedList<>(tasks.values());
}

@Override
public List<Task<?>> getTasks(User user, Pattern pattern) {
return TaskManager.getTasks(tasks.values().stream(), user, pattern);
}

@Override
public List<Task<?>> clearDoneTasks() {
return tasks.values().stream().filter(Task::isFinished).map(t -> tasks.remove(t.id)).collect(toList());
Expand Down

0 comments on commit fc8c1e9

Please sign in to comment.