From cbac8531af66e620c5e655032ac3f833211e4b47 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=CC=88rg=20Prante?=
Date: Fri, 17 Oct 2014 23:22:15 +0200
Subject: [PATCH 1/4] 1.4.0.2.Beta1

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 97172f70..86593958 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
     <groupId>org.xbib.elasticsearch.plugin</groupId>
     <artifactId>elasticsearch-river-jdbc</artifactId>
-    <version>1.4.0.1.Beta1</version>
+    <version>1.4.0.2.Beta1</version>
     <packaging>jar</packaging>

From 62a31e478e45774fbaa071597d270cd30e02b209 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=CC=88rg=20Prante?=
Date: Sat, 18 Oct 2014 17:08:27 +0200
Subject: [PATCH 2/4] improve feeder mode, sensible MySQL defaults restored,
 dropping unnecessary files from assembly

---
 bin/feeder/mysql/create.sh                         | 33 +++++--------
 bin/feeder/mysql/log4j.properties                  |  8 +++
 pom.xml                                            |  2 +-
 src/main/assemblies/plugin.xml                     | 25 ----------
 .../jdbc/client/BaseTransportClient.java           |  6 +--
 .../client/transport/BulkTransportClient.java      | 14 ++----
 .../jdbc/feeder/CommandLineInterpreter.java        | 36 --------------
 .../plugin/jdbc/feeder/JDBCFeeder.java             | 22 +++++----
 .../plugin/jdbc/feeder/Runner.java                 | 49 ++++++++++---------
 .../plugin/jdbc/river/JDBCRiver.java               |  2 +-
 .../jdbc/strategy/simple/SimpleRiverFlow.java      | 12 ++++-
 11 files changed, 80 insertions(+), 129 deletions(-)
 create mode 100644 bin/feeder/mysql/log4j.properties
 delete mode 100644 src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/CommandLineInterpreter.java

diff --git a/bin/feeder/mysql/create.sh b/bin/feeder/mysql/create.sh
index 6e5f62b4..570b6395 100755
--- a/bin/feeder/mysql/create.sh
+++ b/bin/feeder/mysql/create.sh
@@ -1,10 +1,10 @@
 #!/bin/sh
 
-# This example shows two concurrent feeds from a MySQL database (conncurreny = 2)
-# It is possible to connect to many databases in parallel and fetch data for Elasticsearch.
-
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-. ${DIR}/../../feeder.in.sh
+
+# ES_HOME required to detect elasticsearch jars
+export ES_HOME=~es/elasticsearch-1.4.0.Beta1
 
 echo '
 {
@@ -13,26 +13,19 @@ echo '
         "host" : "localhost",
         "port" : 9300
     },
+    "max_bulk_actions" : 20000,
     "type" : "jdbc",
     "jdbc" : {
-        "url" : "jdbc:mysql://localhost:3306/test",
-        "user" : "",
-        "password" : "",
-        "sql" : [
-            {
-                "statement" : "select *, created as _id, \"myjdbc\" as _index, \"mytype\" as _type from orders"
-            }
-        ],
-        "index" : "myjdbc",
-        "type" : "mytype",
-        "index_settings" : {
-            "index" : {
-                "number_of_shards" : 1
-            }
-        }
+        "url" : "jdbc:mysql://localhost:3306/test",
+        "user" : "",
+        "password" : "",
+        "sql" : "select *, page_id as _id from page",
+        "fetchsize" : "min",
+        "treat_binary_as_string" : true,
+        "index" : "metawiki"
     }
 }
-' | ${JAVA_HOME}/bin/java \
-    -cp ${ES_JDBC_CLASSPATH} \
+' | java \
+    -cp "${DIR}/*" \
     org.xbib.elasticsearch.plugin.jdbc.feeder.Runner \
     org.xbib.elasticsearch.plugin.jdbc.feeder.JDBCFeeder
