diff --git a/README.md b/README.md index 7ddcd0e2..ee0caff2 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch. | Elasticsearch version | Plugin | Release date | | ------------------------ | -----------| -------------| +| 1.3.1 | 1.3.0.4 | Aug 5, 2014 | | 1.3.1 | 1.3.0.3 | Aug 4, 2014 | | 1.3.1 | 1.3.0.2 | Aug 2, 2014 | | 1.3.1 | 1.3.0.1 | Jul 31, 2014 | @@ -60,7 +61,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch. ## Installation - ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip + ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.4/elasticsearch-river-jdbc-1.3.0.4-plugin.zip Do not forget to restart the node after installing. @@ -75,6 +76,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool. | File | SHA1 | | ---------------------------------------------| -----------------------------------------| +| elasticsearch-river-jdbc-1.3.0.4-plugin.zip | dcb412285f6274ef07c05068311dacb745fe8046 | | elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b | | elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 | | elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc | @@ -231,8 +233,7 @@ Example: "sql" : [ { "statement" : "select ... from ... where a = ?, b = ?, c = ?", - "parameter" : [ "value for a", "value for b", "value for c" ], - "callable" : false + "parameter" : [ "value for a", "value for b", "value for c" ] }, { "statement" : ... @@ -241,6 +242,8 @@ Example: `sql.statement` - the SQL statement +`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement for stored procedures (default: false). + `sql.parameter` - bind parameters for the SQL statement (in order). Some special values can be used with the following meanings: * `$now` - the current timestamp @@ -255,8 +258,6 @@ Example: * `$river.state.timestamp` - last timestamp of river activity (from river state) * `$river.state.counter` - counter from river state, counts the numbers of runs -`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement (default: false). Note: callable statement support is experimental and not well tested. - `locale` - the default locale (used for parsing numerical values, floating point character. Recommended values is "en_US") `timezone` - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values @@ -694,6 +695,57 @@ will result into the following JSON documents id= {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"} id= {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"} +# Stored procedures or callable statements + +Stored procedures can also be used for fetchng data, like this example fo MySQL illustrates. +See also [Using Stored Procedures](http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html) +from where the example is taken. + + create procedure GET_SUPPLIER_OF_COFFEE( + IN coffeeName varchar(32), + OUT supplierName varchar(40)) + begin + select SUPPLIERS.SUP_NAME into supplierName + from SUPPLIERS, COFFEES + where SUPPLIERS.SUP_ID = COFFEES.SUP_ID + and coffeeName = COFFEES.COF_NAME; + select supplierName; + end + +Now it is possible to call the procedure from the JDBC plugin and index the result in Elasticsearch. + + { + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + "sql" : [ + { + "callable" : true, + "statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}", + "parameter" : [ + "Colombian" + ], + "register" : { + "mySupplierName" : { "pos" : 2, "type" : "varchar" } + } + } + ], + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type" + } + } + +Note, the `parameter` lists the input parameters in the order they should be applied, like in an +ordinary statement. The `register` declares a list of output parameters in the particular order +the `pos` number indicates. It is required to declare the JDBC type in the `type` attribute. +`mySupplierName`, the key of the output parameter, is used as the Elasticsearch field name specification, +like the column name specification in an ordinary SQL statement, because column names are not available +in callable statement result sets. + +If there is more than one result sets returned by a callable statement, +the JDBC plugin enters a loop and iterates through all result sets. + # Monitoring the JDBC river state While a river/feed is running, you can monitor the activity by using the `_state` command. diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java index 34bbad81..22b9349f 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java @@ -1,6 +1,7 @@ package org.xbib.elasticsearch.river.jdbc; import org.xbib.elasticsearch.plugin.jdbc.RiverContext; +import org.xbib.elasticsearch.plugin.jdbc.SQLCommand; import org.xbib.keyvalue.KeyValueStreamListener; import java.io.IOException; @@ -156,6 +157,20 @@ public interface RiverSource { void beforeRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + + void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + + /** + * This routine is executed before the result set is evaluated + * @param command the SQL command that created this result set + * @param results the result set + * @param listener listener for the key/value stream generated from the result set + * @throws SQLException + * @throws IOException + */ + void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + /** * Action for the next row of the result set to be processed * @@ -165,9 +180,18 @@ public interface RiverSource { * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ - boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; - void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + /** + * After the result set is processed, this method is called. + * + * @param command the SQL command that created this result set + * @param results the result set + * @param listener listener for the key/value stream generated from the result set + * @throws SQLException + * @throws IOException + */ + void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; /** * Parse a value in a row column diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java index 20f48d9f..f07c9de3 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java @@ -106,7 +106,7 @@ private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Tim KeyValueStreamListener listener = new ColumnKeyValueStreamListener(opInfo.opType) .output(context.getRiverMouth()); - merge(result, listener); + merge(command, result, listener); } catch (Exception e) { throw new IOException(e); } finally { diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java index a8c169c3..17c557c9 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java @@ -322,7 +322,7 @@ private void execute(SQLCommand command) throws Exception { RiverMouthKeyValueStreamListener listener = new RiverMouthKeyValueStreamListener() .output(context.getRiverMouth()) .shouldIgnoreNull(context.shouldIgnoreNull()); - merge(results, listener); + merge(command, results, listener); } } else { // use write connection @@ -356,7 +356,7 @@ private void executeWithParameter(SQLCommand command) throws Exception { RiverMouthKeyValueStreamListener listener = new RiverMouthKeyValueStreamListener() .output(context.getRiverMouth()) .shouldIgnoreNull(context.shouldIgnoreNull()); - merge(results, listener); + merge(command, results, listener); } else { statement = prepareUpdate(command.getSQL()); bind(statement, command.getParameters()); @@ -395,12 +395,12 @@ private void executeCallable(SQLCommand command) throws Exception { logger.debug("callable execution created result set"); while (hasRows) { // merge result set, but use register - merge(statement.getResultSet(), listener); + merge(command, statement.getResultSet(), listener); hasRows = statement.getMoreResults(); } } else { // no result set, merge from registered params only - merge(statement, command, listener); + merge(command, statement, listener); } } finally { close(statement); @@ -415,14 +415,14 @@ private void executeCallable(SQLCommand command) throws Exception { * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ - public void merge(ResultSet results, KeyValueStreamListener listener) + public void merge(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException, ParseException { if (listener == null) { return; } - beforeRows(results, listener); + beforeRows(command, results, listener); long rows = 0L; - while (nextRow(results, listener)) { + while (nextRow(command, results, listener)) { rows++; } context.setLastRowCount(rows); @@ -431,40 +431,10 @@ public void merge(ResultSet results, KeyValueStreamListener listener) } else { logger().debug("no rows merged "); } - afterRows(results, listener); + afterRows(command, results, listener); } - /** - * Merge key/values from registered params of a callable statement - * - * @param statement callable statement - * @param listener the value listener - * @throws SQLException when SQL execution gives an error - * @throws IOException when input/output error occurs - */ - @SuppressWarnings({"unchecked"}) - public void merge(CallableStatement statement, SQLCommand command, KeyValueStreamListener listener) - throws SQLException, IOException { - Map map = command.getRegister(); - if (map.isEmpty()) { - // no register given, return without doing anything - return; - } - List keys = newLinkedList(); - List values = newLinkedList(); - for (Map.Entry entry : map.entrySet()) { - String k = entry.getKey(); - Map v = (Map) entry.getValue(); - Integer pos = (Integer) v.get("pos"); // the parameter position of the value - String field = (String) v.get("field"); // the field for indexing the value (if not key name) - keys.add(field != null ? field : k); - values.add(statement.getObject(pos)); - } - logger.trace("merge callable statement result: keys={} values={}", keys, values); - listener.keys(keys); - listener.values(values); - listener.end(); - } + /** * Prepare a query statement @@ -525,6 +495,38 @@ public SimpleRiverSource bind(PreparedStatement statement, List values) return this; } + /** + * Merge key/values from registered params of a callable statement + * + * @param statement callable statement + * @param listener the value listener + * @throws SQLException when SQL execution gives an error + * @throws IOException when input/output error occurs + */ + @SuppressWarnings({"unchecked"}) + public void merge(SQLCommand command, CallableStatement statement, KeyValueStreamListener listener) + throws SQLException, IOException { + Map map = command.getRegister(); + if (map.isEmpty()) { + // no register given, return without doing anything + return; + } + List keys = newLinkedList(); + List values = newLinkedList(); + for (Map.Entry entry : map.entrySet()) { + String k = entry.getKey(); + Map v = (Map) entry.getValue(); + Integer pos = (Integer) v.get("pos"); // the parameter position of the value + String field = (String) v.get("field"); // the field for indexing the value (if not key name) + keys.add(field != null ? field : k); + values.add(statement.getObject(pos)); + } + logger.trace("merge callable statement result: keys={} values={}", keys, values); + listener.keys(keys); + listener.values(values); + listener.end(); + } + /** * Register variables in callable statement * @@ -545,7 +547,7 @@ public SimpleRiverSource register(CallableStatement statement, Map keys = new LinkedList(); - for (int i = 1; i <= columns; i++) { - keys.add(metadata.getColumnLabel(i)); + if (command != null && command.isCallable() && !command.getRegister().isEmpty()) { + for (Map.Entry me : command.getRegister().entrySet()) { + keys.add(me.getKey()); + } + } else { + ResultSetMetaData metadata = results.getMetaData(); + int columns = metadata.getColumnCount(); + for (int i = 1; i <= columns; i++) { + keys.add(metadata.getColumnLabel(i)); + } } listener.begin(); listener.keys(keys); } + public boolean nextRow(ResultSet results, KeyValueStreamListener listener) + throws SQLException, IOException { + return nextRow(null, results, listener); + } + /** * Get next row and prepare the values for processing. The labels of each * columns are used for the ValueListener as paths for JSON object merging. * + * @param command the SQL command that created this result set * @param results the result set * @param listener the listener * @return true if row exists and was processed, false otherwise @@ -650,32 +673,38 @@ public void beforeRows(ResultSet results, KeyValueStreamListener listener) * @throws IOException when input/output error occurs */ @Override - public boolean nextRow(ResultSet results, KeyValueStreamListener listener) + public boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { if (results.next() && (context.getRiverFlow() == null || !context.getRiverFlow().getFeeder().isInterrupted())) { - processRow(results, listener); + processRow(command, results, listener); return true; } return false; } + public void afterRows(ResultSet results, KeyValueStreamListener listener) + throws SQLException, IOException { + afterRows(null, results, listener); + } + /** * After the rows keys and values, let the listener know about the end of * the result set. * + * @param command the SQL command that created this result set * @param results the result set * @param listener the key/value stream listener * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ @Override - public void afterRows(ResultSet results, KeyValueStreamListener listener) + public void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { listener.end(); } @SuppressWarnings({"unchecked"}) - private void processRow(ResultSet results, KeyValueStreamListener listener) + private void processRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { Locale locale = context != null ? context.getLocale() != null ? context.getLocale() : Locale.getDefault() : Locale.getDefault(); List values = new LinkedList(); diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java index 7260c720..dbeb6b06 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java @@ -1,6 +1,7 @@ package org.xbib.elasticsearch.river.jdbc.strategy.mock; import org.xbib.elasticsearch.plugin.jdbc.RiverContext; +import org.xbib.elasticsearch.plugin.jdbc.SQLCommand; import org.xbib.elasticsearch.river.jdbc.RiverSource; import org.xbib.keyvalue.KeyValueStreamListener; @@ -107,16 +108,31 @@ public void beforeRows(ResultSet result, KeyValueStreamListener listener) throws throw new UnsupportedOperationException("Not supported yet."); } + @Override + public void beforeRows(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public boolean nextRow(ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public boolean nextRow(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public void afterRows(ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public void afterRows(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public Object parseType(ResultSet result, Integer num, int type, Locale locale) throws SQLException, IOException, ParseException { throw new UnsupportedOperationException("Not supported yet."); diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java index 7edb0bd5..6abdc56b 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java @@ -27,18 +27,21 @@ public void testSimpleStoredProcedure(String riverResource) createRiver(riverResource); waitForInactiveRiver(); assertHits("1", 5); + logger.info("got the five hits"); } @Test @Parameters({"river9"}) - public void testRegisterStoredProcedure(String riverResource) - throws Exception { + public void testRegisterStoredProcedure(String riverResource) throws Exception { createRiver(riverResource); waitForInactiveRiver(); assertHits("1", 1); + logger.info("got the hit"); SearchResponse response = client("1").prepareSearch("my_jdbc_river_index") .setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); - assertEquals("{supplierName=Acme, Inc.}", response.getHits().getHits()[0].getSource().toString()); + String resp = response.getHits().getHits()[0].getSource().toString(); + logger.info("resp={}", resp); + assertEquals("{mySupplierName=Acme, Inc.}", resp); } } diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql index 5c076b57..255aed0c 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql @@ -1,6 +1,7 @@ drop table IF EXISTS SUPPLIERS drop table IF EXISTS COFFEES drop procedure IF EXISTS SHOW_SUPPLIERS +drop procedure IF EXISTS GET_SUPPLIER_OF_COFFEE create table SUPPLIERS (SUP_ID integer NOT NULL, SUP_NAME varchar(40) NOT NULL, STREET varchar(40) NOT NULL, CITY varchar(20) NOT NULL, STATE char(2) NOT NULL, ZIP char(5), PRIMARY KEY (SUP_ID)); create table COFFEES (COF_NAME varchar(32) NOT NULL, SUP_ID int NOT NULL, PRICE numeric(10,2) NOT NULL, SALES integer NOT NULL, TOTAL integer NOT NULL, PRIMARY KEY (COF_NAME), FOREIGN KEY (SUP_ID) REFERENCES SUPPLIERS (SUP_ID)); insert into SUPPLIERS values(49, 'Superior Coffee', '1 Party Place', 'Mendocino', 'CA', '95460') diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql index d2cb3ce4..ad268096 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql @@ -1,3 +1,4 @@ drop table COFFEES drop table SUPPLIERS -drop procedure SHOW_SUPPLIERS \ No newline at end of file +drop procedure SHOW_SUPPLIERS +drop procedure GET_SUPPLIER_OF_COFFEE \ No newline at end of file diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json index 825e3e13..0670ac32 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json @@ -11,7 +11,7 @@ "Colombian" ], "register" : { - "supplierName" : { "pos" : 2, "type" : "varchar", "field" : "mySupplier" } + "mySupplierName" : { "pos" : 2, "type" : "varchar" } } } ],