From c934b05a4b2c413561fc4ca1da5a96f19d89367c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 5 Aug 2014 23:38:27 +0200 Subject: [PATCH] Stored procedure support Add field name renaming in the register map. Change of RiverSource API, the SQL command is passed through the result set processing because it carries the callable statement register map information for the field names. --- README.md | 62 ++++++++- .../elasticsearch/river/jdbc/RiverSource.java | 28 +++- .../strategy/column/ColumnRiverSource.java | 2 +- .../strategy/simple/SimpleRiverSource.java | 127 +++++++++++------- .../jdbc/strategy/mock/MockRiverSource.java | 16 +++ .../simple/RiverStoredProcedureTests.java | 9 +- .../simple/mysql/create-suppliertables.sql | 1 + .../simple/mysql/delete-suppliertables.sql | 3 +- .../jdbc/strategy/simple/mysql/river-9.json | 2 +- 9 files changed, 188 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 7ddcd0e2..ee0caff2 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch. | Elasticsearch version | Plugin | Release date | | ------------------------ | -----------| -------------| +| 1.3.1 | 1.3.0.4 | Aug 5, 2014 | | 1.3.1 | 1.3.0.3 | Aug 4, 2014 | | 1.3.1 | 1.3.0.2 | Aug 2, 2014 | | 1.3.1 | 1.3.0.1 | Jul 31, 2014 | @@ -60,7 +61,7 @@ bulk mode ensures high throughput when indexing to Elasticsearch. ## Installation - ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.3/elasticsearch-river-jdbc-1.3.0.3-plugin.zip + ./bin/plugin --install jdbc --url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.0.4/elasticsearch-river-jdbc-1.3.0.4-plugin.zip Do not forget to restart the node after installing. @@ -75,6 +76,7 @@ Change into this directory to invoke the `./bin/plugin` command line tool. | File | SHA1 | | ---------------------------------------------| -----------------------------------------| +| elasticsearch-river-jdbc-1.3.0.4-plugin.zip | dcb412285f6274ef07c05068311dacb745fe8046 | | elasticsearch-river-jdbc-1.3.0.3-plugin.zip | 7e3fe518c716305a7878fddb299f0c263fb5ed4b | | elasticsearch-river-jdbc-1.3.0.2-plugin.zip | 7f87af3055223d15238da9c81ae95ff6ea0ce934 | | elasticsearch-river-jdbc-1.3.0.1-plugin.zip | ee58c51acfb4bc2294939c655ff2f790890808bc | @@ -231,8 +233,7 @@ Example: "sql" : [ { "statement" : "select ... from ... where a = ?, b = ?, c = ?", - "parameter" : [ "value for a", "value for b", "value for c" ], - "callable" : false + "parameter" : [ "value for a", "value for b", "value for c" ] }, { "statement" : ... @@ -241,6 +242,8 @@ Example: `sql.statement` - the SQL statement +`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement for stored procedures (default: false). + `sql.parameter` - bind parameters for the SQL statement (in order). Some special values can be used with the following meanings: * `$now` - the current timestamp @@ -255,8 +258,6 @@ Example: * `$river.state.timestamp` - last timestamp of river activity (from river state) * `$river.state.counter` - counter from river state, counts the numbers of runs -`sql.callable` - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement (default: false). Note: callable statement support is experimental and not well tested. - `locale` - the default locale (used for parsing numerical values, floating point character. Recommended values is "en_US") `timezone` - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values @@ -694,6 +695,57 @@ will result into the following JSON documents id= {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"} id= {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"} +# Stored procedures or callable statements + +Stored procedures can also be used for fetchng data, like this example fo MySQL illustrates. +See also [Using Stored Procedures](http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html) +from where the example is taken. + + create procedure GET_SUPPLIER_OF_COFFEE( + IN coffeeName varchar(32), + OUT supplierName varchar(40)) + begin + select SUPPLIERS.SUP_NAME into supplierName + from SUPPLIERS, COFFEES + where SUPPLIERS.SUP_ID = COFFEES.SUP_ID + and coffeeName = COFFEES.COF_NAME; + select supplierName; + end + +Now it is possible to call the procedure from the JDBC plugin and index the result in Elasticsearch. + + { + "jdbc" : { + "url" : "jdbc:mysql://localhost:3306/test", + "user" : "", + "password" : "", + "sql" : [ + { + "callable" : true, + "statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}", + "parameter" : [ + "Colombian" + ], + "register" : { + "mySupplierName" : { "pos" : 2, "type" : "varchar" } + } + } + ], + "index" : "my_jdbc_river_index", + "type" : "my_jdbc_river_type" + } + } + +Note, the `parameter` lists the input parameters in the order they should be applied, like in an +ordinary statement. The `register` declares a list of output parameters in the particular order +the `pos` number indicates. It is required to declare the JDBC type in the `type` attribute. +`mySupplierName`, the key of the output parameter, is used as the Elasticsearch field name specification, +like the column name specification in an ordinary SQL statement, because column names are not available +in callable statement result sets. + +If there is more than one result sets returned by a callable statement, +the JDBC plugin enters a loop and iterates through all result sets. + # Monitoring the JDBC river state While a river/feed is running, you can monitor the activity by using the `_state` command. diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java index 34bbad81..22b9349f 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/RiverSource.java @@ -1,6 +1,7 @@ package org.xbib.elasticsearch.river.jdbc; import org.xbib.elasticsearch.plugin.jdbc.RiverContext; +import org.xbib.elasticsearch.plugin.jdbc.SQLCommand; import org.xbib.keyvalue.KeyValueStreamListener; import java.io.IOException; @@ -156,6 +157,20 @@ public interface RiverSource { void beforeRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + + void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + + /** + * This routine is executed before the result set is evaluated + * @param command the SQL command that created this result set + * @param results the result set + * @param listener listener for the key/value stream generated from the result set + * @throws SQLException + * @throws IOException + */ + void beforeRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + /** * Action for the next row of the result set to be processed * @@ -165,9 +180,18 @@ public interface RiverSource { * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ - boolean nextRow(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; - void afterRows(ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; + /** + * After the result set is processed, this method is called. + * + * @param command the SQL command that created this result set + * @param results the result set + * @param listener listener for the key/value stream generated from the result set + * @throws SQLException + * @throws IOException + */ + void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException; /** * Parse a value in a row column diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java index 20f48d9f..f07c9de3 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/column/ColumnRiverSource.java @@ -106,7 +106,7 @@ private void fetch(Connection connection, SQLCommand command, OpInfo opInfo, Tim KeyValueStreamListener listener = new ColumnKeyValueStreamListener(opInfo.opType) .output(context.getRiverMouth()); - merge(result, listener); + merge(command, result, listener); } catch (Exception e) { throw new IOException(e); } finally { diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java index a8c169c3..17c557c9 100644 --- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java +++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverSource.java @@ -322,7 +322,7 @@ private void execute(SQLCommand command) throws Exception { RiverMouthKeyValueStreamListener listener = new RiverMouthKeyValueStreamListener() .output(context.getRiverMouth()) .shouldIgnoreNull(context.shouldIgnoreNull()); - merge(results, listener); + merge(command, results, listener); } } else { // use write connection @@ -356,7 +356,7 @@ private void executeWithParameter(SQLCommand command) throws Exception { RiverMouthKeyValueStreamListener listener = new RiverMouthKeyValueStreamListener() .output(context.getRiverMouth()) .shouldIgnoreNull(context.shouldIgnoreNull()); - merge(results, listener); + merge(command, results, listener); } else { statement = prepareUpdate(command.getSQL()); bind(statement, command.getParameters()); @@ -395,12 +395,12 @@ private void executeCallable(SQLCommand command) throws Exception { logger.debug("callable execution created result set"); while (hasRows) { // merge result set, but use register - merge(statement.getResultSet(), listener); + merge(command, statement.getResultSet(), listener); hasRows = statement.getMoreResults(); } } else { // no result set, merge from registered params only - merge(statement, command, listener); + merge(command, statement, listener); } } finally { close(statement); @@ -415,14 +415,14 @@ private void executeCallable(SQLCommand command) throws Exception { * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ - public void merge(ResultSet results, KeyValueStreamListener listener) + public void merge(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException, ParseException { if (listener == null) { return; } - beforeRows(results, listener); + beforeRows(command, results, listener); long rows = 0L; - while (nextRow(results, listener)) { + while (nextRow(command, results, listener)) { rows++; } context.setLastRowCount(rows); @@ -431,40 +431,10 @@ public void merge(ResultSet results, KeyValueStreamListener listener) } else { logger().debug("no rows merged "); } - afterRows(results, listener); + afterRows(command, results, listener); } - /** - * Merge key/values from registered params of a callable statement - * - * @param statement callable statement - * @param listener the value listener - * @throws SQLException when SQL execution gives an error - * @throws IOException when input/output error occurs - */ - @SuppressWarnings({"unchecked"}) - public void merge(CallableStatement statement, SQLCommand command, KeyValueStreamListener listener) - throws SQLException, IOException { - Map map = command.getRegister(); - if (map.isEmpty()) { - // no register given, return without doing anything - return; - } - List keys = newLinkedList(); - List values = newLinkedList(); - for (Map.Entry entry : map.entrySet()) { - String k = entry.getKey(); - Map v = (Map) entry.getValue(); - Integer pos = (Integer) v.get("pos"); // the parameter position of the value - String field = (String) v.get("field"); // the field for indexing the value (if not key name) - keys.add(field != null ? field : k); - values.add(statement.getObject(pos)); - } - logger.trace("merge callable statement result: keys={} values={}", keys, values); - listener.keys(keys); - listener.values(values); - listener.end(); - } + /** * Prepare a query statement @@ -525,6 +495,38 @@ public SimpleRiverSource bind(PreparedStatement statement, List values) return this; } + /** + * Merge key/values from registered params of a callable statement + * + * @param statement callable statement + * @param listener the value listener + * @throws SQLException when SQL execution gives an error + * @throws IOException when input/output error occurs + */ + @SuppressWarnings({"unchecked"}) + public void merge(SQLCommand command, CallableStatement statement, KeyValueStreamListener listener) + throws SQLException, IOException { + Map map = command.getRegister(); + if (map.isEmpty()) { + // no register given, return without doing anything + return; + } + List keys = newLinkedList(); + List values = newLinkedList(); + for (Map.Entry entry : map.entrySet()) { + String k = entry.getKey(); + Map v = (Map) entry.getValue(); + Integer pos = (Integer) v.get("pos"); // the parameter position of the value + String field = (String) v.get("field"); // the field for indexing the value (if not key name) + keys.add(field != null ? field : k); + values.add(statement.getObject(pos)); + } + logger.trace("merge callable statement result: keys={} values={}", keys, values); + listener.keys(keys); + listener.values(values); + listener.end(); + } + /** * Register variables in callable statement * @@ -545,7 +547,7 @@ public SimpleRiverSource register(CallableStatement statement, Map keys = new LinkedList(); - for (int i = 1; i <= columns; i++) { - keys.add(metadata.getColumnLabel(i)); + if (command != null && command.isCallable() && !command.getRegister().isEmpty()) { + for (Map.Entry me : command.getRegister().entrySet()) { + keys.add(me.getKey()); + } + } else { + ResultSetMetaData metadata = results.getMetaData(); + int columns = metadata.getColumnCount(); + for (int i = 1; i <= columns; i++) { + keys.add(metadata.getColumnLabel(i)); + } } listener.begin(); listener.keys(keys); } + public boolean nextRow(ResultSet results, KeyValueStreamListener listener) + throws SQLException, IOException { + return nextRow(null, results, listener); + } + /** * Get next row and prepare the values for processing. The labels of each * columns are used for the ValueListener as paths for JSON object merging. * + * @param command the SQL command that created this result set * @param results the result set * @param listener the listener * @return true if row exists and was processed, false otherwise @@ -650,32 +673,38 @@ public void beforeRows(ResultSet results, KeyValueStreamListener listener) * @throws IOException when input/output error occurs */ @Override - public boolean nextRow(ResultSet results, KeyValueStreamListener listener) + public boolean nextRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { if (results.next() && (context.getRiverFlow() == null || !context.getRiverFlow().getFeeder().isInterrupted())) { - processRow(results, listener); + processRow(command, results, listener); return true; } return false; } + public void afterRows(ResultSet results, KeyValueStreamListener listener) + throws SQLException, IOException { + afterRows(null, results, listener); + } + /** * After the rows keys and values, let the listener know about the end of * the result set. * + * @param command the SQL command that created this result set * @param results the result set * @param listener the key/value stream listener * @throws SQLException when SQL execution gives an error * @throws IOException when input/output error occurs */ @Override - public void afterRows(ResultSet results, KeyValueStreamListener listener) + public void afterRows(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { listener.end(); } @SuppressWarnings({"unchecked"}) - private void processRow(ResultSet results, KeyValueStreamListener listener) + private void processRow(SQLCommand command, ResultSet results, KeyValueStreamListener listener) throws SQLException, IOException { Locale locale = context != null ? context.getLocale() != null ? context.getLocale() : Locale.getDefault() : Locale.getDefault(); List values = new LinkedList(); diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java index 7260c720..dbeb6b06 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/mock/MockRiverSource.java @@ -1,6 +1,7 @@ package org.xbib.elasticsearch.river.jdbc.strategy.mock; import org.xbib.elasticsearch.plugin.jdbc.RiverContext; +import org.xbib.elasticsearch.plugin.jdbc.SQLCommand; import org.xbib.elasticsearch.river.jdbc.RiverSource; import org.xbib.keyvalue.KeyValueStreamListener; @@ -107,16 +108,31 @@ public void beforeRows(ResultSet result, KeyValueStreamListener listener) throws throw new UnsupportedOperationException("Not supported yet."); } + @Override + public void beforeRows(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public boolean nextRow(ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public boolean nextRow(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public void afterRows(ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public void afterRows(SQLCommand command, ResultSet result, KeyValueStreamListener listener) throws SQLException, IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public Object parseType(ResultSet result, Integer num, int type, Locale locale) throws SQLException, IOException, ParseException { throw new UnsupportedOperationException("Not supported yet."); diff --git a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java index 7edb0bd5..6abdc56b 100644 --- a/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java +++ b/src/test/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/RiverStoredProcedureTests.java @@ -27,18 +27,21 @@ public void testSimpleStoredProcedure(String riverResource) createRiver(riverResource); waitForInactiveRiver(); assertHits("1", 5); + logger.info("got the five hits"); } @Test @Parameters({"river9"}) - public void testRegisterStoredProcedure(String riverResource) - throws Exception { + public void testRegisterStoredProcedure(String riverResource) throws Exception { createRiver(riverResource); waitForInactiveRiver(); assertHits("1", 1); + logger.info("got the hit"); SearchResponse response = client("1").prepareSearch("my_jdbc_river_index") .setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); - assertEquals("{supplierName=Acme, Inc.}", response.getHits().getHits()[0].getSource().toString()); + String resp = response.getHits().getHits()[0].getSource().toString(); + logger.info("resp={}", resp); + assertEquals("{mySupplierName=Acme, Inc.}", resp); } } diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql index 5c076b57..255aed0c 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/create-suppliertables.sql @@ -1,6 +1,7 @@ drop table IF EXISTS SUPPLIERS drop table IF EXISTS COFFEES drop procedure IF EXISTS SHOW_SUPPLIERS +drop procedure IF EXISTS GET_SUPPLIER_OF_COFFEE create table SUPPLIERS (SUP_ID integer NOT NULL, SUP_NAME varchar(40) NOT NULL, STREET varchar(40) NOT NULL, CITY varchar(20) NOT NULL, STATE char(2) NOT NULL, ZIP char(5), PRIMARY KEY (SUP_ID)); create table COFFEES (COF_NAME varchar(32) NOT NULL, SUP_ID int NOT NULL, PRICE numeric(10,2) NOT NULL, SALES integer NOT NULL, TOTAL integer NOT NULL, PRIMARY KEY (COF_NAME), FOREIGN KEY (SUP_ID) REFERENCES SUPPLIERS (SUP_ID)); insert into SUPPLIERS values(49, 'Superior Coffee', '1 Party Place', 'Mendocino', 'CA', '95460') diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql index d2cb3ce4..ad268096 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/delete-suppliertables.sql @@ -1,3 +1,4 @@ drop table COFFEES drop table SUPPLIERS -drop procedure SHOW_SUPPLIERS \ No newline at end of file +drop procedure SHOW_SUPPLIERS +drop procedure GET_SUPPLIER_OF_COFFEE \ No newline at end of file diff --git a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json index 825e3e13..0670ac32 100644 --- a/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json +++ b/src/test/resources/org/xbib/elasticsearch/river/jdbc/strategy/simple/mysql/river-9.json @@ -11,7 +11,7 @@ "Colombian" ], "register" : { - "supplierName" : { "pos" : 2, "type" : "varchar", "field" : "mySupplier" } + "mySupplierName" : { "pos" : 2, "type" : "varchar" } } } ],