diff --git a/bin/feeder/mysql/log4j.properties b/bin/feeder/mysql/log4j.properties
new file mode 100644
index 00000000..3332a608
--- /dev/null
+++ b/bin/feeder/mysql/log4j.properties
@@ -0,0 +1,8 @@
+# for feeder
+
+log4j.rootLogger=DEBUG, out
+
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%d{ABSOLUTE}][%-5p][%-25c][%t] %m%n
+
diff --git a/pom.xml b/pom.xml
index 97172f70..86593958 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
     <groupId>org.xbib.elasticsearch.plugin</groupId>
     <artifactId>elasticsearch-river-jdbc</artifactId>
-    <version>1.4.0.1.Beta1</version>
+    <version>1.4.0.2.Beta1</version>
     <packaging>jar</packaging>
diff --git a/src/main/assemblies/plugin.xml b/src/main/assemblies/plugin.xml
index 4337475d..cbdacd18 100644
--- a/src/main/assemblies/plugin.xml
+++ b/src/main/assemblies/plugin.xml
@@ -16,29 +16,4 @@
-    <fileSet>
-        <directory>${project.build.directory}/releases</directory>
-        <outputDirectory>/</outputDirectory>
-        <includes>
-            <include>*-uberjar.jar</include>
-        </includes>
-    </fileSet>
-    <fileSet>
-        <directory>${project.basedir}</directory>
-        <outputDirectory>/</outputDirectory>
-        <includes>
-            <include>bin/*.sh</include>
-            <include>bin/**/*.sh</include>
-        </includes>
-    </fileSet>
-    <fileSet>
-        <directory>${project.basedir}/src/test/resources</directory>
-        <outputDirectory>/</outputDirectory>
-        <includes>
-            <include>log4j.properties</include>
-            <include>log4j2.xml</include>
-        </includes>
-    </fileSet>
-</fileSets>
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/BaseTransportClient.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/BaseTransportClient.java
index 8b1a78c5..6e9bad01 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/BaseTransportClient.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/BaseTransportClient.java
@@ -204,11 +204,7 @@ protected void connect(Settings settings) throws IOException {
                 break;
             }
         }
