[V8keNXu0] error details while using apoc.cypher.run* procedures (#3671)
vga91 authored Jul 21, 2023
1 parent 88649df commit 36abce1
Showing 12 changed files with 348 additions and 34 deletions.
@@ -25,6 +25,9 @@ apoc.cypher.runFile(file :: STRING?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)
 |config|MAP?|{}
 |===
 
+== Config parameters
+include::partial$usage/config/apoc.cypher.runExtended.adoc[]
+
 == Output parameters
 [.procedures, opts=header]
 |===
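For orientation, a minimal usage sketch of the procedure this hunk documents, with the config map described by the partial included above; the file name scripts/init.cypher is hypothetical and is resolved against the configured import directory:

    CALL apoc.cypher.runFile('scripts/init.cypher', {statistics: false, timeout: 60})
    YIELD row, result
    RETURN row, result;

With statistics: false the trailing statistics row is suppressed, so only the rows produced by the statements in the file come back.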
@@ -25,6 +25,9 @@ apoc.cypher.runFiles(file :: LIST? OF STRING?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)
 |config|MAP?|{}
 |===
 
+== Config parameters
+include::partial$usage/config/apoc.cypher.runExtended.adoc[]
+
 == Output parameters
 [.procedures, opts=header]
 |===
@@ -25,6 +25,9 @@ apoc.cypher.runSchemaFile(file :: STRING?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)
 |config|MAP?|{}
 |===
 
+== Config parameters
+include::partial$usage/config/apoc.cypher.runExtended.adoc[]
+
 == Output parameters
 [.procedures, opts=header]
 |===
@@ -25,6 +25,9 @@ apoc.cypher.runSchemaFiles(file :: LIST? OF STRING?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)
 |config|MAP?|{}
 |===
 
+== Config parameters
+include::partial$usage/config/apoc.cypher.runExtended.adoc[]
+
 == Output parameters
 [.procedures, opts=header]
 |===
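A hedged sketch of the list-taking schema variant (file names hypothetical). Per the CypherExtended.java diff below, only statements recognized as schema operations are executed from these files:

    CALL apoc.cypher.runSchemaFiles(
      ['schema/constraints.cypher', 'schema/indexes.cypher'],
      {reportError: true}
    )
    YIELD row, result
    RETURN row, result;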
docs partial apoc.cypher.runExtended.adoc (the new file pulled in by the include lines above)
@@ -0,0 +1,12 @@
+The procedures support the following config parameters:
+
+.Config parameters
+[opts=header, cols="1,1,1,5"]
+|===
+| name | type | default | description
+| reportError | boolean | false | Returns an entry row with key `error` and, as its value, the error that occurred, if any.
+| statistics | boolean | true | Returns an additional row with the query statistics, leveraging the `org.neo4j.graphdb.QueryStatistics` API.
+| timeout | long | 10 | The timeout for a single query (in seconds).
+| queueCapacity | long | 100 | The capacity of the `java.util.concurrent.BlockingQueue` used to aggregate the results.
+| parameters | Map<String, Object> | Empty map | Optional parameter map to be used with the `apoc.cypher.runSchemaFile` and `apoc.cypher.runSchemaFiles` procedures.
+|===
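A sketch of the new reportError flag in action, assuming a hypothetical broken.cypher containing a failing statement. Per the getError method added below, the failure surfaces as a row with row = -1 and a result map holding an error key (formatted as "Error in `<file>`:" followed by the message), instead of an exception aborting the stream:

    CALL apoc.cypher.runFile('broken.cypher', {reportError: true})
    YIELD row, result
    WHERE result.error IS NOT NULL
    RETURN row, result.error AS error;

With the default reportError: false, the same failure is rethrown as a RuntimeException, matching the previous behavior.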
58 changes: 49 additions & 9 deletions extended/src/main/java/apoc/cypher/CypherExtended.java
@@ -6,6 +6,7 @@
 import apoc.util.CompressionAlgo;
 import apoc.util.FileUtils;
 import apoc.util.QueueBasedSpliterator;
+import apoc.util.QueueUtil;
 import apoc.util.Util;
 import apoc.util.collection.Iterators;
 import org.apache.commons.lang3.StringUtils;
@@ -93,13 +94,14 @@ public Stream<RowResult> runFiles(@Name("file") List<String> fileNames, @Name(va
 
     // This runs the files sequentially
     private Stream<RowResult> runFiles(List<String> fileNames, Map<String, Object> config, Map<String, Object> parameters, boolean schemaOperation) {
+        boolean reportError = Util.toBoolean(config.get("reportError"));
         boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics",true));
         int timeout = Util.toInteger(config.getOrDefault("timeout",10));
         int queueCapacity = Util.toInteger(config.getOrDefault("queueCapacity",100));
         var result = fileNames.stream().flatMap(fileName -> {
             final Reader reader = readerForFile(fileName);
             final Scanner scanner = createScannerFor(reader);
-            return runManyStatements(scanner, parameters, schemaOperation, addStatistics, timeout, queueCapacity)
+            return runManyStatements(scanner, parameters, schemaOperation, addStatistics, timeout, queueCapacity, reportError, fileName)
                     .onClose(() -> Util.close(scanner, (e) -> log.info("Cannot close the scanner for file " + fileName + " because the following exception", e)));
         });
 
@@ -120,12 +122,12 @@ public Stream<RowResult> runSchemaFiles(@Name("file") List<String> fileNames, @N
         return runFiles(fileNames, config, parameters, schemaOperation);
     }
 
-    private Stream<RowResult> runManyStatements(Scanner scanner, Map<String, Object> params, boolean schemaOperation, boolean addStatistics, int timeout, int queueCapacity) {
+    private Stream<RowResult> runManyStatements(Scanner scanner, Map<String, Object> params, boolean schemaOperation, boolean addStatistics, int timeout, int queueCapacity, boolean reportError, String fileName) {
         BlockingQueue<RowResult> queue = runInSeparateThreadAndSendTombstone(queueCapacity, internalQueue -> {
             if (schemaOperation) {
-                runSchemaStatementsInTx(scanner, internalQueue, params, addStatistics, timeout);
+                runSchemaStatementsInTx(scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
             } else {
-                runDataStatementsInTx(scanner, internalQueue, params, addStatistics, timeout);
+                runDataStatementsInTx(scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
             }
         }, RowResult.TOMBSTONE);
         return StreamSupport.stream(new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, Integer.MAX_VALUE), false);
@@ -155,39 +157,77 @@ private <T> BlockingQueue<T> runInSeparateThreadAndSendTombstone(int queueCapaci
         return queue;
     }
 
-    private void runDataStatementsInTx(Scanner scanner, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout) {
+    private void runDataStatementsInTx(Scanner scanner, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout, boolean reportError, String fileName) {
         while (scanner.hasNext()) {
             String stmt = removeShellControlCommands(scanner.next());
             if (stmt.trim().isEmpty()) continue;
-            if (!isSchemaOperation(stmt)) {
+            boolean schemaOperation;
+            try {
+                schemaOperation = isSchemaOperation(stmt);
+            } catch (Exception e) {
+                getError(queue, reportError, e, fileName);
+                return;
+            }
+
+            if (!schemaOperation) {
                 if (isPeriodicOperation(stmt)) {
-                    Util.inThread(pools , () -> db.executeTransactionally(stmt, params, result -> consumeResult(result, queue, addStatistics, timeout)));
+                    Util.inThread(pools , () -> {
+                        try {
+                            return db.executeTransactionally(stmt, params, result -> consumeResult(result, queue, addStatistics, timeout));
+                        } catch (Exception e) {
+                            return getError(queue, reportError, e, fileName);
+                        }
+                    });
                 }
                 else {
                     Util.inTx(db, pools, threadTx -> {
                         try (Result result = threadTx.execute(stmt, params)) {
                             return consumeResult(result, queue, addStatistics, timeout);
+                        } catch (Exception e) {
+                            return getError(queue, reportError, e, fileName);
                         }
                     });
                 }
             }
         }
     }
 
+    private Object getError(BlockingQueue<RowResult> queue, boolean reportError, Exception e, String fileName) {
+        if (reportError) {
+            String error = String.format("Error in `%s`:\n%s ",
+                    fileName, e.getMessage()
+            );
+
+            RowResult result = new RowResult(-1, Map.of("error", error));
+            QueueUtil.put(queue, result, 10);
+            return null;
+        }
+        throw new RuntimeException(e);
+    }
+
     private Scanner createScannerFor(Reader reader) {
         Scanner scanner = new Scanner(reader);
         scanner.useDelimiter(";\r?\n");
         return scanner;
     }
 
-    private void runSchemaStatementsInTx(Scanner scanner, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout) {
+    private void runSchemaStatementsInTx(Scanner scanner, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout, boolean reportError, String fileName) {
         while (scanner.hasNext()) {
             String stmt = removeShellControlCommands(scanner.next());
             if (stmt.trim().isEmpty()) continue;
-            if (isSchemaOperation(stmt)) {
+            boolean schemaOperation;
+            try {
+                schemaOperation = isSchemaOperation(stmt);
+            } catch (Exception e) {
+                getError(queue, reportError, e, fileName);
+                return;
+            }
+            if (schemaOperation) {
                 Util.inTx(db, pools, txInThread -> {
                     try (Result result = txInThread.execute(stmt, params)) {
                         return consumeResult(result, queue, addStatistics, timeout);
+                    } catch (Exception e) {
+                        return getError(queue, reportError, e, fileName);
                     }
                 });
             }
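Putting it together, a sketch of a call exercising every config key documented in this commit (file name hypothetical; per the partial above, parameters is only honored by the schema variants):

    CALL apoc.cypher.runSchemaFile('schema.cypher', {
      reportError: true,   // emit error rows instead of throwing
      statistics: true,    // keep the per-query statistics rows
      timeout: 30,         // per-query timeout, in seconds
      queueCapacity: 500,  // capacity of the internal result queue
      parameters: {}       // optional statement parameters
    })
    YIELD row, result
    RETURN row, result;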
