Skip to content

Commit

Permalink
Merge pull request #47 from RADAR-base/release-0.8.1
Browse files Browse the repository at this point in the history
Release 0.8.1
  • Loading branch information
blootsvoets authored Mar 6, 2018
2 parents 3d2349d + a2bffbf commit 20d3508
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 84 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.0'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.1'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.8.0'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.8.1'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.1-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.8.2-SNAPSHOT', changing: true
}
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.8.0'
version = '0.8.1'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void send(RecordData<K, V> records) throws IOException {
logger.debug("Added message to topic {} -> {}",
topic, responseBody(response));
}
} else if (response.code() == 401 || response.code() == 403 || response.code() == 422) {
} else if (response.code() == 401 || response.code() == 403) {
state.wasUnauthorized();
} else if (response.code() == 415
&& Objects.equals(request.header("Accept"), KAFKA_REST_ACCEPT_ENCODING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,44 @@

package org.radarcns.stream.collector;

import static org.radarcns.util.Serialization.floatToDouble;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.specific.SpecificRecord;

import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;

import static org.radarcns.util.Serialization.floatToDouble;

/**
* Java class to aggregate data using Kafka Streams. Double is the base type.
* Only the sum and sorted history are collected, other values are calculated on request.
* Only the sum and sorted history are collected, other getSamples are calculated on request.
*/
@JsonDeserialize(builder = NumericAggregateCollector.Builder.class)
public class NumericAggregateCollector implements RecordCollector {
private final String name;
private final int pos;
private final Type fieldType;
private double min;
private double max;
private BigDecimal sum;
private final List<Double> history;
private final UniformSamplingReservoir reservoir;

@JsonCreator
public NumericAggregateCollector(
@JsonProperty("name") String name, @JsonProperty("pos") int pos,
@JsonProperty("fieldType") Type fieldType, @JsonProperty("sum") BigDecimal sum,
@JsonProperty("history") List<Double> history) {
this.name = name;
this.pos = pos;
this.fieldType = fieldType;
this.sum = sum;
this.history = new ArrayList<>(history);
public NumericAggregateCollector(Builder builder) {
this.name = builder.nameValue;
this.pos = builder.posValue;
this.fieldType = builder.fieldTypeValue;
this.min = builder.minValue;
this.max = builder.maxValue;
this.sum = builder.sumValue;
this.reservoir = builder.reservoirValue;
}

public NumericAggregateCollector(String fieldName) {
Expand All @@ -59,38 +62,44 @@ public NumericAggregateCollector(String fieldName) {

public NumericAggregateCollector(String fieldName, Schema schema) {
sum = BigDecimal.ZERO;
this.history = new ArrayList<>();
min = Double.POSITIVE_INFINITY;
max = Double.NEGATIVE_INFINITY;
reservoir = new UniformSamplingReservoir();

this.name = fieldName;
name = fieldName;
if (schema == null) {
this.pos = -1;
this.fieldType = null;
pos = -1;
fieldType = null;
} else {
Field field = schema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(
"Field " + fieldName + " does not exist in schema " + schema.getFullName());
}
this.pos = field.pos();

Type apparentType = field.schema().getType();
if (apparentType == Type.UNION) {
for (Schema subSchema : field.schema().getTypes()) {
if (subSchema.getType() != Type.NULL) {
apparentType = subSchema.getType();
break;
}
pos = field.pos();
fieldType = getType(field);
}
}

private static Type getType(Field field) {
Type apparentType = field.schema().getType();
if (apparentType == Type.UNION) {
for (Schema subSchema : field.schema().getTypes()) {
if (subSchema.getType() != Type.NULL) {
apparentType = subSchema.getType();
break;
}
}
fieldType = apparentType;
}

if (fieldType != Type.DOUBLE
&& fieldType != Type.FLOAT
&& fieldType != Type.INT
&& fieldType != Type.LONG) {
throw new IllegalArgumentException("Field " + fieldName + " is not a number type.");
}
if (apparentType != Type.DOUBLE
&& apparentType != Type.FLOAT
&& apparentType != Type.INT
&& apparentType != Type.LONG) {
throw new IllegalArgumentException("Field " + field.name() + " is not a number type.");
}

return apparentType;
}

@Override
Expand Down Expand Up @@ -120,12 +129,12 @@ public NumericAggregateCollector add(float value) {
*/
public NumericAggregateCollector add(double value) {
sum = sum.add(BigDecimal.valueOf(value));

int index = Collections.binarySearch(history, value);
if (index >= 0) {
history.add(index, value);
} else {
history.add(-index - 1, value);
reservoir.add(value);
if (value > max) {
max = value;
}
if (value < min) {
min = value;
}

return this;
Expand All @@ -138,57 +147,33 @@ public String toString() {
+ ", min=" + getMin()
+ ", max=" + getMax()
+ ", sum=" + getSum()
+ ", count=" + getCount()
+ ", mean=" + getMean()
+ ", quartile=" + getQuartile()
+ ", history=" + history + '}';
+ ", reservoir=" + reservoir + '}';
}

public double getMin() {
return history.get(0);
return min;
}

public double getMax() {
return history.get(history.size() - 1);
return max;
}

public double getSum() {
return sum.doubleValue();
}

public int getCount() {
return history.size();
return reservoir.getCount();
}

public double getMean() {
return sum.doubleValue() / history.size();
return sum.doubleValue() / getCount();
}

public List<Double> getQuartile() {
int length = history.size();

List<Double> quartiles;
if (length == 1) {
Double elem = history.get(0);
quartiles = Arrays.asList(elem, elem, elem);
} else {
quartiles = new ArrayList<>(3);
for (int i = 1; i <= 3; i++) {
double pos = i * (length + 1) / 4.0d; // == i * 25 * (length + 1) / 100
int intPos = (int) pos;
if (intPos == 0) {
quartiles.add(history.get(0));
} else if (intPos == length) {
quartiles.add(history.get(length - 1));
} else {
double diff = pos - intPos;
double base = history.get(intPos - 1);
quartiles.add(base + diff * (history.get(intPos) - base));
}
}
}

return quartiles;
return reservoir.getQuartiles();
}

public double getInterQuartileRange() {
Expand All @@ -200,4 +185,106 @@ public double getInterQuartileRange() {
public String getName() {
return name;
}

protected UniformSamplingReservoir getReservoir() {
return reservoir;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NumericAggregateCollector that = (NumericAggregateCollector) o;
return pos == that.pos
&& Double.compare(that.min, min) == 0
&& Double.compare(that.max, max) == 0
&& Objects.equals(name, that.name)
&& fieldType == that.fieldType
&& Objects.equals(sum, that.sum)
&& Objects.equals(reservoir, that.reservoir);
}

@Override
public int hashCode() {
return Objects.hash(name, pos, fieldType, min, max, sum, reservoir);
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private double maxValue = Double.NEGATIVE_INFINITY;
private double minValue = Double.POSITIVE_INFINITY;
private final String nameValue;
private int posValue = -1;
private Type fieldTypeValue = null;
private BigDecimal sumValue = BigDecimal.ZERO;
private UniformSamplingReservoir reservoirValue = new UniformSamplingReservoir();

@JsonCreator
public Builder(@JsonProperty("name") String name) {
this.nameValue = Objects.requireNonNull(name);
}

@JsonSetter
public Builder pos(int pos) {
posValue = pos;
return this;
}

@JsonSetter
public Builder fieldType(Type fieldType) {
fieldTypeValue = fieldType;
return this;
}

@JsonSetter
public Builder min(double min) {
if (min < minValue) {
minValue = min;
}
return this;
}

@JsonSetter
public Builder max(double max) {
if (max > maxValue) {
maxValue = max;
}
return this;
}

@JsonSetter
public Builder sum(BigDecimal sum) {
sumValue = sum;
return this;
}

@JsonSetter
public Builder reservoir(UniformSamplingReservoir reservoir) {
this.reservoirValue = reservoir;
return this;
}

/**
* For backwards compatibility purposes, convert a full history to a reservoir.
* @param history stored history.
* @return the current builder.
* @deprecated use reservoir instead.
*/
@Deprecated
@JsonSetter
public Builder history(List<Double> history) {
min(history.get(0));
max(history.get(history.size() - 1));
reservoir(new UniformSamplingReservoir(history));
return this;
}

public NumericAggregateCollector build() {
return new NumericAggregateCollector(this);
}
}
}
Loading

0 comments on commit 20d3508

Please sign in to comment.