Skip to content

Commit

Permalink
Merge pull request #18 from RADAR-CNS/0.3_release
Browse files Browse the repository at this point in the history
0.3 release
  • Loading branch information
blootsvoets committed May 17, 2017
2 parents 8aab748 + 7d54b1c commit 80fefa0
Show file tree
Hide file tree
Showing 40 changed files with 1,819 additions and 1,396 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.2.1'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.3'
}
```

Expand All @@ -26,10 +26,16 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.2.1'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.3'
}
```

To test your backend with a MockProducer, copy `testing/mock.yml.template` to `testing/mock.yml` and edit its parameters. Then run
```
./gradlew :testing:run
```
to send data to your backend.

## Contributing

For latest code use `dev` branch.
Expand Down
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.2.1'
version = '0.3'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

Expand Down Expand Up @@ -87,6 +87,12 @@ allprojects {
email 'joris@thehyve.nl'
organization 'The Hyve'
}
developer {
id 'fnobilia'
name 'Francesco Nobilia'
email 'francesco.nobilia@kcl.ac.uk'
organization 'King\'s College London'
}
}
issueManagement {
system 'GitHub'
Expand Down Expand Up @@ -214,6 +220,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest-all', version: hamcrestVersion
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: okhttpVersion

codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: codacyVersion
}

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/radarcns/config/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.radarcns.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
Expand All @@ -37,6 +38,7 @@ public class ServerConfig {
private String proxyHost;
@JsonProperty("proxy_port")
private int proxyPort = -1;
private boolean unsafe = false;

public ServerConfig() {
// POJO initializer
Expand Down Expand Up @@ -175,6 +177,7 @@ public String getPath() {
return path;
}

@JsonSetter("path")
public final void setPath(String path) {
if (path == null) {
this.path = "/";
Expand Down Expand Up @@ -207,6 +210,7 @@ public boolean equals(Object other) {
ServerConfig otherConfig = (ServerConfig) other;
return Objects.equals(host, otherConfig.host)
&& port == otherConfig.port
&& unsafe == otherConfig.unsafe
&& Objects.equals(protocol, otherConfig.protocol)
&& Objects.equals(proxyHost, otherConfig.proxyHost)
&& proxyPort == otherConfig.proxyPort;
Expand All @@ -221,4 +225,12 @@ public int hashCode() {
result = 31 * result + proxyPort;
return result;
}

public boolean isUnsafe() {
return unsafe;
}

public void setUnsafe(boolean unsafe) {
this.unsafe = unsafe;
}
}
18 changes: 11 additions & 7 deletions src/main/java/org/radarcns/producer/SchemaRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericContainer;
import org.radarcns.config.ServerConfig;
import org.radarcns.producer.rest.ManagedConnectionPool;
import org.radarcns.producer.rest.ParsedSchemaMetadata;
import org.radarcns.producer.rest.RestClient;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,13 +73,15 @@ public SchemaRetriever(ServerConfig config, long connectionTimeout) {
cache = new ConcurrentHashMap<>();
jsonFactory = new JsonFactory();
reader = new ObjectMapper(jsonFactory).readerFor(JsonNode.class);
httpClient = new RestClient(config, connectionTimeout);
httpClient = new RestClient(config, connectionTimeout, ManagedConnectionPool.GLOBAL_POOL);
}

public synchronized void setConnectionTimeout(long connectionTimeout) {
if (httpClient.getTimeout() != connectionTimeout) {
RestClient newHttpClient = new RestClient(httpClient.getConfig(), connectionTimeout,
ManagedConnectionPool.GLOBAL_POOL);
httpClient.close();
httpClient = new RestClient(httpClient.getConfig(), connectionTimeout);
httpClient = newHttpClient;
}
}

Expand All @@ -103,13 +106,13 @@ protected ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int versio
RestClient restClient = getRestClient();
Request request = restClient.requestBuilder(path)
.addHeader("Accept", "application/json")
.get()
.build();

try (Response response = restClient.request(request)) {
if (!response.isSuccessful()) {
throw new IOException("Cannot retrieve metadata (HTTP " + response.code()
+ ": " + response.message() + ") -> " + response.body().string());
throw new IOException("Cannot retrieve metadata " + request.url()
+ " (HTTP " + response.code() + ": " + response.message()
+ ") -> " + response.body().string());
}
JsonNode node = reader.readTree(response.body().byteStream());
int newVersion = version < 1 ? node.get("version").asInt() : version;
Expand Down Expand Up @@ -157,8 +160,9 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat

try (Response response = restClient.request(request)) {
if (!response.isSuccessful()) {
throw new IOException("Cannot post schema (HTTP " + response.code()
+ ": " + response.message() + ") -> " + response.body().string());
throw new IOException("Cannot post schema to " + request.url()
+ " (HTTP " + response.code() + ": " + response.message()
+ ") -> " + response.body().string());
}
JsonNode node = reader.readTree(response.body().byteStream());
int schemaId = node.get("id").asInt();
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/org/radarcns/producer/rest/ConnectionState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2017 The Hyve and King's College London
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.radarcns.producer.rest;

import java.util.concurrent.TimeUnit;

/**
* Current connection status of a KafkaSender. After a timeout occurs this will turn to
* disconnected. When the connection is dropped, the associated KafkaSender should set this to
* disconnected, when it successfully connects, it should set it to connected. This class is
* thread-safe. The state transition diagram is CONNECTED to and from DISCONNECTED with
* {@link #didConnect()} and {@link #didDisconnect()}; CONNECTED to and from UNKNOWN with
* {@link #getState()} after a timeout occurs and {@link #didConnect()}; and UNKNOWN to DISCONNECTED
* with {@link #didDisconnect()}.
*
* A connection state could be shared with multiple HTTP clients if they are talking to the same
* server.
*/
public final class ConnectionState {

/** State symbols of the connection. */
public enum State {
CONNECTED, DISCONNECTED, UNKNOWN
}

private long timeout;
private long lastConnection;
private State state;

/**
* Connection state with given timeout. The state will start as connected.
* @param timeout timeout
* @param unit unit of the timeout
* @throws IllegalArgumentException if the timeout is not strictly positive.
*/
public ConnectionState(long timeout, TimeUnit unit) {
lastConnection = -1L;
state = State.UNKNOWN;
setTimeout(timeout, unit);
}

/** Current state of the connection. */
public synchronized State getState() {
if (state == State.CONNECTED && System.currentTimeMillis() - lastConnection >= timeout) {
state = State.UNKNOWN;
}
return state;
}

/** For a sender to indicate that a connection attempt succeeded. */
public synchronized void didConnect() {
state = State.CONNECTED;
lastConnection = System.currentTimeMillis();
}

/** For a sender to indicate that a connection attempt failed. */
public synchronized void didDisconnect() {
state = State.DISCONNECTED;
}

/**
* Set the timeout after which the state will go from CONNECTED to UNKNOWN.
* @param timeout timeout
* @param unit unit of the timeout
* @throws IllegalArgumentException if the timeout is not strictly positive
*/
public synchronized void setTimeout(long timeout, TimeUnit unit) {
if (timeout <= 0) {
throw new IllegalArgumentException("Timeout must be strictly positive");
}
this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
import okhttp3.MediaType;
import okio.BufferedSink;

/**
* TopicRequestData in a Gzipped RequestBody.
*/
class GzipTopicRequestBody extends TopicRequestBody {

GzipTopicRequestBody(TopicRequestData requestData) throws IOException {
super(requestData);
GzipTopicRequestBody(TopicRequestData requestData, MediaType mediaType) throws IOException {
super(requestData, mediaType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2017 The Hyve and King's College London
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.radarcns.producer.rest;

import okhttp3.ConnectionPool;

/**
* Manages a connection pool. Using this class properly ensures that all resources are released
* after a global ConnectionPool is no longer used. Each call to {@link #acquire()} must be matched
* with exactly one call to {@link #release()} once the acquired ConnectionPool is no longer used.
* This class is thread-safe.
*/
public class ManagedConnectionPool {
public static final ManagedConnectionPool GLOBAL_POOL = new ManagedConnectionPool();
private ConnectionPool connectionPool;
private int references;

public ManagedConnectionPool() {
references = 0;
}

/**
* Acquire access to a connection pool. A call to this must be matched with exactly one call
* to {@link #release()}.
* @return the single connection pool managed by this object
*/
public synchronized ConnectionPool acquire() {
if (references == 0) {
connectionPool = new ConnectionPool();
}
references++;
return connectionPool;
}

/**
* Release access to a connection pool once it is no longer used.
* @throws IllegalStateException if release is called more often than acquire.
*/
public synchronized void release() {
if (references == 0) {
throw new IllegalStateException(
"Cannot release a connection pool that was not acquired.");
}
references--;
if (references == 0) {
connectionPool = null;
}
}
}
Loading

0 comments on commit 80fefa0

Please sign in to comment.