-        // 10 seconds is used because it is longer than 5 seconds
-        long timeout = settings.getAsTime("timeout", settings.getAsTime("client.transport.ping_timeout",
-                TimeValue.timeValueSeconds(10))).millis();
-        logger.info("configured addresses to connect = {}, waiting for {} to connect ...", addresses,
-                TimeValue.timeValueMillis(timeout).format());
+        logger.info("configured addresses to connect = {} ...", addresses);
         if (client.connectedNodes() != null) {
             List<DiscoveryNode> nodes = client.connectedNodes().asList();
             logger.info("connected nodes = {}", nodes);
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/transport/BulkTransportClient.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/transport/BulkTransportClient.java
index 388ff8f6..26277e65 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/transport/BulkTransportClient.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/client/transport/BulkTransportClient.java
@@ -40,7 +40,6 @@
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Client using the BulkProcessor of Elasticsearch
@@ -63,11 +62,6 @@ public class BulkTransportClient extends BaseIngestTransportClient implements In
 
     private TimeValue flushInterval = TimeValue.timeValueSeconds(30);
 
-    /**
-     * The concurrent requests
-     */
-    private final AtomicLong concurrentRequestCounter = new AtomicLong(0L);
-
     /**
      * The BulkProcessor
      */
@@ -129,7 +123,8 @@ public BulkTransportClient newClient(Settings settings) {
         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
-                long l = concurrentRequestCounter.getAndIncrement();
+                metric.getCurrentIngest().inc();
+                long l = metric.getCurrentIngest().count();
                 if (metric != null) {
                     int n = request.numberOfActions();
                     metric.getSubmitted().inc(n);
@@ -145,7 +140,8 @@ public void beforeBulk(long executionId, BulkRequest request) {
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                long l = concurrentRequestCounter.decrementAndGet();
+                metric.getCurrentIngest().dec();
+                long l = metric.getCurrentIngest().count();
                 if (metric != null) {
                     metric.getSucceeded().inc(response.getItems().length);
                     metric.getTotalIngest().inc(response.getTookInMillis());
@@ -174,7 +170,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
 
             @Override
             public void afterBulk(long executionId, BulkRequest requst, Throwable failure) {
-                concurrentRequestCounter.decrementAndGet();
+                metric.getCurrentIngest().dec();
                 throwable = failure;
                 closed = true;
                 logger.error("bulk [" + executionId + "] error", failure);
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/CommandLineInterpreter.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/CommandLineInterpreter.java
deleted file mode 100644
index 9cc87ff5..00000000
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/CommandLineInterpreter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2014 Jörg Prante
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.xbib.elasticsearch.plugin.jdbc.feeder;
-
-import java.io.PrintStream;
-import java.io.Reader;
-import java.io.Writer;
-
-public interface CommandLineInterpreter {
-
-    CommandLineInterpreter readFrom(Reader reader);
-
-    CommandLineInterpreter writeTo(Writer writer);
-
-    CommandLineInterpreter errorsTo(PrintStream printer);
-
-    CommandLineInterpreter start() throws Exception;
-
-    void shutdown() throws Exception;
-
-    Thread shutdownHook();
-
-}
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
index 316aef6b..a69a1ecd 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
@@ -41,6 +41,8 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.Reader;
 import java.io.Writer;
@@ -62,9 +64,9 @@
 /**
  * Standalone feeder for JDBC
  */
-public class JDBCFeeder implements CommandLineInterpreter {
+public class JDBCFeeder {
 
-    private final static ESLogger logger = ESLoggerFactory.getLogger(JDBCFeeder.class.getSimpleName());
+    private final static ESLogger logger = ESLoggerFactory.getLogger("JDBCFeeder");
 
     /**
     * Register metadata factory in Elasticsearch for being able to decode
@@ -101,10 +103,17 @@ public class JDBCFeeder implements CommandLineInterpreter {
      * Constructor for running this from command line
      */
     public JDBCFeeder() {
+        Runtime.getRuntime().addShutdownHook(shutdownHook());
+    }
+
+    public void exec() throws Exception {
+        readFrom(new InputStreamReader(System.in, "UTF-8"))
+                .writeTo(new OutputStreamWriter(System.out, "UTF-8"))
+                .errorsTo(System.err)
+                .start();
     }
 
     @SuppressWarnings("unchecked")
-    @Override
     public JDBCFeeder readFrom(Reader reader) {
         this.reader = reader;
         try {
@@ -145,19 +154,16 @@ protected RiverFlow createRiverFlow(Map spec, Settings settings)
         return riverFlow;
     }
 
-    @Override
     public JDBCFeeder writeTo(Writer writer) {
         this.writer = writer;
         return this;
     }
 
-    @Override
     public JDBCFeeder errorsTo(PrintStream printStream) {
         this.printStream = printStream;
         return this;
     }
 
-    @Override
     public JDBCFeeder start() throws Exception {
         this.closed = false;
         if (ingest.getConnectedNodes().isEmpty()) {
@@ -206,7 +212,6 @@ private List<Future<?>> schedule(Thread thread) {
      *
      * @return shutdown thread
      */
-    @Override
     public Thread shutdownHook() {
         return new Thread() {
             public void run() {
@@ -219,7 +224,6 @@ public void run() {
         };
     }
 
-    @Override
     public synchronized void shutdown() throws Exception {
         if (closed) {
             return;
         }
@@ -244,7 +248,7 @@ private IngestFactory createIngestFactory(final Settings settings) {
         return new IngestFactory() {
             @Override
             public Ingest create() {
-                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
+                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 1000);
                 Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                         Runtime.getRuntime().availableProcessors() * 2);
                 ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/Runner.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/Runner.java
index 815c1795..51ce8a8f 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/Runner.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/Runner.java
@@ -18,8 +18,7 @@
 import org.xbib.elasticsearch.plugin.jdbc.classloader.uri.URIClassLoader;
 
 import java.io.File;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 
 /**
@@ -36,38 +35,44 @@ public static void main(String[] args) {
             System.err.println("Warning: ES_HOME not set, using current directory");
             homeFile = System.getProperty("user.dir");
         }
-        // add drivers and Elasticsearch libs
-        Thread.currentThread().setContextClassLoader(getClassLoader(Thread.currentThread().getContextClassLoader(),
-                new File(homeFile)));
-        // here we load the "concrete" class we want to execute. Add shutdown hook and pass stdin/stdio/stderr.
-        Class clazz = Class.forName(args[0]);
-        CommandLineInterpreter commandLineInterpreter = (CommandLineInterpreter) clazz.newInstance();
-        Runtime.getRuntime().addShutdownHook(commandLineInterpreter.shutdownHook());
-        commandLineInterpreter.readFrom(new InputStreamReader(System.in, "UTF-8"))
-                .writeTo(new OutputStreamWriter(System.out, "UTF-8"))
-                .errorsTo(System.err)
-                .start();
+        // add drivers and Elasticsearch libs from ES_HOME
+        ClassLoader cl = getClassLoader(new File(homeFile));
+        Thread.currentThread().setContextClassLoader(cl);
+        // here we load the "concrete" class we want to execute
+        Class clazz = cl.loadClass(args[0]);
+        // we can not cast the class across different class loaders, so just invoke its "exec" method reflectively
+        Method execMethod = clazz.getMethod("exec");
+        execMethod.invoke(clazz.newInstance());
     } catch (Throwable e) {
-        // ensure fatal errors are printed to stderr
+        // ensure all errors are printed to stderr
         e.printStackTrace();
         System.exit(1);
     }
     System.exit(0);
 }
 
-    private static ClassLoader getClassLoader(ClassLoader parent, File home) {
-        URIClassLoader classLoader = new URIClassLoader(parent);
-        // add driver jars from current directory
-        File[] drivers = new File(System.getProperty("user.dir")).listFiles();
-        if (drivers != null) {
-            for (File file : drivers) {
+    private static ClassLoader getClassLoader(File home) {
+        URIClassLoader classLoader = new URIClassLoader();
+        String[] jars = System.getProperty("java.class.path").split(File.pathSeparator);
+        for (String jar : jars) {
+            File file = new File(jar);
+            // add parent dir if jar
+            if (file.getName().toLowerCase().endsWith(".jar")) {
+                classLoader.addURI(file.getParentFile().toURI());
+            }
+            classLoader.addURI(file.toURI());
+        }
+        // add Elasticsearch jars
+        File[] libs = new File(home + "/lib").listFiles();
+        if (libs != null) {
+            for (File file : libs) {
                 if (file.getName().toLowerCase().endsWith(".jar")) {
                     classLoader.addURI(file.toURI());
                 }
             }
         }
-        // add Elasticsearch jars
-        File[] libs = new File(home + "/lib").listFiles();
+        // add JDBC plugin jars (if present)
+        libs = new File(home + "/plugins/jdbc").listFiles();
         if (libs != null) {
             for (File file : libs) {
                 if (file.getName().toLowerCase().endsWith(".jar")) {
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
index f7bd19bd..a6bb3539 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
@@ -222,7 +222,7 @@ private IngestFactory createIngestFactory(final Settings settings) {
         return new IngestFactory() {
             @Override
             public Ingest create() {
-                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 100);
+                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 1000);
                 Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                         Runtime.getRuntime().availableProcessors() * 2);
                 ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
diff --git a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java
index 4524c045..fced0a69 100644
--- a/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java
+++ b/src/main/java/org/xbib/elasticsearch/river/jdbc/strategy/simple/SimpleRiverFlow.java
@@ -315,7 +315,17 @@ protected RC fillRiverContext(RC riverContext, RiverState state,
         if ("min".equals(fetchSizeStr)) {
             fetchsize = Integer.MIN_VALUE; // for MySQL streaming mode
         } else if (fetchSizeStr != null) {
-            fetchsize = Integer.parseInt(fetchSizeStr);
+            try {
+                fetchsize = Integer.parseInt(fetchSizeStr);
+            } catch (Exception e) {
+                // ignore unparseable values, keep the default fetch size
+            }
+        } else {
+            // if MySQL, enable streaming mode hack by default
+            String url = XContentMapValues.nodeStringValue(params.get("url"), null);
+            if (url != null && url.startsWith("jdbc:mysql")) {
+                fetchsize = Integer.MIN_VALUE; // for MySQL streaming mode
+            }
         }
         int maxrows = XContentMapValues.nodeIntegerValue(params.get("max_rows"), 0);
         int maxretries = XContentMapValues.nodeIntegerValue(params.get("max_retries"), 3);

From 946131ad77adc33e32c59da75abb8999296636ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=CC=88rg=20Prante?=
Date: Sun, 19 Oct 2014 01:42:45 +0200
Subject: [PATCH 3/4] more docs, more sensible default for max bulk actions

---
 README.md                                 | 139 +++++++-----------
 bin/feeder/mysql/simpleexample.bat        |  39 +++++
 .../mysql/{create.sh => simpleexample.sh} |   2 -
 .../plugin/jdbc/feeder/JDBCFeeder.java    |   2 +-
 .../plugin/jdbc/river/JDBCRiver.java      |   2 +-
 5 files changed, 98 insertions(+), 86 deletions(-)
 create mode 100644 bin/feeder/mysql/simpleexample.bat
 rename bin/feeder/mysql/{create.sh => simpleexample.sh} (92%)

diff --git a/README.md b/README.md
index 817ed105..b632747e 100644
--- a/README.md
+++ b/README.md
@@ -24,19 +24,17 @@ Creating a JDBC river is easy:
 Assuming you have a table of name `orders`, you can issue this simple command from the command line
 
     curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
-        "max_bulk_actions" : 10000,
         "type" : "jdbc",
         "jdbc" : {
             "url" : "jdbc:mysql://localhost:3306/test",
             "user" : "",
             "password" : "",
-            "fetchsize" : "min",
             "sql" : "select * from orders"
         }
     }'
 
-Note: the `max_bulk_actions` is set by default to 100 and have to be enlarged for most use cases, and
+Note: the `max_bulk_actions` parameter is set by default to 10000 and has to be enlarged for most use cases, and
 MySQL streaming mode is activated only by setting the row fetch size to Integer.MIN_VALUE,
 which can be achieved by using the string `"min"` for the parameter `fetchsize`.
@@ -171,13 +169,11 @@ Internet access (of course)
 
 ```
 curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
-    "max_bulk_actions" : 10000,
     "type" : "jdbc",
     "jdbc" : {
         "url" : "jdbc:mysql://localhost:3306/test",
         "user" : "",
         "password" : "",
-        "fetchsize" : "min",
         "sql" : "select * from orders"
     }
 }'
@@ -215,14 +211,12 @@ The general schema of a JDBC river instance declaration is
 
 Example:
 
     curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
-        "max_bulk_actions" : 1000,
         "type" : "jdbc",
         "jdbc" : {
             "url" : "jdbc:mysql://localhost:3306/test",
             "user" : "",
             "password" : "",
             "sql" : "select * from orders",
-            "fetchsize" : "min",
             "index" : "myindex",
             "type" : "mytype",
             ...
@@ -267,9 +261,9 @@ Quartz cron expression format (see below).
 
 `interval` - a time value for the delay between two river runs (default: not set)
 
-`max_bulk_actions` - the length of each bulk index request submitted
+`max_bulk_actions` - the length of each bulk index request submitted (default: 10000)
 
-`max_concurrrent_bulk_requests` - the maximum number of concurrent bulk requests
+`max_concurrent_bulk_requests` - the maximum number of concurrent bulk requests (default: 2 * number of CPU cores)
 
 `max_bulk_volume` - a byte size parameter for the maximum volume allowed for a bulk request (default: "10m")
@@ -372,7 +366,7 @@
     "schedule" : null,
     "interval" : 0L,
     "threadpoolsize" : 4,
-    "max_bulk_actions" : 100,
+    "max_bulk_actions" : 10000,
     "max_concurrent_bulk_requests" : 2 * available CPU cores,
     "max_bulk_volume" : "10m",
     "max_request_wait" : "60s",
@@ -387,7 +381,7 @@
     "rounding" : null,
     "scale" : 2,
     "autocommit" : false,
-    "fetchsize" : 10,
+    "fetchsize" : 10, /* MySQL: Integer.MIN */
     "max_rows" : 0,
     "max_retries" : 3,
     "max_retries_wait" : "30s",
@@ -502,24 +496,21 @@ It is very important to note that overuse of overflowing ranges creates ranges t
 and no effort has been made to determine which interpretation CronExpression chooses. An example would
 be "0 0 14-6 ? * FRI-MON".
 
-## How to start a JDBC feeder
-
-In the `bin/feeder` directory, you find some feeder examples.
-
-A feed can be started from the `$ES_HOME/plugins/jdbc` folder. If not already present, you should
-create a `bin` folder so it is easy to maintain feeder script side by side with the river.
+## How to run a standalone JDBC feeder
 
-The feeder script must include the Elasticsearch core libraries into the classpath. Note the `-cp`
-parameter.
+A feeder can be started from a shell script. For this, the Elasticsearch home directory must be set in
+the environment variable ES_HOME. The JDBC plugin jar must be placed in the same directory as the script,
+together with the JDBC river jar(s).
-Here is an example of a feeder bash script in `$ES_HOME/plugins/jdbc/bin/feeder/oracle.create.sh`
+Here is an example of a feeder bash script:
 
     #!/bin/sh
-
-    java="/usr/bin/java"
-    #java="/Library/Java/JavaVirtualMachines/jdk1.8.0.jdk/Contents/Home/bin/java"
-    #java="/usr/java/jdk1.8.0/bin/java"
-
+
+    DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+
+    # ES_HOME required to detect elasticsearch jars
+    export ES_HOME=~es/elasticsearch-1.4.0.Beta1
+
     echo '
     {
         "elasticsearch" : {
@@ -527,39 +518,39 @@ Here is an example of a feeder bash script in `$ES_HOME/plugins/jdbc/bin/feeder/
             "host" : "localhost",
             "port" : 9300
         },
-        "concurrency" : 1,
         "type" : "jdbc",
         "jdbc" : {
-            "url" : "jdbc:oracle:thin:@//host:1521/sid",
-            "user" : "user",
-            "password" : "password",
-            "sql" : "select or_id as \"_id\", or_tan as \"tan\" from orders",
-            "index" : "myoracle",
-            "type" : "myoracle",
-            "index_settings" : {
-                "index" : {
-                    "number_of_shards" : 1,
-                    "number_of_replica" : 0
-                }
-            }
-        }
+            "url" : "jdbc:mysql://localhost:3306/test",
+            "user" : "",
+            "password" : "",
+            "sql" : "select *, page_id as _id from page",
+            "treat_binary_as_string" : true,
+            "index" : "metawiki"
+        }
     }
-    ' | ${java} \
-        -cp $(pwd):$(pwd)/\*:$(pwd)/../../lib/\* \
+    ' | java \
+        -cp "${DIR}/*" \
         org.xbib.elasticsearch.plugin.jdbc.feeder.Runner \
         org.xbib.elasticsearch.plugin.jdbc.feeder.JDBCFeeder
 
-The `jdbc` parameter structure is exactly the same as in a river.
+How does it work?
+
+- first, the shell script determines the directory where it is located and puts it into the variable `DIR`
 
-The feeder is invoked by `JDBCFeeder` class and understands some more parameters. In this example,
-the default parameters are shown.
+- second, the location of the Elasticsearch home is exported in the shell variable `ES_HOME`
 
-`elasticsearch` - an structure describing cluster, host, and port of a host of an Elasticsearch cluster.
+- the classpath must be set to `DIR/*` to pick up the JDBC plugin jar in the same directory as the script
 
-`concurrency` - how many `jdbc` jobs should be executed in parallel
+- the "Runner" class expands the classpath with the Elasticsearch jars in `ES_HOME/lib` and also looks in `ES_HOME/plugins/jdbc`
 
-In the example, you can also see that you can change your favorite `java` executable when
-executing a feed. You must use a Java JDK >= 1.7
+- the "Runner" class invokes the "JDBCFeeder", which reads a JSON document from stdin that corresponds to a JDBC river definition
+
+- the `elasticsearch` structure specifies the cluster, host, and port of a connection to an Elasticsearch cluster
+
+The `jdbc` parameter structure in the definition is exactly the same as in a river.
+
+It is possible to write an equivalent of this bash script for Windows.
+If you can send me one for documentation on this page, I'd be very grateful.
 
 ## Structured objects
 
@@ -812,56 +803,40 @@ in callable statement result sets.
 If there is more than one result sets returned by a callable statement,
 the JDBC plugin enters a loop and iterates through all result sets.
 
-# Monitoring the JDBC river state
+# Monitoring the JDBC plugin state
 
 While a river/feed is running, you can monitor the activity by using the `_state` command.
-When running very large data fetches, it might be of interest to find out if the fetch is complete or still running.
+The `_state` command can show the state of a specific river or of all rivers,
+when an asterisk `*` is used as the river name.
-The `_state` command can show the state of a specific river or of all rivers, when an asterisk `*` is used as the river name.
+The river state mechanism is specific to the JDBC plugin implementation. It is part of the cluster metadata.
 
-In the result, you can evaluate the field `active`. If set to `true`, the river is actively fetching data from the database.
+In the response, the field `started` will represent the time when the river/feeder was created.
+The field `last_active_begin` will represent the last time a river/feeder run had begun, and
+the field `last_active_end` is null if the river/feeder is running, or will represent the last time the river/feeder
+has completed a run.
 
-In the field `timestamp`, the latest state modification of the river is recorded.
+The `map` carries some flags for the river: `aborted`, `suspended`, and a `counter` for the number of
+invocations on this node.
 
 Example:
 
     curl 'localhost:9200/_river/jdbc/*/_state?pretty'
 
     {
      "state" : [ {
-       "name" : "my_oracle_river",
+       "name" : "feeder",
        "type" : "jdbc",
-       "enabled" : true,
-       "started" : "2014-05-10T20:29:04.260Z",
-       "timestamp" : "2014-05-10T20:52:15.866Z",
-       "counter" : 3,
-       "active" : true,
-       "custom" : {
-         "rivername" : "feeder",
-         "settings" : {
-           "index" : "myoracle",
-           "sql" : [ "select or_id as \"_id\", or_tan as \"tan\" from orders" ],
-           "maxbulkactions" : 10,
-           "type" : "myoracle",
-           "password" : "...",
-           "user" : "...",
-           "url" : "jdbc:oracle:thin:@//localhost:1521/sid"
-         },
-         "locale" : "de_",
-         "job" : null,
-         "sql" : [ "statement=select or_id as \"_id\", or_tan as \"tan\" from orders parameter=[] callable=false" ],
-         "autocommit" : false,
-         "fetchsize" : 10,
-         "maxrows" : 0,
-         "retries" : 3,
-         "maxretrywait" : "30s",
-         "resultsetconcurrency" : "CONCUR_UPDATABLE",
-         "resultsettype" : "TYPE_FORWARD_ONLY",
-         "rounding" : 0,
-         "scale" : 2
+       "started" : "2014-10-18T13:38:14.436Z",
+       "last_active_begin" : "2014-10-18T17:46:47.548Z",
+       "last_active_end" : "2014-10-18T13:42:57.678Z",
+       "map" : {
+         "aborted" : false,
+         "suspended" : false,
+         "counter" : 6
        }
      } ]
-    }
+    }
 
 # Advanced topics
diff --git a/bin/feeder/mysql/simpleexample.bat b/bin/feeder/mysql/simpleexample.bat
new file mode 100644
index 00000000..1c9285be
--- /dev/null
+++ b/bin/feeder/mysql/simpleexample.bat
@@ -0,0 +1,39 @@
+@echo off
+
+SETLOCAL
+
+if NOT DEFINED ES_HOME goto err
+
+set DIR=%~dp0
+
+set FEEDER_CLASSPATH=%DIR%/*
+
+REM ???
+echo {^
+ "elasticsearch" : {^
+     "cluster" : "elasticsearch",^
+     "host" : "localhost",^
+     "port" : 9300^
+ },^
+ "type" : "jdbc",^
+ "jdbc" : {^
+     "url" : "jdbc:mysql://localhost:3306/test",^
+     "user" : "",^
+     "password" : "",^
+     "sql" : "select *, page_id as _id from page",^
+     "treat_binary_as_string" : true,^
+     "index" : "metawiki"^
+ }^
+}
+
+"%JAVA_HOME%\bin\java" -cp "%FEEDER_CLASSPATH%" "org.xbib.elasticsearch.plugin.jdbc.feeder.Runner" "org.xbib.elasticsearch.plugin.jdbc.feeder.JDBCFeeder"
+goto finally
+
+:err
+echo JAVA_HOME and ES_HOME environment variables must be set!
+pause
+
+
+:finally
+
+ENDLOCAL
\ No newline at end of file
diff --git a/bin/feeder/mysql/create.sh b/bin/feeder/mysql/simpleexample.sh
similarity index 92%
rename from bin/feeder/mysql/create.sh
rename to bin/feeder/mysql/simpleexample.sh
index 570b6395..ac63317c 100755
--- a/bin/feeder/mysql/create.sh
+++ b/bin/feeder/mysql/simpleexample.sh
@@ -13,14 +13,12 @@ echo '
         "host" : "localhost",
         "port" : 9300
     },
-    "max_bulk_actions" : 20000,
     "type" : "jdbc",
     "jdbc" : {
         "url" : "jdbc:mysql://localhost:3306/test",
         "user" : "",
         "password" : "",
         "sql" : "select *, page_id as _id from page",
-        "fetchsize" : "min",
         "treat_binary_as_string" : true,
         "index" : "metawiki"
     }
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
index a69a1ecd..618c9b20 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/feeder/JDBCFeeder.java
@@ -248,7 +248,7 @@ private IngestFactory createIngestFactory(final Settings settings) {
         return new IngestFactory() {
             @Override
             public Ingest create() {
-                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 1000);
+                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 10000);
                 Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                         Runtime.getRuntime().availableProcessors() * 2);
                 ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));
diff --git a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
index a6bb3539..88ad13b3 100644
--- a/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
+++ b/src/main/java/org/xbib/elasticsearch/plugin/jdbc/river/JDBCRiver.java
@@ -222,7 +222,7 @@ private IngestFactory createIngestFactory(final Settings settings) {
         return new IngestFactory() {
             @Override
             public Ingest create() {
-                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 1000);
+                Integer maxbulkactions = settings.getAsInt("max_bulk_actions", 10000);
                 Integer maxconcurrentbulkrequests = settings.getAsInt("max_concurrent_bulk_requests",
                         Runtime.getRuntime().availableProcessors() * 2);
                 ByteSizeValue maxvolume = settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m"));

From d9de01c33617c1441b4d07097f736fcfdeb9040c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=CC=88rg=20Prante?=
Date: Sun, 19 Oct 2014 11:06:29 +0200
Subject: [PATCH 4/4] 1.4.0.3.Beta1

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 86593958..1349eced 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
     <groupId>org.xbib.elasticsearch.plugin</groupId>
     <artifactId>elasticsearch-river-jdbc</artifactId>
-    <version>1.4.0.2.Beta1</version>
+    <version>1.4.0.3.Beta1</version>
     <packaging>jar</packaging>