Skip to content

Commit

Permalink
Merge pull request #58 from RADAR-base/release-0.10.0
Browse files Browse the repository at this point in the history
Release 0.10.0
  • Loading branch information
blootsvoets authored Aug 28, 2018
2 parents 39cd67a + 9685f49 commit 2cd9200
Show file tree
Hide file tree
Showing 76 changed files with 1,642 additions and 915 deletions.
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,46 @@ repositories {
}
dependencies {
implementation group: 'org.radarcns', name: 'radar-commons', version: '0.9.0'
implementation group: 'org.radarcns', name: 'radar-commons', version: '0.10.0'
}
```

For server utilities, include `radar-commons-server`:
```gradle
repositories {
jcenter()
maven { url 'http://packages.confluent.io/maven/' }
}
dependencies {
implementation group: 'org.radarcns', name: 'radar-commons-server', version: '0.9.0'
implementation group: 'org.radarcns', name: 'radar-commons-server', version: '0.10.0'
}
```

For mocking clients of the RADAR-CNS infrastructure, use that 'radar-commons-testing' repository:

```gradle
repositories {
jcenter()
maven { url 'http://packages.confluent.io/maven/' }
maven { url 'http://dl.bintray.com/radar-cns/org.radarcns' }
}
dependencies {
testImplementation group: 'org.radarcns', name: 'radar-commons-testing', version: '0.9.0'
testImplementation group: 'org.radarcns', name: 'radar-commons-testing', version: '0.10.0'
}
```

Finally, if the schema registry is losing old schemas and your code is not recovering, include `radar-commons-unsafe`. Ensure that it comes in the classpath before any Confluent code. This will override the Confluent Avro deserializer to recover from failure when a message with unknown schema ID is passed.
```gradle
repositories {
jcenter()
maven { url 'http://packages.confluent.io/maven/' }
maven { url 'http://dl.bintray.com/radar-cns/org.radarcns' }
}
dependencies {
runtimeOnly group: 'org.radarcns', name: 'radar-commons-unsafe', version: '0.9.0'
runtimeOnly group: 'org.radarcns', name: 'radar-commons-unsafe', version: '0.10.0'
}
```

Expand All @@ -65,7 +78,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.3-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.10.1-SNAPSHOT', changing: true
}
```

Expand Down
10 changes: 5 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
plugins {
// Get bintray version
id 'com.jfrog.bintray' version '1.8.1' apply false
id 'com.jfrog.artifactory' version '4.5.4'
id 'com.jfrog.artifactory' version '4.7.5' apply false
}

