Skip to content

Commit

Permalink
Merge branch 'release-0.6.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Dec 5, 2017
2 parents 5bda901 + 0144b94 commit c6d439c
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 29 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.2'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.3'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.2'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.3'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.3-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.4-SNAPSHOT', changing: true
}
```

Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.6.2'
version = '0.6.3'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

Expand Down Expand Up @@ -234,6 +234,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest-all', version: hamcrestVersion
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: okhttpVersion
testRuntime group: 'org.slf4j', name: 'slf4j-simple', version: slf4jVersion

codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: codacyVersion
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/radarcns/producer/BatchedKafkaSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public <L extends K, W extends V> KafkaTopicSender<L, W> sender(final AvroTopic<
}

@Override
public boolean isConnected() {
public boolean isConnected() throws AuthenticationException {
return wrappedSender.isConnected();
}

@Override
public boolean resetConnection() {
public boolean resetConnection() throws AuthenticationException {
return wrappedSender.resetConnection();
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/radarcns/producer/KafkaSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ <L extends K, W extends V> KafkaTopicSender<L, W> sender(AvroTopic<L, W> topic)
* If the sender is no longer connected, try to reconnect.
* @return whether the connection has been restored.
*/
boolean resetConnection();
boolean resetConnection() throws AuthenticationException;

/**
* Whether the sender is connected to the Kafka system.
*/
boolean isConnected();
boolean isConnected() throws AuthenticationException;
}
48 changes: 33 additions & 15 deletions src/main/java/org/radarcns/producer/ThreadedKafkaSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ public Thread newThread( Runnable r) {
public void run() {
opsRequests.add(1);

boolean success = sendHeartbeat();
if (success) {
state.didConnect();
} else {
logger.error("Failed to send message");
state.didDisconnect();
try {
boolean success = sendHeartbeat();
if (success) {
state.didConnect();
} else {
logger.error("Failed to send message");
state.didDisconnect();
}
} catch (AuthenticationException ex) {
logger.error("Unauthorized");
state.wasUnauthorized();
}

if (opsSent.hasAverage() && opsRequests.hasAverage()) {
Expand Down Expand Up @@ -196,6 +201,10 @@ public void run() {

if (exception == null) {
state.didConnect();
} else if (exception instanceof AuthenticationException) {
logger.error("Authentication failed");
state.wasUnauthorized();
break;
} else {
logger.error("Failed to send message");
state.didDisconnect();
Expand All @@ -207,7 +216,7 @@ public void run() {
}
}

private boolean sendHeartbeat() {
private boolean sendHeartbeat() throws AuthenticationException {
boolean success = false;
for (int i = 0; !success && i < RETRIES; i++) {
success = wrappedSender.resetConnection();
Expand All @@ -216,12 +225,14 @@ private boolean sendHeartbeat() {
}

@Override
public synchronized boolean isConnected() {
public synchronized boolean isConnected() throws AuthenticationException {
switch (state.getState()) {
case CONNECTED:
return true;
case DISCONNECTED:
return false;
case UNAUTHORIZED:
throw new AuthenticationException("Authorization invalid");
case UNKNOWN:
state.didDisconnect();
return false;
Expand All @@ -231,15 +242,22 @@ public synchronized boolean isConnected() {
}

@Override
public boolean resetConnection() {
public boolean resetConnection() throws AuthenticationException {
if (isConnected()) {
return true;
} else if (wrappedSender.resetConnection()) {
state.didConnect();
return true;
} else {
state.didDisconnect();
return false;
}

try {
if (wrappedSender.resetConnection()) {
state.didConnect();
return true;
} else {
state.didDisconnect();
return false;
}
} catch (AuthenticationException ex) {
state.wasUnauthorized();
throw ex;
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/radarcns/producer/rest/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class ConnectionState {

/** State symbols of the connection. */
public enum State {
CONNECTED, DISCONNECTED, UNKNOWN
CONNECTED, DISCONNECTED, UNKNOWN, UNAUTHORIZED
}

private long timeout;
Expand Down Expand Up @@ -72,6 +72,14 @@ public synchronized void didDisconnect() {
state = State.DISCONNECTED;
}

public synchronized void wasUnauthorized() {
state = State.UNAUTHORIZED;
}

public synchronized void reset() {
state = State.UNKNOWN;
}

/**
* Set the timeout after which the state will go from CONNECTED to UNKNOWN.
* @param timeout timeout
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/org/radarcns/producer/rest/RestSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private void setRestClient(RestClient newClient) {
throw new IllegalArgumentException("Schemaless topics do not have a valid URL", ex);
}
httpClient = newClient;
state.reset();
}

public final synchronized void setSchemaRetriever(SchemaRetriever retriever) {
Expand Down Expand Up @@ -179,6 +180,7 @@ public synchronized Headers getHeaders() {

public synchronized void setHeaders(Headers additionalHeaders) {
this.additionalHeaders = additionalHeaders;
this.state.reset();
}

private class RestTopicSender<L extends K, W extends V> implements KafkaTopicSender<L, W> {
Expand Down Expand Up @@ -365,35 +367,42 @@ public <L extends K, W extends V> KafkaTopicSender<L, W> sender(AvroTopic<L, W>
}

@Override
public boolean resetConnection() {
public boolean resetConnection() throws AuthenticationException {
if (state.getState() == State.CONNECTED) {
return true;
}
try (Response response = httpClient.request(getIsConnectedRequest())) {
if (response.isSuccessful()) {
state.didConnect();
return true;
} else if (response.code() == 401) {
state.wasUnauthorized();
} else {
state.didDisconnect();
String bodyString = responseBody(response);
logger.warn("Failed to make heartbeat request to {} (HTTP status code {}): {}",
httpClient, response.code(), bodyString);
return false;
}
} catch (IOException ex) {
// no stack trace is needed
state.didDisconnect();
logger.warn("Failed to make heartbeat request to {}: {}", httpClient, ex.toString());
return false;
}

if (state.getState() == State.UNAUTHORIZED) {
throw new AuthenticationException("HEAD request unauthorized");
}

return state.getState() == State.CONNECTED;
}

public boolean isConnected() {
public boolean isConnected() throws AuthenticationException {
switch (state.getState()) {
case CONNECTED:
return true;
case DISCONNECTED:
return false;
case UNAUTHORIZED:
throw new AuthenticationException("Unauthorized");
case UNKNOWN:
return resetConnection();
default:
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/org/radarcns/producer/rest/RestSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.radarcns.data.SpecificRecordEncoder;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.passive.phone.PhoneLight;
import org.radarcns.producer.AuthenticationException;
import org.radarcns.producer.KafkaTopicSender;
import org.radarcns.topic.AvroTopic;

Expand Down Expand Up @@ -187,6 +188,38 @@ public void resetConnection() throws Exception {
assertEquals("HEAD", request.getMethod());
}

@Test
public void resetConnectionUnauthorized() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(401));
try {
sender.isConnected();
fail("Authentication exception expected");
} catch (AuthenticationException ex) {
// success
}
try {
sender.isConnected();
fail("Authentication exception expected");
} catch (AuthenticationException ex) {
// success
}
webServer.enqueue(new MockResponse().setResponseCode(401));
try {
sender.resetConnection();
fail("Authentication exception expected");
} catch (AuthenticationException ex) {
assertEquals(2, webServer.getRequestCount());
// success
}
webServer.enqueue(new MockResponse().setResponseCode(200));
try {
assertTrue(sender.resetConnection());
} catch (AuthenticationException ex) {
assertEquals(3, webServer.getRequestCount());
fail("Unexpected authentication failure");
}
}

@Test
public void withCompression() throws IOException, InterruptedException {
sender.setCompression(true);
Expand Down

0 comments on commit c6d439c

Please sign in to comment.