Skip to content

Commit

Permalink
Merge pull request #41 from RADAR-CNS/v0.6.1_release
Browse files Browse the repository at this point in the history
V0.6.1 release
  • Loading branch information
blootsvoets authored Nov 23, 2017
2 parents aa4927d + 42f3de3 commit cd5d89b
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 60 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.1'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.1'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.1-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.2-SNAPSHOT', changing: true
}
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.6'
version = '0.6.1'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

Expand Down
53 changes: 39 additions & 14 deletions src/main/java/org/radarcns/config/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import okhttp3.HttpUrl;

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
Expand Down Expand Up @@ -95,10 +97,34 @@ public static String getPaths(List<ServerConfig> configList) {
* Get the server as a URL.
*
* @return URL to the server.
* @throws MalformedURLException if protocol is not set or the host name is invalid.
* @throws IllegalStateException if the URL is invalid
*/
public URL getUrl() throws MalformedURLException {
return new URL(protocol, host, port, path == null ? "" : path);
public URL getUrl() {
try {
return new URL(protocol, host, port, path == null ? "" : path);
} catch (MalformedURLException ex) {
throw new IllegalStateException("Already parsed a URL but it turned out invalid", ex);
}
}

/**
* Get the server as an HttpUrl.
* @return HttpUrl to the server
* @throws IllegalStateException if the URL is invalid
*/
public HttpUrl getHttpUrl() {
HttpUrl.Builder urlBuilder = new HttpUrl.Builder()
.scheme(protocol)
.host(host);

if (port != -1) {
urlBuilder.port(port);
}
if (path != null) {
urlBuilder.encodedPath(path);
}

return urlBuilder.build();
}

/**
Expand Down Expand Up @@ -185,11 +211,15 @@ public final void setPath(String path) {
throw new IllegalArgumentException("Cannot set server path with query string");
} else {
this.path = path.trim();
if (!this.path.isEmpty() && this.path.charAt(0) != '/') {
this.path = '/' + this.path;
}
if (!this.path.isEmpty() && this.path.charAt(this.path.length() - 1) != '/') {
this.path += '/';
if (this.path.isEmpty()) {
this.path = "/";
} else {
if (this.path.charAt(0) != '/') {
this.path = '/' + this.path;
}
if (this.path.charAt(this.path.length() - 1) != '/') {
this.path += '/';
}
}
}
}
Expand Down Expand Up @@ -218,12 +248,7 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
int result = host != null ? host.hashCode() : 0;
result = 31 * result + port;
result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
result = 31 * result + (proxyHost != null ? proxyHost.hashCode() : 0);
result = 31 * result + proxyPort;
return result;
return Objects.hash(protocol, host, port);
}

public boolean isUnsafe() {
Expand Down
39 changes: 29 additions & 10 deletions src/main/java/org/radarcns/producer/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.radarcns.producer.rest;

import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
Expand All @@ -32,7 +34,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
Expand Down Expand Up @@ -185,6 +186,17 @@ public Response request(Request request) throws IOException {
return httpClient.newCall(request).execute();
}

/**
* Make an asynchronous request.
* @param request request, possibly built with {@link #requestBuilder(String)}
* @param callback callback to activate once the request is done.
*/
public void request(Request request, Callback callback) {
Objects.requireNonNull(request);
Objects.requireNonNull(callback);
httpClient.newCall(request).enqueue(callback);
}

/**
* Make a request to given relative path. This does not set any request properties except the
* URL.
Expand All @@ -207,13 +219,8 @@ public Response request(String relativePath) throws IOException {
*/
public String requestString(Request request) throws IOException {
try (Response response = request(request)) {
ResponseBody body = response.body();

String bodyString = null;
String bodyString = responseBody(response);

if (body != null) {
bodyString = body.string();
}
if (!response.isSuccessful() || bodyString == null) {
throw new RestException(response.code(), bodyString);
}
Expand All @@ -223,7 +230,7 @@ public String requestString(Request request) throws IOException {
}

/**
* Create a OkHttp3 request builder with {@link Request.Builder#url(URL)} set.
* Create a OkHttp3 request builder with {@link Request.Builder#url(HttpUrl)} set.
* Call{@link Request.Builder#build()} to make the actual request with
* {@link #request(Request)}.
*
Expand All @@ -241,12 +248,16 @@ public Request.Builder requestBuilder(String relativePath) throws MalformedURLEx
* @return URL
* @throws MalformedURLException if the path is malformed
*/
public URL getRelativeUrl(String path) throws MalformedURLException {
public HttpUrl getRelativeUrl(String path) throws MalformedURLException {
String strippedPath = path;
while (!strippedPath.isEmpty() && strippedPath.charAt(0) == '/') {
strippedPath = strippedPath.substring(1);
}
return new URL(getConfig().getUrl(), strippedPath);
HttpUrl.Builder builder = getConfig().getHttpUrl().newBuilder(strippedPath);
if (builder == null) {
throw new MalformedURLException();
}
return builder.build();
}

@Override
Expand Down Expand Up @@ -285,4 +296,12 @@ public void close() {
connectionPool.release();
}
}

public static String responseBody(Response response) throws IOException {
ResponseBody body = response.body();
if (body == null) {
return null;
}
return body.string();
}
}
53 changes: 28 additions & 25 deletions src/main/java/org/radarcns/producer/rest/RestSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.radarcns.producer.rest.RestClient.responseBody;
import static org.radarcns.producer.rest.TopicRequestBody.topicRequestContent;

/**
* RestSender sends records to the Kafka REST Proxy. It does so using an Avro JSON encoding. A new
* sender must be constructed with {@link #sender(AvroTopic)} per AvroTopic. This implementation is
Expand Down Expand Up @@ -130,8 +132,8 @@ public synchronized void setKafkaConfig(ServerConfig kafkaConfig) {

private void setRestClient(RestClient newClient) {
try {
schemalessKeyUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-key"));
schemalessValueUrl = HttpUrl.get(newClient.getRelativeUrl("topics/schemaless-value"));
schemalessKeyUrl = newClient.getRelativeUrl("topics/schemaless-key");
schemalessValueUrl = newClient.getRelativeUrl("topics/schemaless-value");
isConnectedRequest = newClient.requestBuilder("").head();
} catch (MalformedURLException ex) {
throw new IllegalArgumentException("Schemaless topics do not have a valid URL", ex);
Expand Down Expand Up @@ -187,11 +189,7 @@ private class RestTopicSender<L extends K, W extends V> implements KafkaTopicSen

private RestTopicSender(AvroTopic<L, W> topic) throws IOException {
this.topic = topic;
URL rawUrl = getRestClient().getRelativeUrl("topics/" + topic.getName());
url = HttpUrl.get(rawUrl);
if (url == null) {
throw new MalformedURLException("Cannot parse " + rawUrl);
}
url = getRestClient().getRelativeUrl("topics/" + topic.getName());
requestData = new TopicRequestData<>(topic, keyEncoder, valueEncoder);
}

Expand Down Expand Up @@ -223,7 +221,7 @@ public void send(List<Record<L, W>> records) throws IOException {
state.didConnect();
if (logger.isDebugEnabled()) {
logger.debug("Added message to topic {} -> {}",
topic, response.body().string());
topic, responseBody(response));
}
lastOffsetSent = records.get(records.size() - 1).offset;
} else if (response.code() == 401) {
Expand All @@ -239,23 +237,10 @@ public void send(List<Record<L, W>> records) throws IOException {
}
doResend = true;
} else {
state.didDisconnect();
String content = response.body().string();
String requestContent = ((TopicRequestBody)request.body()).content();
requestContent = requestContent.substring(0,
Math.min(requestContent.length(), LOG_CONTENT_LENGTH));
logger.error("FAILED to transmit message: {} -> {}...",
content, requestContent);
throw new IOException("Failed to submit (HTTP status code " + response.code()
+ "): " + content);
logFailure(request, response, null);
}
} catch (IOException ex) {
state.didDisconnect();
String requestContent = ((TopicRequestBody)request.body()).content();
requestContent = requestContent.substring(0,
Math.min(requestContent.length(), LOG_CONTENT_LENGTH));
logger.error("FAILED to transmit message:\n{}...", requestContent);
throw ex;
logFailure(request, null, ex);
} finally {
requestData.reset();
}
Expand All @@ -265,6 +250,23 @@ public void send(List<Record<L, W>> records) throws IOException {
}
}

@SuppressWarnings("ConstantConditions")
private void logFailure(Request request, Response response, Exception ex)
throws IOException {
state.didDisconnect();
String content = response == null ? null : responseBody(response);
int code = response == null ? -1 : response.code();
String requestContent = topicRequestContent(request);
if (requestContent != null) {
requestContent = requestContent.substring(0,
Math.min(requestContent.length(), LOG_CONTENT_LENGTH));
}
logger.error("FAILED to transmit message: {} -> {}...",
content, requestContent);
throw new IOException("Failed to submit (HTTP status code " + code
+ "): " + content, ex);
}

private Request buildRequest(List<Record<L, W>> records) throws IOException {
HttpUrl sendToUrl = updateRequestData(records);

Expand Down Expand Up @@ -371,8 +373,9 @@ public boolean resetConnection() {
return true;
} else {
state.didDisconnect();
String bodyString = responseBody(response);
logger.warn("Failed to make heartbeat request to {} (HTTP status code {}): {}",
httpClient, response.code(), response.body().string());
httpClient, response.code(), bodyString);
return false;
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.BufferedSink;

Expand Down Expand Up @@ -53,4 +54,12 @@ String content() throws IOException {
return out.toString();
}
}

public static String topicRequestContent(Request request) throws IOException {
TopicRequestBody body = (TopicRequestBody) request.body();
if (body == null) {
return null;
}
return body.content();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ void writeToStream(OutputStream out) throws IOException {
writer.append("\"key_schema_id\":").append(keySchemaId.toString());
} else {
writer.append("\"key_schema\":");
JSONObject.quote(keySchemaString, writer);
writer.append(JSONObject.quote(keySchemaString));
}
if (valueSchemaId != null) {
writer.append(",\"value_schema_id\":").append(valueSchemaId.toString());
} else {
writer.append(",\"value_schema\":");
JSONObject.quote(valueSchemaString, writer);
writer.append(JSONObject.quote(valueSchemaString));
}
writer.append(",\"records\":[");

Expand Down
25 changes: 25 additions & 0 deletions src/test/java/org/radarcns/config/ServerConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import okhttp3.HttpUrl;
import org.junit.Test;

/**
Expand Down Expand Up @@ -60,4 +63,26 @@ public void jacksonUrl() throws IOException {
+ "path: /schema"))
.getUrlString());
}

@Test
public void getHttpUrl() throws MalformedURLException {
ServerConfig config = new ServerConfig("http://something.else/that");
HttpUrl url = config.getHttpUrl();
assertEquals("http://something.else/that/", url.toString());
assertEquals("something.else", url.host());
assertEquals("http", url.scheme());
assertEquals(80, url.port());
assertEquals("/that/", url.encodedPath());
}

@Test
public void getHttpUrlWitoutRoot() throws MalformedURLException {
ServerConfig config = new ServerConfig("http://something.else");
HttpUrl url = config.getHttpUrl();
assertEquals("http://something.else/", url.toString());
assertEquals("something.else", url.host());
assertEquals("http", url.scheme());
assertEquals(80, url.port());
assertEquals("/", url.encodedPath());
}
}
Loading

0 comments on commit cd5d89b

Please sign in to comment.