diff --git a/pom.xml b/pom.xml index ba9087f0..7ecfb330 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.xbib.elasticsearch.plugin elasticsearch-river-jdbc - 1.3.0.3 + 1.3.0.4 jar diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java index 8d95e09e..e9f4d089 100644 --- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java +++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java @@ -8,12 +8,14 @@ import java.io.InputStreamReader; import java.io.Reader; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import static org.elasticsearch.common.collect.Lists.newLinkedList; +import static org.elasticsearch.common.collect.Maps.newHashMap; + /** * The SQL command */ @@ -23,9 +25,9 @@ public class SQLCommand { private static final Pattern STATEMENT_PATTERN = Pattern.compile("^\\s*(update|insert)", Pattern.CASE_INSENSITIVE); - private List params = new LinkedList(); + private List params = newLinkedList(); - private Map results = new HashMap(); + private Map register = newHashMap(); private boolean callable; @@ -80,12 +82,20 @@ public boolean isQuery() { return p3 < 0 || p1 < p2 && p1 < p3; } - public void setResults(Map results) { - this.results = results; + /** + * A register is for parameters of a callable statement. + * @param register a map for registering parameters + */ + public void setRegister(Map register) { + this.register = register; } - public Map getResults() { - return results; + /** + * Get the parameters of a callable statement + * @return the register map + */ + public Map getRegister() { + return register; } @SuppressWarnings({"unchecked"}) @@ -110,7 +120,7 @@ public static List parse(Map settings) { command.setCallable(XContentMapValues.nodeBooleanValue(m.get("callable"))); } if (m.containsKey("register")) { - command.setResults(XContentMapValues.nodeMapValue(m.get("register"), null)); + command.setRegister(XContentMapValues.nodeMapValue(m.get("register"), null)); } } else if (entry instanceof String) { command.setSQL((String) entry); diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java index de12d117..a8c169c3 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.TimeZone; +import static org.elasticsearch.common.collect.Lists.newLinkedList; + /** * Simple river source. *

@@ -383,21 +385,22 @@ private void executeCallable(SQLCommand command) throws Exception { if (!command.getParameters().isEmpty()) { bind(statement, command.getParameters()); } - if (!command.getResults().isEmpty()) { - register(statement, command.getResults()); + if (!command.getRegister().isEmpty()) { + register(statement, command.getRegister()); } boolean hasRows = statement.execute(); RiverMouthKeyValueStreamListener listener = new RiverMouthKeyValueStreamListener() .output(context.getRiverMouth()); - if (!hasRows) { - // merge from registered params - merge(statement, command, listener); - } else { + if (hasRows) { + logger.debug("callable execution created result set"); while (hasRows) { - // merge result set + // merge result set, but use register merge(statement.getResultSet(), listener); hasRows = statement.getMoreResults(); } + } else { + // no result set, merge from registered params only + merge(statement, command, listener); } } finally { close(statement); @@ -442,17 +445,22 @@ public void merge(ResultSet results, KeyValueStreamListener listener) @SuppressWarnings({"unchecked"}) public void merge(CallableStatement statement, SQLCommand command, KeyValueStreamListener listener) throws SQLException, IOException { - Map map = command.getResults(); + Map map = command.getRegister(); if (map.isEmpty()) { + // no register given, return without doing anything return; } - List keys = new LinkedList(); - List values = new LinkedList(); + List keys = newLinkedList(); + List values = newLinkedList(); for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - Map m = (Map) entry.getValue(); - values.add(statement.getObject((Integer) m.get("pos"))); + String k = entry.getKey(); + Map v = (Map) entry.getValue(); + Integer pos = (Integer) v.get("pos"); // the parameter position of the value + String field = (String) v.get("field"); // the field for indexing the value (if not key name) + keys.add(field != null ? field : k); + values.add(statement.getObject(pos)); } + logger.trace("merge callable statement result: keys={} values={}", keys, values); listener.keys(keys); listener.values(values); listener.end(); @@ -532,11 +540,18 @@ public SimpleRiverSource register(CallableStatement statement, Map me : values.entrySet()) { - // { "fieldname" : { "pos": n, "type" : "VARCHAR" }, ... } + // { "key" : { "pos": n, "type" : "VARCHAR", "field" : "fieldname" }, ... } Map m = (Map) me.getValue(); Integer n = (Integer) m.get("pos"); String type = (String) m.get("type"); - register(statement, n, type); + if (n != null && type != null) { + logger.info("n={} type={}", n, toJDBCType(type)); + try { + statement.registerOutParameter(n, toJDBCType(type)); + } catch (Throwable t) { + logger.warn("can't register out parameter " + n + " of type " + type); + } + } } return this; } @@ -938,10 +953,6 @@ private void bind(PreparedStatement statement, int i, Object value) throws SQLEx } } - private void register(CallableStatement statement, Integer pos, String type) throws SQLException { - statement.registerOutParameter(pos, toJDBCType(type)); - } - /** * Parse of value of result set * diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java index 1256fd0a..7edb0bd5 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java @@ -1,14 +1,13 @@ package org.xbib.elasticsearch.river.jdbc.strategy.simple; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; import org.testng.annotations.Parameters; import org.testng.annotations.Test; import org.xbib.elasticsearch.plugin.jdbc.RiverContext; import org.xbib.elasticsearch.river.jdbc.RiverSource; import org.xbib.elasticsearch.support.helper.AbstractRiverNodeTest; -import java.sql.Connection; -import java.sql.Statement; - public class RiverStoredProcedureTests extends AbstractRiverNodeTest { @Override @@ -22,18 +21,24 @@ public RiverContext getRiverContext() { } @Test - @Parameters({"river5", "sql1", "sql2"}) - public void testSimpleStoredProcedure(String riverResource, String sql, String storedProcSQL) + @Parameters({"river8"}) + public void testSimpleStoredProcedure(String riverResource) + throws Exception { + createRiver(riverResource); + waitForInactiveRiver(); + assertHits("1", 5); + } + + @Test + @Parameters({"river9"}) + public void testRegisterStoredProcedure(String riverResource) throws Exception { - createRandomProducts(sql, 100); - // create stored procedure - Connection connection = source.getConnectionForWriting(); - Statement statement = connection.createStatement(); - statement.execute(storedProcSQL); - statement.close(); - source.closeWriting(); createRiver(riverResource); waitForInactiveRiver(); + assertHits("1", 1); + SearchResponse response = client("1").prepareSearch("my_jdbc_river_index") + .setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + assertEquals("{supplierName=Acme, Inc.}", response.getHits().getHits()[0].getSource().toString()); } } diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/storedprocedure/StoredProcedureJavaDBSample.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/storedprocedure/StoredProcedureJavaDBSample.java new file mode 100644 index 00000000..bd440ffe --- /dev/null +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/storedprocedure/StoredProcedureJavaDBSample.java @@ -0,0 +1,55 @@ +package org.xbib.elasticsearch.river.jdbc.strategy.simple.storedprocedure; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +public class StoredProcedureJavaDBSample { + + public static void showSuppliers(ResultSet[] rs) + throws SQLException { + + Connection con = DriverManager.getConnection("jdbc:default:connection"); + Statement stmt = null; + + String query = + "select SUPPLIERS.SUP_NAME, " + + "COFFEES.COF_NAME " + + "from SUPPLIERS, COFFEES " + + "where SUPPLIERS.SUP_ID = " + + "COFFEES.SUP_ID " + + "order by SUP_NAME"; + + stmt = con.createStatement(); + rs[0] = stmt.executeQuery(query); + } + + public static void getSupplierOfCoffee(String coffeeName, String[] supplierName) + throws SQLException { + + Connection con = DriverManager.getConnection("jdbc:default:connection"); + PreparedStatement pstmt = null; + ResultSet rs = null; + + String query = + "select SUPPLIERS.SUP_NAME " + + "from SUPPLIERS, COFFEES " + + "where " + + "SUPPLIERS.SUP_ID = COFFEES.SUP_ID " + + "and ? = COFFEES.COF_NAME"; + + pstmt = con.prepareStatement(query); + pstmt.setString(1, coffeeName); + rs = pstmt.executeQuery(); + + if (rs.next()) { + supplierName[0] = rs.getString(1); + } else { + supplierName[0] = null; + } + } + +} diff --git a/src/test/java/org/xbib/elasticsearch/support/helper/AbstractRiverNodeTest.java b/src/test/java/org/xbib/elasticsearch/support/helper/AbstractRiverNodeTest.java index db2b296d..4d107925 100644 --- a/src/test/java/org/xbib/elasticsearch/support/helper/AbstractRiverNodeTest.java +++ b/src/test/java/org/xbib/elasticsearch/support/helper/AbstractRiverNodeTest.java @@ -153,7 +153,8 @@ public void afterMethod(String stopurl, String user, String password, @Optional protected void createRiver(String resource) { try { waitForYellow("1"); - Map map = XContentHelper.convertToMap(Streams.copyToByteArray(getClass().getResourceAsStream(resource)), false).v2(); + byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource)); + Map map = XContentHelper.convertToMap(b, false).v2(); XContentBuilder builder = jsonBuilder() .startObject() .field("type", "jdbc") diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 9e44b419..adf34acf 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -10,7 +10,7 @@ diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/derby/river-5.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/derby/river-5.json index 9e3b67fa..ee2ee7ec 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/derby/river-5.json +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/derby/river-5.json @@ -5,7 +5,7 @@ "password" : "", "sql" : [ { - "statement : "{call count_products(?)}", + "statement" : "{call count_products(?)}", "parameter" : [ 1 ], "callable" : true } diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql new file mode 100644 index 00000000..5c076b57 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql @@ -0,0 +1,15 @@ +drop table IF EXISTS SUPPLIERS +drop table IF EXISTS COFFEES +drop procedure IF EXISTS SHOW_SUPPLIERS +create table SUPPLIERS (SUP_ID integer NOT NULL, SUP_NAME varchar(40) NOT NULL, STREET varchar(40) NOT NULL, CITY varchar(20) NOT NULL, STATE char(2) NOT NULL, ZIP char(5), PRIMARY KEY (SUP_ID)); +create table COFFEES (COF_NAME varchar(32) NOT NULL, SUP_ID int NOT NULL, PRICE numeric(10,2) NOT NULL, SALES integer NOT NULL, TOTAL integer NOT NULL, PRIMARY KEY (COF_NAME), FOREIGN KEY (SUP_ID) REFERENCES SUPPLIERS (SUP_ID)); +insert into SUPPLIERS values(49, 'Superior Coffee', '1 Party Place', 'Mendocino', 'CA', '95460') +insert into SUPPLIERS values(101, 'Acme, Inc.', '99 Market Street', 'Groundsville', 'CA', '95199') +insert into SUPPLIERS values(150, 'The High Ground', '100 Coffee Lane', 'Meadows', 'CA', '93966') +insert into COFFEES values('Colombian', 00101, 7.99, 0, 0) +insert into COFFEES values('French_Roast', 00049, 8.99, 0, 0) +insert into COFFEES values('Espresso', 00150, 9.99, 0, 0) +insert into COFFEES values('Colombian_Decaf', 00101, 8.99, 0, 0) +insert into COFFEES values('French_Roast_Decaf', 00049, 9.99, 0, 0) +create procedure SHOW_SUPPLIERS() begin select SUPPLIERS.SUP_NAME, COFFEES.COF_NAME from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID order by SUP_NAME; end +create procedure GET_SUPPLIER_OF_COFFEE(IN coffeeName varchar(32), OUT supplierName varchar(40)) begin select SUPPLIERS.SUP_NAME into supplierName from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID and coffeeName = COFFEES.COF_NAME; select supplierName; end \ No newline at end of file diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql new file mode 100644 index 00000000..d2cb3ce4 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql @@ -0,0 +1,3 @@ +drop table COFFEES +drop table SUPPLIERS +drop procedure SHOW_SUPPLIERS \ No newline at end of file diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-6.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-6.json new file mode 100644 index 00000000..21aae687 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-6.json @@ -0,0 +1,12 @@ +{ + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + "sql" : "select * from products", + "schedule" : "0/5 0-59 0-23 ? * *", + "bulk_flush_interval" : "1s", + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type" + } +} diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-7.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-7.json new file mode 100644 index 00000000..1720cb69 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-7.json @@ -0,0 +1,19 @@ +{ + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + + "sql" : [ + { + "statement" : "select message from logs where {fn timestampdiff(SQL_TSI_HOUR, modified ,?)} > 0", + "parameter" : [ "$now" ] + } + ], + "schedule" : "0/5 0-59 0-23 ? * *", + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type", + "timezone" : "Asia/Jerusalem", + "locale" : "iw_IL" + } +} diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-8.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-8.json new file mode 100644 index 00000000..a459f5c2 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-8.json @@ -0,0 +1,15 @@ +{ + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + "sql" : [ + { + "callable" : true, + "statement" : "{call SHOW_SUPPLIERS()}" + } + ], + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type" + } +} \ No newline at end of file diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json new file mode 100644 index 00000000..825e3e13 --- /dev/null +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json @@ -0,0 +1,21 @@ +{ + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + "sql" : [ + { + "callable" : true, + "statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}", + "parameter" : [ + "Colombian" + ], + "register" : { + "supplierName" : { "pos" : 2, "type" : "varchar", "field" : "mySupplier" } + } + } + ], + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type" + } +} \ No newline at end of file diff --git a/src/test/resources/testsuite/simple/derby.xml b/src/test/resources/testsuite/simple/derby.xml index 28d9f166..69c01781 100644 --- a/src/test/resources/testsuite/simple/derby.xml +++ b/src/test/resources/testsuite/simple/derby.xml @@ -43,21 +43,6 @@ - - diff --git a/src/test/resources/testsuite/simple/mysql.xml b/src/test/resources/testsuite/simple/mysql.xml index 36181f07..10540eea 100644 --- a/src/test/resources/testsuite/simple/mysql.xml +++ b/src/test/resources/testsuite/simple/mysql.xml @@ -54,4 +54,15 @@ + + + + + + + + + + + \ No newline at end of file