subprojects {
Expand All @@ -36,14 +36,14 @@ subprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.9.0'
version = '0.10.0'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

ext.slf4jVersion = '1.7.25'
ext.kafkaVersion = '1.1.1-cp1'
ext.kafkaVersion = '2.0.0'
ext.avroVersion = '1.8.2'
ext.confluentVersion = '4.1.1'
ext.confluentVersion = '5.0.0'
ext.jacksonVersion = '2.9.6'
ext.okhttpVersion = '3.10.0'
ext.junitVersion = '4.12'
Expand Down Expand Up @@ -179,5 +179,5 @@ subprojects {
}

wrapper {
gradleVersion '4.8'
gradleVersion '4.9'
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package org.radarcns.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import org.apache.avro.specific.SpecificRecord;
import org.radarcns.topic.AvroTopic;

import java.util.List;

/**
* Specifies an Avro topic.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

package org.radarcns.producer.direct;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.radarcns.data.Record;
import org.radarcns.data.RecordData;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;
import org.radarcns.topic.AvroTopic;

import java.util.Properties;

/**
* Directly sends a message to Kafka using a KafkaProducer.
*/
Expand Down Expand Up @@ -73,8 +71,8 @@ public void send(K key, V value) {

@Override
public void send(RecordData<K, V> records) {
for (Record<K, V> record : records) {
producer.send(new ProducerRecord<>(name, record.key, record.value));
for (V record : records) {
producer.send(new ProducerRecord<>(name, records.getKey(), record));
}
producer.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@

package org.radarcns.stream.collector;

import static org.radarcns.util.Serialization.floatToDouble;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.specific.SpecificRecord;

import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;

import static org.radarcns.util.Serialization.floatToDouble;

/**
* Java class to aggregate data using Kafka Streams. Double is the base type.
* Only the sum and sorted history are collected, other getSamples are calculated on request.
Expand Down Expand Up @@ -84,6 +83,13 @@ public NumericAggregateCollector(String fieldName, Schema schema) {
}
}

/**
* Get the non-null number type for a given field. If the tye is a union, it will use the first
* non-null type in the union.
* @param field record field to get type for.
* @return type
* @throws IllegalArgumentException if the resulting field is non-numeric.
*/
private static Type getType(Field field) {
Type apparentType = field.schema().getType();
if (apparentType == Type.UNION) {
Expand Down Expand Up @@ -245,6 +251,7 @@ public Builder fieldType(Type fieldType) {
return this;
}

/** Set minimum value. */
@JsonSetter
public Builder min(double min) {
if (min < minValue) {
Expand All @@ -253,6 +260,7 @@ public Builder min(double min) {
return this;
}

/** Set maximum value. */
@JsonSetter
public Builder max(double max) {
if (max > maxValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ public void configure(Map<String, ?> configs, boolean isKey) {
}
}

@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
@Override
public void close() {
if (schemaRetriever != null) {
schemaRetriever.close();
schemaRetriever = null;
}
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.producer.rest.ParsedSchemaMetadata;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.util.Serialization;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

package org.radarcns.config;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import okhttp3.HttpUrl;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.math.BigDecimal;
import org.junit.Before;
import org.junit.Test;
import org.radarcns.kafka.AggregateKey;
import org.radarcns.monitor.application.ApplicationRecordCounts;
import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.passive.phone.PhoneBatteryLevel;

import java.io.IOException;
import java.math.BigDecimal;

/**
* Created by nivethika on 20-12-16.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.radarcns.stream.collector;

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class UniformSamplingReservoirTest {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.producer.rest.ParsedSchemaMetadata;
import org.radarcns.producer.rest.SchemaRetriever;

public class KafkaAvroSerializerTest {

Expand Down
1 change: 0 additions & 1 deletion radar-commons-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

apply plugin: 'application'
apply plugin: 'com.jfrog.artifactory'

mainClassName = 'org.radarcns.mock.MockProducer'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.specific.SpecificRecord;
import org.radarcns.data.Record;
import org.radarcns.mock.data.RecordGenerator;
Expand All @@ -44,7 +45,7 @@ public class MockDevice<K extends SpecificRecord> extends Thread {
private final List<RecordGenerator<K>> generators;
private final K key;

private IOException exception;
private Exception exception;

/**
* Basic constructor.
Expand Down Expand Up @@ -96,7 +97,7 @@ public void run() {
for (KafkaTopicSender<K, SpecificRecord> topicSender : topicSenders) {
topicSender.close();
}
} catch (IOException e) {
} catch (SchemaValidationException | IOException e) {
synchronized (this) {
this.exception = e;
}
Expand All @@ -112,10 +113,25 @@ public void shutdown() {
}

/** Get the exception that occurred in the thread. Returns null if no exception occurred. */
public synchronized IOException getException() {
public synchronized Exception getException() {
return exception;
}

/** Check whether an exception occurred, and rethrow the exception if that is the case. */
public synchronized void checkException() throws IOException, SchemaValidationException {
if (exception != null) {
if (exception instanceof IOException) {
throw (IOException) exception;
} else if (exception instanceof SchemaValidationException) {
throw (SchemaValidationException) exception;
} else if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else {
throw new IllegalStateException("Unknown exception occurred", exception);
}
}
}

private int computeBaseFrequency(List<RecordGenerator<K>> generators) {
BigInteger lcm = BigInteger.ONE;
for (RecordGenerator<K> generator : generators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@

package org.radarcns.mock;

import org.radarcns.data.AvroRecordData;
import java.io.IOException;
import org.apache.avro.SchemaValidationException;
import org.radarcns.data.Record;
import org.radarcns.mock.data.MockCsvParser;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

/**
* Send mock data from a CSV file.
*
Expand All @@ -47,11 +44,12 @@ public MockFileSender(KafkaSender sender, MockCsvParser parser) {
@SuppressWarnings("unchecked")
public void send() throws IOException {
try (KafkaTopicSender topicSender = sender.sender(parser.getTopic())) {
Collection<Record> records = new ArrayList<>();
while (parser.hasNext()) {
records.add(parser.next());
Record record = parser.next();
topicSender.send(record.key, record.value);
}
topicSender.send(new AvroRecordData(parser.getTopic(), records));
} catch (SchemaValidationException e) {
throw new IOException("Failed to match schemas", e);
}
}
}
Loading

0 comments on commit 2cd9200

Please sign in to comment.