Skip to content

Commit

Permalink
Merge pull request #24 from RADAR-CNS/v0.4.2_release
Browse files Browse the repository at this point in the history
V0.4.2 release
  • Loading branch information
blootsvoets authored Jul 20, 2017
2 parents a43961d + 0b93a53 commit 97144de
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# These owners will be the default owners for everything in the repo.
# Unless a later match takes precedence, they will be requested for review when someone
# opens a pull request.
* @blootsvoets
testing/* @nivemaham @fnobilia
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.4.1'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.4.2'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.4.1'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.4.2'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.4.2-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.5-SNAPSHOT', changing: true
}
```

Expand Down
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.4.1'
version = '0.4.2'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

ext.slf4jVersion = '1.7.21'
ext.kafkaVersion = '0.10.1.1'
ext.kafkaVersion = '0.10.2.1'
ext.avroVersion = '1.8.1'
ext.confluentVersion = '3.1.2'
ext.log4jVersion = '2.7'
Expand Down Expand Up @@ -224,6 +224,17 @@ dependencies {
testImplementation group: 'org.hamcrest', name: 'hamcrest-all', version: hamcrestVersion
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: okhttpVersion

// For Topic name validation based on Kafka classes
testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
exclude group: 'net.sf.jopt-simple'
exclude group: 'com.yammer.metrics'
exclude group: 'org.scala-lang.modules'
exclude group: 'org.slf4j'
exclude group: 'com.101tec'
exclude group: 'org.apache.zookeeper'
}

codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: codacyVersion
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/radarcns/stream/GeneralStreamGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
* method.
*/
public class GeneralStreamGroup implements StreamGroup {

public static final String OUTPUT_LABEL = "_output";

private final Map<String, StreamDefinition> topicMap;
private final Set<String> topicNames;

Expand Down Expand Up @@ -64,7 +67,7 @@ protected StreamDefinition createStream(String input, String output) {
* @return sensor stream definition
*/
protected StreamDefinition createSensorStream(String input) {
return createStream(input, input + "_output");
return createStream(input, input + OUTPUT_LABEL);
}

@Override
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/org/radarcns/stream/StreamDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,20 @@
import org.radarcns.topic.KafkaTopic;

public class StreamDefinition {

public static final String FROM_LABEL = "From-";
public static final String TO_LABEL = "-To-";

private final KafkaTopic inputTopic;
private final KafkaTopic outputTopic;

/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
* related stream will write the computed values.
*
* @param input source {@link KafkaTopic}
* @param output output {@link KafkaTopic}
*/
public StreamDefinition(KafkaTopic input, KafkaTopic output) {
if (input == null || output == null) {
throw new IllegalArgumentException("Input and output topic may not be null");
Expand All @@ -38,7 +49,14 @@ public KafkaTopic getOutputTopic() {
return outputTopic;
}

/**
* Kafka Streams allows for stateful stream processing. The internal state is managed in
* so-called state stores. A fault-tolerant state store is an internally created and
* compacted changelog topic. This function return the changelog topic name.
*
* @return {@code String} representing the changelog topic name
*/
public String getStateStoreName() {
return getInputTopic().getName() + "->" + getOutputTopic().getName();
return FROM_LABEL + getInputTopic().getName() + TO_LABEL + getOutputTopic().getName();
}
}
53 changes: 53 additions & 0 deletions src/test/java/org/radarcns/stream/StreamDefinitionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.radarcns.stream;

/*
* Copyright 2017 King's College London and The Hyve
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import static org.junit.Assert.assertEquals;

import org.apache.kafka.common.errors.InvalidTopicException;
import org.junit.Test;
import org.radarcns.topic.KafkaTopic;

public class StreamDefinitionTest {

private static final String INPUT = "android_empatica_e4_blood_volume_pulse";
private static final String OUTPUT = INPUT + GeneralStreamGroup.OUTPUT_LABEL;

@Test
public void nameValidation() {
KafkaTopic inputTopic = new KafkaTopic(INPUT);
KafkaTopic outputTopic = new KafkaTopic(OUTPUT);

StreamDefinition definition = new StreamDefinition(inputTopic, outputTopic);

kafka.common.Topic.validate(definition.getStateStoreName());

assertEquals("From-" + "android_empatica_e4_blood_volume_pulse" + "-To-" +
"android_empatica_e4_blood_volume_pulse" + "_output",
definition.getStateStoreName());
}

@Test(expected = InvalidTopicException.class)
public void faultyNameValidation() {
KafkaTopic inputTopic = new KafkaTopic(INPUT + "$");
KafkaTopic outputTopic = new KafkaTopic(OUTPUT);

StreamDefinition definition = new StreamDefinition(inputTopic, outputTopic);

kafka.common.Topic.validate(definition.getStateStoreName());
}
}

0 comments on commit 97144de

Please sign in to comment.