Skip to content

Commit

Permalink
first step to support stored procedures
Browse files Browse the repository at this point in the history
Support for stored procedures added with tests for MySQL. The tests are taken from
http://docs.oracle.com/javase/tutorial/jdbc/basics/storedprocedures.html

The out parameters can not be renamed to Elasticsearch field names right now, this will be done as a followup.
  • Loading branch information
jprante committed Aug 5, 2014
1 parent e606421 commit 247a6f5
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>org.xbib.elasticsearch.plugin</groupId>
<artifactId>elasticsearch-river-jdbc</artifactId>
<version>1.3.0.3</version>
<version>1.3.0.4</version>

<packaging>jar</packaging>

Expand Down
26 changes: 18 additions & 8 deletions src/main/java/org/xbib/elasticsearch/plugin/jdbc/SQLCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import static org.elasticsearch.common.collect.Lists.newLinkedList;
import static org.elasticsearch.common.collect.Maps.newHashMap;

/**
* The SQL command
*/
Expand All @@ -23,9 +25,9 @@ public class SQLCommand {

private static final Pattern STATEMENT_PATTERN = Pattern.compile("^\\s*(update|insert)", Pattern.CASE_INSENSITIVE);

private List<Object> params = new LinkedList<Object>();
private List<Object> params = newLinkedList();

private Map<String, Object> results = new HashMap<String, Object>();
private Map<String, Object> register = newHashMap();

private boolean callable;

Expand Down Expand Up @@ -80,12 +82,20 @@ public boolean isQuery() {
return p3 < 0 || p1 < p2 && p1 < p3;
}

public void setResults(Map<String, Object> results) {
this.results = results;
/**
* A register is for parameters of a callable statement.
* @param register a map for registering parameters
*/
public void setRegister(Map<String, Object> register) {
this.register = register;
}

public Map<String, Object> getResults() {
return results;
/**
* Get the parameters of a callable statement
* @return the register map
*/
public Map<String, Object> getRegister() {
return register;
}

@SuppressWarnings({"unchecked"})
Expand All @@ -110,7 +120,7 @@ public static List<SQLCommand> parse(Map<String, Object> settings) {
command.setCallable(XContentMapValues.nodeBooleanValue(m.get("callable")));
}
if (m.containsKey("register")) {
command.setResults(XContentMapValues.nodeMapValue(m.get("register"), null));
command.setRegister(XContentMapValues.nodeMapValue(m.get("register"), null));
}
} else if (entry instanceof String) {
command.setSQL((String) entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Map;
import java.util.TimeZone;

import static org.elasticsearch.common.collect.Lists.newLinkedList;

/**
* Simple river source.
* <p/>
Expand Down Expand Up @@ -383,21 +385,22 @@ private void executeCallable(SQLCommand command) throws Exception {
if (!command.getParameters().isEmpty()) {
bind(statement, command.getParameters());
}
if (!command.getResults().isEmpty()) {
register(statement, command.getResults());
if (!command.getRegister().isEmpty()) {
register(statement, command.getRegister());
}
boolean hasRows = statement.execute();
RiverMouthKeyValueStreamListener<Object, Object> listener = new RiverMouthKeyValueStreamListener<Object, Object>()
.output(context.getRiverMouth());
if (!hasRows) {
// merge from registered params
merge(statement, command, listener);
} else {
if (hasRows) {
logger.debug("callable execution created result set");
while (hasRows) {
// merge result set
// merge result set, but use register
merge(statement.getResultSet(), listener);
hasRows = statement.getMoreResults();
}
} else {
// no result set, merge from registered params only
merge(statement, command, listener);
}
} finally {
close(statement);
Expand Down Expand Up @@ -442,17 +445,22 @@ public void merge(ResultSet results, KeyValueStreamListener listener)
@SuppressWarnings({"unchecked"})
public void merge(CallableStatement statement, SQLCommand command, KeyValueStreamListener listener)
throws SQLException, IOException {
Map<String, Object> map = command.getResults();
Map<String, Object> map = command.getRegister();
if (map.isEmpty()) {
// no register given, return without doing anything
return;
}
List<String> keys = new LinkedList<String>();
List<Object> values = new LinkedList<Object>();
List<String> keys = newLinkedList();
List<Object> values = newLinkedList();
for (Map.Entry<String, Object> entry : map.entrySet()) {
keys.add(entry.getKey());
Map<String, Object> m = (Map<String, Object>) entry.getValue();
values.add(statement.getObject((Integer) m.get("pos")));
String k = entry.getKey();
Map<String, Object> v = (Map<String, Object>) entry.getValue();
Integer pos = (Integer) v.get("pos"); // the parameter position of the value
String field = (String) v.get("field"); // the field for indexing the value (if not key name)
keys.add(field != null ? field : k);
values.add(statement.getObject(pos));
}
logger.trace("merge callable statement result: keys={} values={}", keys, values);
listener.keys(keys);
listener.values(values);
listener.end();
Expand Down Expand Up @@ -532,11 +540,18 @@ public SimpleRiverSource register(CallableStatement statement, Map<String, Objec
return this;
}
for (Map.Entry<String, Object> me : values.entrySet()) {
// { "fieldname" : { "pos": n, "type" : "VARCHAR" }, ... }
// { "key" : { "pos": n, "type" : "VARCHAR", "field" : "fieldname" }, ... }
Map<String, Object> m = (Map<String, Object>) me.getValue();
Integer n = (Integer) m.get("pos");
String type = (String) m.get("type");
register(statement, n, type);
if (n != null && type != null) {
logger.info("n={} type={}", n, toJDBCType(type));
try {
statement.registerOutParameter(n, toJDBCType(type));
} catch (Throwable t) {
logger.warn("can't register out parameter " + n + " of type " + type);
}
}
}
return this;
}
Expand Down Expand Up @@ -938,10 +953,6 @@ private void bind(PreparedStatement statement, int i, Object value) throws SQLEx
}
}

private void register(CallableStatement statement, Integer pos, String type) throws SQLException {
statement.registerOutParameter(pos, toJDBCType(type));
}

/**
* Parse of value of result set
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package org.xbib.elasticsearch.river.jdbc.strategy.simple;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;
import org.xbib.elasticsearch.plugin.jdbc.RiverContext;
import org.xbib.elasticsearch.river.jdbc.RiverSource;
import org.xbib.elasticsearch.support.helper.AbstractRiverNodeTest;

import java.sql.Connection;
import java.sql.Statement;

public class RiverStoredProcedureTests extends AbstractRiverNodeTest {

@Override
Expand All @@ -22,18 +21,24 @@ public RiverContext getRiverContext() {
}

@Test
@Parameters({"river5", "sql1", "sql2"})
public void testSimpleStoredProcedure(String riverResource, String sql, String storedProcSQL)
@Parameters({"river8"})
public void testSimpleStoredProcedure(String riverResource)
throws Exception {
createRiver(riverResource);
waitForInactiveRiver();
assertHits("1", 5);
}

@Test
@Parameters({"river9"})
public void testRegisterStoredProcedure(String riverResource)
throws Exception {
createRandomProducts(sql, 100);
// create stored procedure
Connection connection = source.getConnectionForWriting();
Statement statement = connection.createStatement();
statement.execute(storedProcSQL);
statement.close();
source.closeWriting();
createRiver(riverResource);
waitForInactiveRiver();
assertHits("1", 1);
SearchResponse response = client("1").prepareSearch("my_jdbc_river_index")
.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertEquals("{supplierName=Acme, Inc.}", response.getHits().getHits()[0].getSource().toString());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.xbib.elasticsearch.river.jdbc.strategy.simple.storedprocedure;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class StoredProcedureJavaDBSample {

public static void showSuppliers(ResultSet[] rs)
throws SQLException {

Connection con = DriverManager.getConnection("jdbc:default:connection");
Statement stmt = null;

String query =
"select SUPPLIERS.SUP_NAME, " +
"COFFEES.COF_NAME " +
"from SUPPLIERS, COFFEES " +
"where SUPPLIERS.SUP_ID = " +
"COFFEES.SUP_ID " +
"order by SUP_NAME";

stmt = con.createStatement();
rs[0] = stmt.executeQuery(query);
}

public static void getSupplierOfCoffee(String coffeeName, String[] supplierName)
throws SQLException {

Connection con = DriverManager.getConnection("jdbc:default:connection");
PreparedStatement pstmt = null;
ResultSet rs = null;

String query =
"select SUPPLIERS.SUP_NAME " +
"from SUPPLIERS, COFFEES " +
"where " +
"SUPPLIERS.SUP_ID = COFFEES.SUP_ID " +
"and ? = COFFEES.COF_NAME";

pstmt = con.prepareStatement(query);
pstmt.setString(1, coffeeName);
rs = pstmt.executeQuery();

if (rs.next()) {
supplierName[0] = rs.getString(1);
} else {
supplierName[0] = null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public void afterMethod(String stopurl, String user, String password, @Optional
protected void createRiver(String resource) {
try {
waitForYellow("1");
Map<String, Object> map = XContentHelper.convertToMap(Streams.copyToByteArray(getClass().getResourceAsStream(resource)), false).v2();
byte[] b = Streams.copyToByteArray(getClass().getResourceAsStream(resource));
Map<String, Object> map = XContentHelper.convertToMap(b, false).v2();
XContentBuilder builder = jsonBuilder()
.startObject()
.field("type", "jdbc")
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<AppenderRef ref="Console" />
</Root>
<!--
<Logger name="NodeClient" level="OFF">
<Logger name="BulkNodeClient" level="INFO">
<AppenderRef ref="Console" />
</Logger>
-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"password" : "",
"sql" : [
{
"statement : "{call count_products(?)}",
"statement" : "{call count_products(?)}",
"parameter" : [ 1 ],
"callable" : true
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
drop table IF EXISTS SUPPLIERS
drop table IF EXISTS COFFEES
drop procedure IF EXISTS SHOW_SUPPLIERS
create table SUPPLIERS (SUP_ID integer NOT NULL, SUP_NAME varchar(40) NOT NULL, STREET varchar(40) NOT NULL, CITY varchar(20) NOT NULL, STATE char(2) NOT NULL, ZIP char(5), PRIMARY KEY (SUP_ID));
create table COFFEES (COF_NAME varchar(32) NOT NULL, SUP_ID int NOT NULL, PRICE numeric(10,2) NOT NULL, SALES integer NOT NULL, TOTAL integer NOT NULL, PRIMARY KEY (COF_NAME), FOREIGN KEY (SUP_ID) REFERENCES SUPPLIERS (SUP_ID));
insert into SUPPLIERS values(49, 'Superior Coffee', '1 Party Place', 'Mendocino', 'CA', '95460')
insert into SUPPLIERS values(101, 'Acme, Inc.', '99 Market Street', 'Groundsville', 'CA', '95199')
insert into SUPPLIERS values(150, 'The High Ground', '100 Coffee Lane', 'Meadows', 'CA', '93966')
insert into COFFEES values('Colombian', 00101, 7.99, 0, 0)
insert into COFFEES values('French_Roast', 00049, 8.99, 0, 0)
insert into COFFEES values('Espresso', 00150, 9.99, 0, 0)
insert into COFFEES values('Colombian_Decaf', 00101, 8.99, 0, 0)
insert into COFFEES values('French_Roast_Decaf', 00049, 9.99, 0, 0)
create procedure SHOW_SUPPLIERS() begin select SUPPLIERS.SUP_NAME, COFFEES.COF_NAME from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID order by SUP_NAME; end
create procedure GET_SUPPLIER_OF_COFFEE(IN coffeeName varchar(32), OUT supplierName varchar(40)) begin select SUPPLIERS.SUP_NAME into supplierName from SUPPLIERS, COFFEES where SUPPLIERS.SUP_ID = COFFEES.SUP_ID and coffeeName = COFFEES.COF_NAME; select supplierName; end
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
drop table COFFEES
drop table SUPPLIERS
drop procedure SHOW_SUPPLIERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from products",
"schedule" : "0/5 0-59 0-23 ? * *",
"bulk_flush_interval" : "1s",
"index" : "my_jdbc_river_index",
"type" : "my_jdbc_river_type"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",

"sql" : [
{
"statement" : "select message from logs where {fn timestampdiff(SQL_TSI_HOUR, modified ,?)} > 0",
"parameter" : [ "$now" ]
}
],
"schedule" : "0/5 0-59 0-23 ? * *",
"index" : "my_jdbc_river_index",
"type" : "my_jdbc_river_type",
"timezone" : "Asia/Jerusalem",
"locale" : "iw_IL"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : [
{
"callable" : true,
"statement" : "{call SHOW_SUPPLIERS()}"
}
],
"index" : "my_jdbc_river_index",
"type" : "my_jdbc_river_type"
}
}
Loading

0 comments on commit 247a6f5

Please sign in to comment.