From bf666fed134736bbbcf9a2a96f115ecadc2b9f0e Mon Sep 17 00:00:00 2001 From: mkcons Date: Thu, 9 Jul 2020 11:33:55 +0200 Subject: [PATCH 1/4] =?UTF-8?q?Implement=20the=20following=20changes:=20-?= =?UTF-8?q?=20Clear=20the=20log=20buffer=20so=20that=20when=20the=20progra?= =?UTF-8?q?m=20is=20about=20to=20finish=20pending=20messages=20are=20sent?= =?UTF-8?q?=20to=20Elasticsearch.=20-=20Add=20the=20capability=20of=20tran?= =?UTF-8?q?smitting=20compressed=20messages.=20-=20Add=20the=20capability?= =?UTF-8?q?=20of=20logging=20Java=20POJOs=20as=20JSON.=20-=20Whenever=20a?= =?UTF-8?q?=20key-value=20parameter=20is=20logged=20as=20part=20of=20a=20m?= =?UTF-8?q?essage=20=E2=80=9Cserialized=20document=20with=20docOid=3D1234?= =?UTF-8?q?=E2=80=9D,=20we=20log=20it=20outside=20as=20well,=20as=20other?= =?UTF-8?q?=20appenders=20do=20=E2=80=9Cserialized=20document=20with=20doc?= =?UTF-8?q?Oid=3D1234=E2=80=9D=20+=20=E2=80=9CdocOid=3D1234=E2=80=9D.=20-?= =?UTF-8?q?=20Allow=20the=20users=20to=20add=20a=20keyPrefix,=20so=20we=20?= =?UTF-8?q?would=20have=20prefix.docOid=3D1234.=20-=20Add=20closing=20reso?= =?UTF-8?q?urces=20following=20https://github.com/internetitem/logback-ela?= =?UTF-8?q?sticsearch-appender/pull/72/commits/6a5bc60aea6dfa501383a074643?= =?UTF-8?q?11272703a4dcf?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 11 +++ .../AbstractElasticsearchAppender.java | 25 +++++++ .../AbstractElasticsearchPublisher.java | 29 +++++++- .../StructuredArgsElasticsearchAppender.java | 21 ++++++ .../StructuredArgsElasticsearchPublisher.java | 67 +++++++++++++++++++ .../elasticsearch/config/Settings.java | 19 ++++++ .../writer/ElasticsearchWriter.java | 29 ++++++-- 7 files changed, 194 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java create mode 100644 src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java diff --git a/pom.xml b/pom.xml index 27f073b..a2040a1 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,11 @@ jackson-core 2.8.0 + + com.fasterxml.jackson.core + jackson-databind + 2.8.0 + com.amazonaws aws-java-sdk-core @@ -85,6 +90,12 @@ 1.10.19 test + + + net.logstash.logback + logstash-logback-encoder + 6.3 + diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java index b0e7cb1..bec6566 100644 --- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java +++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchAppender.java @@ -22,12 +22,28 @@ public abstract class AbstractElasticsearchAppender extends UnsynchronizedApp public AbstractElasticsearchAppender() { this.settings = new Settings(); this.headers = new HttpRequestHeaders(); + registerShutdownHook(); } public AbstractElasticsearchAppender(Settings settings) { this.settings = settings; + registerShutdownHook(); } + private void registerShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); + } + + private class ShutdownHook implements Runnable { + @Override + public void run() { + stop(); + if(publisher != null) { + publisher.close(); + } + } + } + @Override public void start() { super.start(); @@ -138,4 +154,13 @@ public void setAuthentication(Authentication auth) { public void setMaxMessageSize(int maxMessageSize) { settings.setMaxMessageSize(maxMessageSize); } + + public void setKeyPrefix(String keyPrefix) { + settings.setKeyPrefix(keyPrefix); + } + + public void setObjectSerialization(boolean objectSerialization) { + settings.setObjectSerialization(objectSerialization); + } + } diff --git a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java index 98d7034..816fc7e 100644 --- a/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java +++ b/src/main/java/com/internetitem/logback/elasticsearch/AbstractElasticsearchPublisher.java @@ -3,6 +3,7 @@ import ch.qos.logback.core.Context; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties; import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders; import com.internetitem.logback.elasticsearch.config.Property; @@ -51,6 +52,8 @@ protected DateFormat initialValue() { private final PropertySerializer propertySerializer; + private Thread thread; + public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, HttpRequestHeaders headers) throws IOException { this.errorReporter = errorReporter; this.events = new ArrayList(); @@ -59,7 +62,8 @@ public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReport this.outputAggregator = configureOutputAggregator(settings, errorReporter, headers); - this.jf = new JsonFactory(); + this.jf = buildJsonFactory(settings); + this.jf.setRootValueSeparator(null); this.jsonGenerator = jf.createGenerator(outputAggregator); @@ -67,6 +71,20 @@ public AbstractElasticsearchPublisher(Context context, ErrorReporter errorReport this.propertyList = generatePropertyList(context, properties); this.propertySerializer = new PropertySerializer(); + } + + public void close() { + if(thread != null) { + thread.interrupt(); + } + } + + private JsonFactory buildJsonFactory(Settings settings) { + if(settings.isObjectSerialization()) { + ObjectMapper mapper = new ObjectMapper(); + return mapper.getFactory(); + } + return new JsonFactory(); } private static ElasticsearchOutputAggregator configureOutputAggregator(Settings settings, ErrorReporter errorReporter, HttpRequestHeaders httpRequestHeaders) { @@ -108,7 +126,7 @@ public void addEvent(T event) { events.add(event); if (!working) { working = true; - Thread thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement()); + thread = new Thread(this, THREAD_NAME_PREFIX + THREAD_COUNTER.getAndIncrement()); thread.start(); } } @@ -119,7 +137,11 @@ public void run() { int maxRetries = settings.getMaxRetries(); while (true) { try { - Thread.sleep(settings.getSleepTime()); + try { + Thread.sleep(settings.getSleepTime()); + } catch(InterruptedException e) { + // we are waking up the thread + } List eventsCopy = null; synchronized (lock) { @@ -152,6 +174,7 @@ public void run() { if (!outputAggregator.sendData()) { currentTry++; } + } catch (Exception e) { errorReporter.logError("Internal error handling log data: " + e.getMessage(), e); currentTry++; diff --git a/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java new file mode 100644 index 0000000..c1bf2b8 --- /dev/null +++ b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchAppender.java @@ -0,0 +1,21 @@ +package com.internetitem.logback.elasticsearch; + +import com.internetitem.logback.elasticsearch.config.Settings; + +import java.io.IOException; + +public class StructuredArgsElasticsearchAppender extends ElasticsearchAppender { + + public StructuredArgsElasticsearchAppender() { + } + + public StructuredArgsElasticsearchAppender(Settings settings) { + super(settings); + } + + protected StructuredArgsElasticsearchPublisher buildElasticsearchPublisher() throws IOException { + return new StructuredArgsElasticsearchPublisher(this.getContext(), this.errorReporter, this.settings, + this.elasticsearchProperties, this.headers); + } + +} diff --git a/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java new file mode 100644 index 0000000..428f5bd --- /dev/null +++ b/src/main/java/com/internetitem/logback/elasticsearch/StructuredArgsElasticsearchPublisher.java @@ -0,0 +1,67 @@ +package com.internetitem.logback.elasticsearch; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Context; +import com.fasterxml.jackson.core.JsonGenerator; +import com.internetitem.logback.elasticsearch.config.ElasticsearchProperties; +import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders; +import com.internetitem.logback.elasticsearch.config.Settings; +import com.internetitem.logback.elasticsearch.util.ErrorReporter; +import net.logstash.logback.marker.ObjectAppendingMarker; + +import java.io.IOException; +import java.lang.reflect.Field; + +public class StructuredArgsElasticsearchPublisher extends ClassicElasticsearchPublisher { + private String keyPrefix; + private Field field; + private ErrorReporter errorReporter; + + public StructuredArgsElasticsearchPublisher(Context context, ErrorReporter errorReporter, Settings settings, ElasticsearchProperties properties, + HttpRequestHeaders headers) throws IOException { + super(context, errorReporter, settings, properties, headers); + + this.errorReporter = errorReporter; + + keyPrefix = ""; + if(settings != null && settings.getKeyPrefix() != null) { + keyPrefix = settings.getKeyPrefix(); + } + + try { + field = ObjectAppendingMarker.class.getDeclaredField("object"); + field.setAccessible(true); + } catch (NoSuchFieldException e) { + // message will be logged without object + errorReporter.logError("error in logging with object serialization", e); + } + } + + protected void serializeCommonFields(JsonGenerator gen, ILoggingEvent event) throws IOException { + super.serializeCommonFields(gen, event); + + if(event.getArgumentArray() != null) { + Object[] eventArgs = event.getArgumentArray(); + for(Object eventArg:eventArgs) { + if(eventArg instanceof ObjectAppendingMarker) { + ObjectAppendingMarker marker = (ObjectAppendingMarker) eventArg; + if(field != null && settings != null && settings.isObjectSerialization() && + marker.getFieldValue().toString().contains("@")) { + try { + Object obj = field.get(marker); + if(obj != null) { + gen.writeObjectField(keyPrefix + marker.getFieldName(), obj); + } + } catch (IllegalAccessException e) { + // message will be logged without object + errorReporter.logError("error in logging with object serialization", e); + } + } + else + gen.writeObjectField(keyPrefix + marker.getFieldName(), marker.getFieldValue()); + } + } + } + } + +} diff --git a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java index e349b47..ac56973 100644 --- a/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java +++ b/src/main/java/com/internetitem/logback/elasticsearch/config/Settings.java @@ -23,6 +23,8 @@ public class Settings { private int maxQueueSize = 100 * 1024 * 1024; private Authentication authentication; private int maxMessageSize = -1; + private String keyPrefix; + private boolean objectSerialization; public String getIndex() { return index; @@ -162,4 +164,21 @@ public int getMaxMessageSize() { public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } + + public String getKeyPrefix() { + return this.keyPrefix; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + + public boolean isObjectSerialization() { + return objectSerialization; + } + + public void setObjectSerialization(boolean objectSerialization) { + this.objectSerialization = objectSerialization; + } + } diff --git a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java index 93def3c..4f4a7ec 100644 --- a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java +++ b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java @@ -8,11 +8,13 @@ import java.net.HttpURLConnection; import java.util.Collection; import java.util.Collections; +import java.util.zip.GZIPOutputStream; import com.internetitem.logback.elasticsearch.config.HttpRequestHeader; import com.internetitem.logback.elasticsearch.config.HttpRequestHeaders; import com.internetitem.logback.elasticsearch.config.Settings; import com.internetitem.logback.elasticsearch.util.ErrorReporter; +import org.apache.http.HttpHeaders; public class ElasticsearchWriter implements SafeWriter { @@ -23,6 +25,7 @@ public class ElasticsearchWriter implements SafeWriter { private Collection headerList; private boolean bufferExceeded; + private boolean compressedTransfer; public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpRequestHeaders headers) { this.errorReporter = errorReporter; @@ -32,6 +35,13 @@ public ElasticsearchWriter(ErrorReporter errorReporter, Settings settings, HttpR : Collections.emptyList(); this.sendBuffer = new StringBuilder(); + compressedTransfer = false; + for(HttpRequestHeader header : this.headerList) { + if(header.getName().toLowerCase().equals(HttpHeaders.CONTENT_ENCODING.toLowerCase()) && header.getValue().equals("gzip")) { + compressedTransfer = true; + break; + } + } } public void write(char[] cbuf, int off, int len) { @@ -72,10 +82,7 @@ public void sendData() throws IOException { settings.getAuthentication().addAuth(urlConnection, body); } - Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8"); - writer.write(body); - writer.flush(); - writer.close(); + writeData(urlConnection, body); int rc = urlConnection.getResponseCode(); if (rc != 200) { @@ -117,4 +124,18 @@ private static String slurpErrors(HttpURLConnection urlConnection) { } } + private void writeData(HttpURLConnection urlConnection, String body) throws IOException { + if(this.compressedTransfer) { + try(Writer writer = new OutputStreamWriter(new GZIPOutputStream(urlConnection.getOutputStream()), "UTF-8")) { + writer.write(body); + writer.flush(); + } + } else { + try(Writer writer = new OutputStreamWriter(urlConnection.getOutputStream(), "UTF-8")) { + writer.write(body); + writer.flush(); + } + } + } + } From b11db67fa0a7ab5f449d38ea666586f131ef2c01 Mon Sep 17 00:00:00 2001 From: mkcons Date: Thu, 9 Jul 2020 11:55:20 +0200 Subject: [PATCH 2/4] Close resources in slurpErrors https://github.com/internetitem/logback-elasticsearch-appender/pull/72/commits/6a5bc60aea6dfa501383a07464311272703a4dcf Fix readme --- README.md | 2 ++ .../elasticsearch/writer/ElasticsearchWriter.java | 14 +++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f0b15b1..e35fd83 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ In your `logback.xml`: false 100 + true host @@ -110,6 +111,7 @@ Configuration Reference * `includeMdc` (optional, default false): If set to `true`, then all [MDC](http://www.slf4j.org/api/org/slf4j/MDC.html) values will be mapped to properties on the JSON payload. * `maxMessageSize` (optional, default -1): If set to a number greater than 0, truncate messages larger than this length, then append "`..`" to denote that the message was truncated * `authentication` (optional): Add the ability to send authentication headers (see below) + * `objectSerialization` (optional): specifies whether to use POJO to JSON serialization The fields `@timestamp` and `message` are always sent and can not currently be configured. Additional fields can be sent by adding `` elements to the `` set. diff --git a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java index 4f4a7ec..0018ffd 100644 --- a/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java +++ b/src/main/java/com/internetitem/logback/elasticsearch/writer/ElasticsearchWriter.java @@ -105,18 +105,18 @@ public boolean hasPendingData() { } private static String slurpErrors(HttpURLConnection urlConnection) { - try { - InputStream stream = urlConnection.getErrorStream(); + try (InputStream stream = urlConnection.getErrorStream()) { if (stream == null) { return ""; } StringBuilder builder = new StringBuilder(); - InputStreamReader reader = new InputStreamReader(stream, "UTF-8"); - char[] buf = new char[2048]; - int numRead; - while ((numRead = reader.read(buf)) > 0) { - builder.append(buf, 0, numRead); + try(InputStreamReader reader = new InputStreamReader(stream, "UTF-8")) { + char[] buf = new char[2048]; + int numRead; + while ((numRead = reader.read(buf)) > 0) { + builder.append(buf, 0, numRead); + } } return builder.toString(); } catch (Exception e) { From 4a79fa85334e733a43f483a910854edee206f184 Mon Sep 17 00:00:00 2001 From: mkcons Date: Thu, 9 Jul 2020 12:01:39 +0200 Subject: [PATCH 3/4] Update readme with keyPrefix --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index e35fd83..857ef8c 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ In your `logback.xml`: 100 true + data. host @@ -112,6 +113,7 @@ Configuration Reference * `maxMessageSize` (optional, default -1): If set to a number greater than 0, truncate messages larger than this length, then append "`..`" to denote that the message was truncated * `authentication` (optional): Add the ability to send authentication headers (see below) * `objectSerialization` (optional): specifies whether to use POJO to JSON serialization + * `keyPrefix` (optional): objects logged within a message will also be logged separately with this prefix added The fields `@timestamp` and `message` are always sent and can not currently be configured. Additional fields can be sent by adding `` elements to the `` set. From 824c552631bd69ffb7ceb1e3bf8be207b22e5135 Mon Sep 17 00:00:00 2001 From: mkcons Date: Thu, 9 Jul 2020 12:04:06 +0200 Subject: [PATCH 4/4] Update readme with keyPrefix --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 857ef8c..3ab4f44 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ In your `logback.xml`: true data. + host