diff --git a/README.md b/README.md index dcd8d58b..bd9e116f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.3' + compile group: 'org.radarcns', name: 'radar-commons', version: '0.4' } ``` @@ -26,7 +26,7 @@ repositories { } dependencies { - testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.3' + testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.4' } ``` diff --git a/build.gradle b/build.gradle index cf2bb635..a9101b19 100644 --- a/build.gradle +++ b/build.gradle @@ -34,7 +34,7 @@ allprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.3' + version = '0.4' group = 'org.radarcns' ext.githubRepoName = 'RADAR-CNS/RADAR-Commons' diff --git a/src/main/java/org/radarcns/producer/rest/RestClient.java b/src/main/java/org/radarcns/producer/rest/RestClient.java index 428a3551..edac7055 100644 --- a/src/main/java/org/radarcns/producer/rest/RestClient.java +++ b/src/main/java/org/radarcns/producer/rest/RestClient.java @@ -37,12 +37,13 @@ import okhttp3.Response; import org.radarcns.config.ServerConfig; -/** REST client using OkHttp3. */ +/** REST client using OkHttp3. This class is not thread-safe. */ public class RestClient implements Closeable { private final long timeout; private final ServerConfig config; private final OkHttpClient httpClient; private final ManagedConnectionPool connectionPool; + private boolean isClosed; /** * REST client. This client will use the OkHttp3 default connection pool and 30 second timeout. @@ -68,6 +69,7 @@ public RestClient(ServerConfig config, long connectionTimeout, this.config = config; this.timeout = connectionTimeout; this.connectionPool = connectionPool; + this.isClosed = false; OkHttpClient.Builder builder; if (config.isUnsafe()) { @@ -247,6 +249,10 @@ public String toString() { @Override public void close() { + if (isClosed) { + return; + } + isClosed = true; if (connectionPool != null) { connectionPool.release(); } diff --git a/src/main/java/org/radarcns/util/serde/AbstractKafkaAvroSerde.java b/src/main/java/org/radarcns/util/serde/AbstractKafkaAvroSerde.java index daf9dccd..63919df1 100644 --- a/src/main/java/org/radarcns/util/serde/AbstractKafkaAvroSerde.java +++ b/src/main/java/org/radarcns/util/serde/AbstractKafkaAvroSerde.java @@ -60,6 +60,7 @@ public void configure(Map configs, boolean isKey) { public void close() { if (schemaRetriever != null) { schemaRetriever.close(); + schemaRetriever = null; } } } diff --git a/src/test/java/org/radarcns/stream/collector/DoubleArrayCollectorTest.java b/src/test/java/org/radarcns/stream/collector/DoubleArrayCollectorTest.java new file mode 100644 index 00000000..7eae97a9 --- /dev/null +++ b/src/test/java/org/radarcns/stream/collector/DoubleArrayCollectorTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 King's College London and The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.stream.collector; + +import static org.junit.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; + +/** + * Created by nivethika on 20-12-16. + */ +public class DoubleArrayCollectorTest { + + private DoubleArrayCollector arrayCollector; + + @Before + public void setUp() { + this.arrayCollector = new DoubleArrayCollector(); + } + + @Test + public void add() { + double[] arrayvalues = {0.15d, 1.0d, 2.0d, 3.0d}; + this.arrayCollector.add(arrayvalues); + + assertEquals("[DoubleValueCollector{min=0.15, max=0.15, sum=0.15, count=1.0, avg=0.15, quartile=[0.15, 0.15, 0.15], iqr=0.0, history=[0.15]}, DoubleValueCollector{min=1.0, max=1.0, sum=1.0, count=1.0, avg=1.0, quartile=[1.0, 1.0, 1.0], iqr=0.0, history=[1.0]}, DoubleValueCollector{min=2.0, max=2.0, sum=2.0, count=1.0, avg=2.0, quartile=[2.0, 2.0, 2.0], iqr=0.0, history=[2.0]}, DoubleValueCollector{min=3.0, max=3.0, sum=3.0, count=1.0, avg=3.0, quartile=[3.0, 3.0, 3.0], iqr=0.0, history=[3.0]}]" , this.arrayCollector.toString()); + } +} diff --git a/src/test/java/org/radarcns/stream/collector/DoubleValueCollectorTest.java b/src/test/java/org/radarcns/stream/collector/DoubleValueCollectorTest.java new file mode 100644 index 00000000..7ce1d9e6 --- /dev/null +++ b/src/test/java/org/radarcns/stream/collector/DoubleValueCollectorTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2017 King's College London and The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.stream.collector; + +import static org.junit.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; + +/** + * Created by nivethika on 20-12-16. + */ +public class DoubleValueCollectorTest { + + private DoubleValueCollector valueCollector ; + + @Before + public void setUp() { + this.valueCollector = new DoubleValueCollector(); + } + + @Test + public void add() { + valueCollector.add(10.0d); + assertEquals(10.0d, valueCollector.getMin(), 0.0d); + assertEquals(10.0d, valueCollector.getMax(), 0.0d); + assertEquals(10.0d, valueCollector.getSum(), 0.0d); + assertEquals(10.0d, valueCollector.getAvg(), 0.0d); + assertEquals(0.0d, valueCollector.getIqr(), 0.0d); + assertEquals(1, valueCollector.getCount(),0); + + valueCollector.add(15.100d); + assertEquals(10.0d, valueCollector.getMin(), 0.0d); + assertEquals(15.100d, valueCollector.getMax(), 0.0d); + assertEquals(25.100d, valueCollector.getSum(), 0.0d); + assertEquals(12.550d, valueCollector.getAvg(), 0.0d); + assertEquals(5.1, valueCollector.getIqr(), 0.0d); + assertEquals(2, valueCollector.getCount(),0); + + valueCollector.add(28.100d); + assertEquals(18.1d, valueCollector.getIqr(), 0.0d); + + } + + @Test + public void addFloat() { + valueCollector.add(10.0234f); + assertEquals(10.0234d, valueCollector.getMin(), 0.0d); + assertEquals(10.0234d, valueCollector.getMax(), 0.0d); + assertEquals(10.0234d, valueCollector.getSum(), 0.0d); + assertEquals(10.0234d, valueCollector.getAvg(), 0.0d); + assertEquals(0.0d, valueCollector.getIqr(), 0.0d); + assertEquals(1, valueCollector.getCount(),0); + + valueCollector.add(15.0d); + assertEquals(10.0234d, valueCollector.getMin(), 0.0d); + assertEquals(15.0d, valueCollector.getMax(), 0.0d); + assertEquals(25.0234d, valueCollector.getSum(), 0.0d); + assertEquals(12.5117d, valueCollector.getAvg(), 0.0d); + assertEquals(4.9766d, valueCollector.getIqr(), 0.0d); + assertEquals(2, valueCollector.getCount(),0); + + valueCollector.add(28.100d); + assertEquals(18.0766d, valueCollector.getIqr(), 0.0d); + + } + + @Test + public void testAverage() { + double[] input = {36.793899922141186, 36.878288191353626, 36.965575690177715, 36.988087035729855, 36.628622572158214}; + for (double d : input) { + valueCollector.add(d); + } + assertEquals(36.850894682312116, valueCollector.getAvg(), 0); + } + + @Test + public void testAverageFloat() { + double[] input = {36.793899922141186, 36.878288191353626, 36.965575690177715, 36.988087035729855, 36.628622572158214}; + for (double d : input) { + valueCollector.add((float)d); + } + // converting to float will give a lower number of decimals on the double result + assertEquals(36.8508954, valueCollector.getAvg(), 0); + } +} diff --git a/testing/build.gradle b/testing/build.gradle index 445fdf0e..8fa8d783 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -29,8 +29,8 @@ run { ext.testingName = 'radar-commons-testing' ext.description = 'RADAR Common testing library mocking code and utilities.' -targetCompatibility = 1.7 -sourceCompatibility = 1.7 +targetCompatibility = '1.7' +sourceCompatibility = '1.7' dependencies { api rootProject diff --git a/testing/src/main/java/org/radarcns/integration/aggregator/MockAggregator.java b/testing/src/main/java/org/radarcns/integration/aggregator/MockAggregator.java deleted file mode 100644 index 5a668851..00000000 --- a/testing/src/main/java/org/radarcns/integration/aggregator/MockAggregator.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.aggregator; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.radarcns.integration.model.ExpectedArrayValue; -import org.radarcns.integration.model.ExpectedDoubleValue; -import org.radarcns.integration.model.ExpectedValue; -import org.radarcns.integration.model.MockConfigToCsvParser; -import org.radarcns.integration.model.MockRecord; -import org.radarcns.mock.MockDataConfig; - -/** - * The MockAggregator simulates the behaviour of a Kafka Streams application based on time window. - * It supported accumulators are - */ -public final class MockAggregator { - /** - * Default constructor. - */ - private MockAggregator() {} - - /** - * @param parser class that reads a CVS file line by line returning an {@code Map} . - * @return {@code ExpectedArrayValue} the simulated results computed using the input parser. - * {@link org.radarcns.integration.model.ExpectedArrayValue} - **/ - public static ExpectedArrayValue simulateArrayCollector(MockConfigToCsvParser parser) - throws IOException, IllegalAccessException, InstantiationException { - - MockRecord.DoubleArrayType record = parser.nextDoubleArrayRecord(); - ExpectedArrayValue eav = new ExpectedArrayValue(record.getKey()); - - while (record != null) { - eav.add(record.getTimeWindow(10_000), - record.getTimeMillis(), - record.getValues()); - - record = parser.nextDoubleArrayRecord(); - } - - return eav; - } - - /** - * @param parser class that reads a CSV file line by line returning an {@code HashMap}. - * @return {@code ExpectedDoubleValue} the simulated results computed using the input parser. - * {@link ExpectedDoubleValue} - **/ - public static ExpectedDoubleValue simulateSingletonCollector(MockConfigToCsvParser parser) - throws IOException, IllegalAccessException, InstantiationException { - - MockRecord.DoubleType record = parser.nextDoubleRecord(); - ExpectedDoubleValue edv = new ExpectedDoubleValue(record.getKey()); - - while (record != null) { - edv.add(record.getTimeWindow(10_000), - record.getTimeMillis(), - record.getValue()); - - record = parser.nextDoubleRecord(); - } - - return edv; - } - - /** - * Given a list of configurations, it simulates all of them that has - * double as expected type. - * - * @param configs list containing all configurations that have to be tested. - * @return {@code Map} of key {@code MockDataConfig} and value {@code ExpectedValue}. {@link - * ExpectedDoubleValue}. - **/ - public static Map simulateSingleton(List configs) - throws ClassNotFoundException, NoSuchMethodException, IOException, IllegalAccessException, - InvocationTargetException, InstantiationException { - Map expectedOutput = new HashMap<>(); - - for (MockDataConfig config : configs) { - MockConfigToCsvParser parser = new MockConfigToCsvParser(config); - - if (config.getValueFields() != null && config.getValueFields().size() == 1) { - expectedOutput.put(config, - MockAggregator.simulateSingletonCollector(parser)); - } - } - - return expectedOutput; - } - - /** - * Given a list of configurations, it simulates all of them that has - * array as expected type. - * - * @param configs list containing all configurations that have to be tested. - * @return {@code Map} of key {@code MockDataConfig} and value {@code ExpectedValue}. {@link - * ExpectedDoubleValue}. - **/ - public static Map simulateArray(List configs) - throws ClassNotFoundException, NoSuchMethodException, IOException, IllegalAccessException, - InvocationTargetException, InstantiationException { - Map exepctedValue = new HashMap<>(); - - for (MockDataConfig config : configs) { - try (MockConfigToCsvParser parser = new MockConfigToCsvParser(config)) { - if (config.getValueFields() != null && config.getValueFields().size() > 1) { - exepctedValue.put(config, MockAggregator.simulateArrayCollector(parser)); - } - } - } - - return exepctedValue; - } - - /** - * Simulates all possible test case scenarios configured in mock-configuration. - * - * @return {@code Map} of key {@code MockDataConfig} and value {@code ExpectedValue}. {@link - * ExpectedDoubleValue}. - **/ - public static Map getSimulations( - List mockDataConfigs) - throws ClassNotFoundException, NoSuchMethodException, IOException, IllegalAccessException, - InvocationTargetException, InstantiationException { - Map map = new HashMap<>(); - map.putAll(simulateSingleton(mockDataConfigs)); - map.putAll(simulateArray(mockDataConfigs)); - - return map; - } -} \ No newline at end of file diff --git a/testing/src/main/java/org/radarcns/integration/model/ExpectedArrayValue.java b/testing/src/main/java/org/radarcns/integration/model/ExpectedArrayValue.java deleted file mode 100644 index 1815ae8b..00000000 --- a/testing/src/main/java/org/radarcns/integration/model/ExpectedArrayValue.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.model; - -import org.radarcns.stream.collector.DoubleArrayCollector; -import org.radarcns.key.MeasurementKey; - -/** - * {@code ExpectedValue} represented as {@code Double[]}. - * - * {@link ExpectedValue} - */ -public class ExpectedArrayValue extends ExpectedValue { - /** - * Constructor. - **/ - public ExpectedArrayValue(MeasurementKey key) - throws InstantiationException, IllegalAccessException { - super(key, DoubleArrayCollector.class); - } - - /** - * It adds a new value the simulation taking into account if it belongs to an existing time - * window or not. - * - * @param startTimeWindow timeZero for a time window that has this sample as initil value - * @param timestamp time associated with the value - * @param array sample value - **/ - public void add(long startTimeWindow, long timestamp, double[] array) { - if (timestamp < lastTimestamp + DURATION) { - lastValue.add(array); - } else { - lastTimestamp = startTimeWindow; - lastValue = new DoubleArrayCollector(); - lastValue.add(array); - getSeries().put(startTimeWindow, lastValue); - } - } -} diff --git a/testing/src/main/java/org/radarcns/integration/model/ExpectedDoubleValue.java b/testing/src/main/java/org/radarcns/integration/model/ExpectedDoubleValue.java deleted file mode 100644 index 805f5a5b..00000000 --- a/testing/src/main/java/org/radarcns/integration/model/ExpectedDoubleValue.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.model; - -import org.radarcns.stream.collector.DoubleValueCollector; -import org.radarcns.key.MeasurementKey; - -/** - * {@code ExpectedValue} represented as {@code Double}. - * - * {@link ExpectedValue} - */ -public class ExpectedDoubleValue extends ExpectedValue { - /** - * Constructor. - **/ - public ExpectedDoubleValue(MeasurementKey key) - throws InstantiationException, IllegalAccessException { - super(key, DoubleValueCollector.class); - } - - /** - * It adds a new value the simulation taking into account if it belongs to an existing time - * window or not. - * - * @param startTimeWindow timeZero for a time window that has this sample as initil value - * @param timestamp time associated with the value - * @param value sample value - **/ - public void add(long startTimeWindow, long timestamp, double value) { - if (timestamp < lastTimestamp + DURATION) { - lastValue.add(value); - } else { - lastTimestamp = startTimeWindow; - lastValue = new DoubleValueCollector(); - lastValue.add(value); - getSeries().put(startTimeWindow, lastValue); - } - } -} diff --git a/testing/src/main/java/org/radarcns/integration/model/ExpectedValue.java b/testing/src/main/java/org/radarcns/integration/model/ExpectedValue.java deleted file mode 100644 index 2183cafd..00000000 --- a/testing/src/main/java/org/radarcns/integration/model/ExpectedValue.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.model; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.radarcns.key.MeasurementKey; - -/** - * It computes the expected value for a test case. - */ -public class ExpectedValue { - - public enum StatType { - MINIMUM, - MAXIMUM, - SUM, - QUARTILES, - MEDIAN, - INTERQUARTILE_RANGE, - COUNT, - AVERAGE - } - - //Timewindow length in milliseconds - @SuppressWarnings({"checkstyle:AbbreviationAsWordInName", "checkstyle:MemberName"}) - public static final long DURATION = TimeUnit.SECONDS.toMillis(10); - - protected Long lastTimestamp; - protected V lastValue; - private final Map series; - private final MeasurementKey key; - - /** - * Constructor. - **/ - public ExpectedValue(MeasurementKey key, Class valueClass) - throws IllegalAccessException, InstantiationException { - series = new HashMap<>(); - - this.key = key; - lastTimestamp = 0L; - - lastValue = valueClass.newInstance(); - } - - public Map getSeries() { - return series; - } - - public MeasurementKey getKey() { - return key; - } -} diff --git a/testing/src/main/java/org/radarcns/integration/model/MockConfigToCsvParser.java b/testing/src/main/java/org/radarcns/integration/model/MockConfigToCsvParser.java deleted file mode 100644 index f7356bf3..00000000 --- a/testing/src/main/java/org/radarcns/integration/model/MockConfigToCsvParser.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.model; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecord; -import org.radarcns.data.Record; -import org.radarcns.key.MeasurementKey; -import org.radarcns.mock.MockDataConfig; -import org.radarcns.mock.MockFile; - -/** - * Starting from a CSV file, this parser generates a map containing all available fields. - * The value field can contain either a Double or an array of Doubles. - */ -public class MockConfigToCsvParser implements Closeable { - private final MockFile mockFile; - private final int timeReceivedPos; - private final int[] valuePos; - - /** - * Constructor that initialises the {@code CSVReader} and computes the {@code ExpectedType}. - * - * @param config containing the CSV file path that has to be parsed - **/ - public MockConfigToCsvParser(MockDataConfig config) - throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, - IllegalAccessException, IOException { - this.mockFile = new MockFile<>(new File(config.getAbsoluteDataFile()), config); - Schema valueSchema = mockFile.getTopic().getValueSchema(); - timeReceivedPos = valueSchema.getField("timeReceived").pos(); - - valuePos = new int[config.getValueFields().size()]; - for (int i = 0; i < valuePos.length; i++) { - valuePos[i] = valueSchema.getField(config.getValueFields().get(i)).pos(); - } - } - - /** - * Record computed by the next available - * raw of CSV file. - **/ - public Record next() throws IOException { - if (!mockFile.hasNext()) { - return null; - } - return mockFile.next(); - } - - /** - * Double type Record computed by the next available - * raw of CSV file. - */ - public MockRecord.DoubleType nextDoubleRecord() throws IOException { - Record record = next(); - if (record == null) { - return null; - } - MockRecord.DoubleType result = new MockRecord.DoubleType(); - result.setKey(record.key); - result.setTime((Double)record.value.get(timeReceivedPos)); - result.setValue(((Number)record.value.get(valuePos[0])).doubleValue()); - return result; - } - - /** - * Double array type Record computed by the next available - * raw of CSV file. - */ - public MockRecord.DoubleArrayType nextDoubleArrayRecord() throws IOException { - Record record = next(); - if (record == null) { - return null; - } - MockRecord.DoubleArrayType result = new MockRecord.DoubleArrayType(); - result.setKey(record.key); - result.setTime((Double)record.value.get(timeReceivedPos)); - double[] values = new double[valuePos.length]; - for (int i = 0; i < valuePos.length; i++) { - values[i] = ((Number)record.value.get(valuePos[i])).doubleValue(); - } - result.setValues(values); - return result; - } - - /** - * Close the {@code MockConfigToCsvParser} closing the CSV reader. - **/ - public void close() throws IOException { - mockFile.close(); - } -} \ No newline at end of file diff --git a/testing/src/main/java/org/radarcns/integration/model/MockRecord.java b/testing/src/main/java/org/radarcns/integration/model/MockRecord.java deleted file mode 100644 index 719642c5..00000000 --- a/testing/src/main/java/org/radarcns/integration/model/MockRecord.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarcns.integration.model; - -import org.radarcns.key.MeasurementKey; - -public class MockRecord { - private MeasurementKey key; - private double time = Double.NaN; - - public long getTimeMillis() { - return (long)(time * 1000d); - } - - public long getTimeWindow(long intervalMillis) { - long ms = getTimeMillis(); - return ms - (ms % intervalMillis); - } - - public MeasurementKey getKey() { - return key; - } - - public void setKey(MeasurementKey key) { - this.key = key; - } - - public void setTime(double time) { - this.time = time; - } - - public static class DoubleType extends MockRecord { - private double value; - - public double getValue() { - return value; - } - - public void setValue(double value) { - this.value = value; - } - } - - public static class DoubleArrayType extends MockRecord { - private double[] values; - - public double[] getValues() { - return values; - } - - public void setValues(double[] values) { - this.values = values; - } - } -} diff --git a/testing/src/main/java/org/radarcns/mock/MockDevice.java b/testing/src/main/java/org/radarcns/mock/MockDevice.java index 5b5102e7..976f48b3 100644 --- a/testing/src/main/java/org/radarcns/mock/MockDevice.java +++ b/testing/src/main/java/org/radarcns/mock/MockDevice.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; +import org.radarcns.mock.data.RecordGenerator; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.KafkaTopicSender; import org.radarcns.util.Oscilloscope; @@ -75,20 +76,27 @@ public void run() { } Oscilloscope oscilloscope = new Oscilloscope(baseFrequency); - while (!stopping.get()) { - // The time keeping is regulated with beats, with baseFrequency beats per second. - int beat = oscilloscope.beat(); + try { + while (!stopping.get()) { + // The time keeping is regulated with beats, with baseFrequency beats per + // second. + int beat = oscilloscope.beat(); - for (int i = 0; i < generators.size(); i++) { - int frequency = generators.get(i).getConfig().getFrequency(); - if (frequency > 0 && beat % (baseFrequency / frequency) == 0) { - Record record = recordIterators.get(i).next(); - topicSenders.get(i).send(record.offset, record.key, record.value); + for (int i = 0; i < generators.size(); i++) { + int frequency = generators.get(i).getConfig().getFrequency(); + if (frequency > 0 && beat % (baseFrequency / frequency) == 0) { + Record record = recordIterators.get(i).next(); + topicSenders.get(i).send(record.offset, record.key, record.value); + } } } + } catch (InterruptedException ex) { + // do nothing, just exit the loop + } + + for (KafkaTopicSender topicSender : topicSenders) { + topicSender.close(); } - } catch (InterruptedException ex) { - // do nothing, just exit the loop } catch (IOException e) { synchronized (this) { this.exception = e; diff --git a/testing/src/main/java/org/radarcns/mock/MockFileSender.java b/testing/src/main/java/org/radarcns/mock/MockFileSender.java index 88066cb7..1a38160d 100644 --- a/testing/src/main/java/org/radarcns/mock/MockFileSender.java +++ b/testing/src/main/java/org/radarcns/mock/MockFileSender.java @@ -20,6 +20,7 @@ import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.data.MockCsvParser; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.KafkaTopicSender; @@ -30,10 +31,10 @@ */ public class MockFileSender { private final KafkaSender sender; - private final MockFile parser; + private final MockCsvParser parser; public MockFileSender(KafkaSender sender, - MockFile parser) { + MockCsvParser parser) { this.parser = parser; this.sender = sender; } diff --git a/testing/src/main/java/org/radarcns/mock/MockProducer.java b/testing/src/main/java/org/radarcns/mock/MockProducer.java index d04b8451..529442de 100644 --- a/testing/src/main/java/org/radarcns/mock/MockProducer.java +++ b/testing/src/main/java/org/radarcns/mock/MockProducer.java @@ -24,8 +24,6 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -43,6 +41,10 @@ import org.radarcns.empatica.EmpaticaE4InterBeatInterval; import org.radarcns.empatica.EmpaticaE4Temperature; import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.BasicMockConfig; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.MockCsvParser; +import org.radarcns.mock.data.RecordGenerator; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.SchemaRetriever; import org.radarcns.producer.direct.DirectSender; @@ -65,76 +67,85 @@ public class MockProducer { private final List> devices; private final List files; private final List> senders; + private final SchemaRetriever retriever; + + /** + * MockProducer with files from current directory. The data root directory will be the current + * directory. + * @param mockConfig configuration to mock + * @throws IOException if the data could not be read or sent + */ + public MockProducer(BasicMockConfig mockConfig) throws IOException { + this(mockConfig, null); + } /** * Basic constructor. - * @param mockConfig configuration to mock. + * @param mockConfig configuration to mock + * @param root root directory of where mock files are located * @throws IOException if data could not be sent */ - public MockProducer(BasicMockConfig mockConfig) throws IOException { - int numDevices; - if (mockConfig.getData() != null) { - numDevices = mockConfig.getData().size(); - } else if (mockConfig.getNumberOfDevices() != 0) { - numDevices = mockConfig.getNumberOfDevices(); - } else { - throw new IllegalArgumentException( - "Error simulating mock device setup. Please provide data or number_of_devices"); - } + public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { + int numDevices = mockConfig.getNumberOfDevices(); - try { - senders = createSenders(mockConfig, numDevices); - } catch (KeyManagementException | NoSuchAlgorithmException ex) { - logger.error("Sender cannot be created.", ex); - throw new IOException(ex); - } + retriever = new SchemaRetriever(mockConfig.getSchemaRegistry(), 10); + List> tmpSenders = null; - devices = new ArrayList<>(numDevices); - files = new ArrayList<>(numDevices); + try { + devices = new ArrayList<>(numDevices); + files = new ArrayList<>(numDevices); - String userId = "UserID_"; - String sourceId = "SourceID_"; + List dataConfigs = mockConfig.getData(); + if (dataConfigs == null) { + dataConfigs = defaultDataConfig(); + } - if (mockConfig.getData() == null) { List> generators; + List> mockFiles; try { - generators = createGenerators(defaultDataConfig()); + generators = createGenerators(dataConfigs); + mockFiles = createMockFiles(dataConfigs, root); } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException | InvocationTargetException ex) { - throw new IllegalStateException("Default configuration invalid", ex); + throw new IllegalStateException("Configuration invalid", ex); } - for (int i = 0; i < numDevices; i++) { - MeasurementKey key = new MeasurementKey(userId + i, sourceId + i); - devices.add(new MockDevice<>(senders.get(i), key, generators)); - } - } else { - try { + tmpSenders = createSenders(mockConfig, numDevices + mockFiles.size()); + + if (!generators.isEmpty()) { + String userId = "UserID_"; + String sourceId = "SourceID_"; + for (int i = 0; i < numDevices; i++) { - File mockFile = new File(mockConfig.getData().get(i).getAbsoluteDataFile()); - MockFile parser = new MockFile( - mockFile, mockConfig.getData().get(i)); - files.add(new MockFileSender(senders.get(i), parser)); + MeasurementKey key = new MeasurementKey(userId + i, sourceId + i); + devices.add(new MockDevice<>(tmpSenders.get(i), key, generators)); + } + } + + for (int i = 0; i < mockFiles.size(); i++) { + files.add(new MockFileSender(tmpSenders.get(i + numDevices), mockFiles.get(i))); + } + } catch (Exception ex) { + if (tmpSenders != null) { + for (KafkaSender sender : tmpSenders) { + sender.close(); } - } catch (NoSuchMethodException | IllegalAccessException - | InvocationTargetException | ClassNotFoundException ex) { - throw new IOException("Cannot instantiate mock file", ex); } + retriever.close(); + throw ex; } + + senders = tmpSenders; } private List> createSenders( - BasicMockConfig mockConfig, int numDevices) - throws KeyManagementException, NoSuchAlgorithmException { - try (SchemaRetriever retriever = new SchemaRetriever( - mockConfig.getSchemaRegistry(), 10)) { - - if (mockConfig.isDirectProducer()) { - return createDirectSenders(numDevices, retriever, mockConfig.getBrokerPaths()); - } else { - return createRestSenders(numDevices, retriever, mockConfig.getRestProxy(), - mockConfig.hasCompression()); - } + BasicMockConfig mockConfig, int numDevices) { + + if (mockConfig.isDirectProducer()) { + return createDirectSenders(numDevices, retriever, mockConfig.getBrokerPaths()); + } else { + return createRestSenders(numDevices, retriever, mockConfig.getRestProxy(), + mockConfig.hasCompression()); } } @@ -206,6 +217,7 @@ public void shutdown() throws IOException, InterruptedException { for (KafkaSender sender : senders) { sender.close(); } + retriever.close(); for (MockDevice device : devices) { if (device.getException() != null) { @@ -223,7 +235,7 @@ public static void main(String[] args) { System.exit(1); } - File mockFile = new File(args[0]); + File mockFile = new File(args[0]).getAbsoluteFile(); BasicMockConfig config = null; try { config = new YamlConfigLoader().load(mockFile, BasicMockConfig.class); @@ -233,7 +245,7 @@ public static void main(String[] args) { } try { - MockProducer producer = new MockProducer(config); + MockProducer producer = new MockProducer(config, mockFile.getParentFile()); producer.start(); waitForProducer(producer, config.getDuration()); } catch (IllegalArgumentException ex) { @@ -339,7 +351,30 @@ private List> createGenerators(List> result = new ArrayList<>(configs.size()); for (MockDataConfig config : configs) { - result.add(new RecordGenerator<>(config, MeasurementKey.class)); + if (config.getDataFile() == null) { + result.add(new RecordGenerator<>(config, MeasurementKey.class)); + } + } + + return result; + } + + private List> createMockFiles(List configs, + File dataRoot) + throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InvocationTargetException, IOException { + + List> result = new ArrayList<>(configs.size()); + + File parent = dataRoot; + if (parent == null) { + parent = new File(".").getAbsoluteFile(); + } + + for (MockDataConfig config : configs) { + if (config.getDataFile() != null) { + result.add(new MockCsvParser(config, parent)); + } } return result; diff --git a/testing/src/main/java/org/radarcns/mock/BasicMockConfig.java b/testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java similarity index 98% rename from testing/src/main/java/org/radarcns/mock/BasicMockConfig.java rename to testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java index ce366ea1..c28998b2 100644 --- a/testing/src/main/java/org/radarcns/mock/BasicMockConfig.java +++ b/testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.mock; +package org.radarcns.mock.config; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; diff --git a/testing/src/main/java/org/radarcns/mock/MockDataConfig.java b/testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java similarity index 93% rename from testing/src/main/java/org/radarcns/mock/MockDataConfig.java rename to testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java index 424f8a1e..042f4208 100644 --- a/testing/src/main/java/org/radarcns/mock/MockDataConfig.java +++ b/testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.mock; +package org.radarcns.mock.config; import com.fasterxml.jackson.annotation.JsonProperty; import java.io.File; @@ -48,8 +48,8 @@ public class MockDataConfig { @JsonProperty("maximum_difference") private double maximumDifference = 1e-10d; - private double minimum = Double.NEGATIVE_INFINITY; - private double maximum = Double.POSITIVE_INFINITY; + private double minimum = -1e5; + private double maximum = 1e5; public String getTopic() { return topic; @@ -109,15 +109,19 @@ public void setValueSchema(String valueSchema) { } /** - * Get the data file associated with this definition, relative to given configuration file. + * Get the data file associated with this definition, relative to given directory. * If the data file is specified as an absolute path, then this will return that path. + * + * @param root directory the data file is relative to. + * @return absolute path to the data file + * @throws NullPointerException if root is null */ - public File getDataFile(File configFile) { + public File getDataFile(File root) { File directDataFile = new File(dataFile); if (directDataFile.isAbsolute()) { return directDataFile; } else { - File absoluteFile = new File(configFile.getParentFile(), dataFile); + File absoluteFile = new File(root, dataFile); this.absolutePath = absoluteFile.getAbsolutePath(); return absoluteFile; } diff --git a/testing/src/main/java/org/radarcns/mock/CsvGenerator.java b/testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java similarity index 55% rename from testing/src/main/java/org/radarcns/mock/CsvGenerator.java rename to testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java index 1b2fdca3..2f968769 100644 --- a/testing/src/main/java/org/radarcns/mock/CsvGenerator.java +++ b/testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java @@ -14,12 +14,13 @@ * limitations under the License. */ -package org.radarcns.mock; +package org.radarcns.mock.data; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.MockDataConfig; import org.radarcns.util.CsvWriter; /** @@ -28,29 +29,39 @@ */ public final class CsvGenerator { /** - * Generates new CSV file to simulation a single user with a single device as longs as seconds. + * Generates new CSV file to simulation a single user with a single device. * * @param config properties containing metadata to generate data - * @param duration simulation duration expressed in seconds - * @param parentFile of csv file to be generate - * @throws IOException in case configuration file cannot be retrieved + * @param duration simulation duration expressed in milliseconds + * @param root directory relative to which the output csv file is generated + * @throws IOException if the CSV file cannot be written to */ - public void generate(MockDataConfig config, long duration, File parentFile) + public void generate(MockDataConfig config, long duration, File root) throws IOException { - File file = config.getDataFile(parentFile); + File file = config.getDataFile(root); try { - RecordGenerator generator = new RecordGenerator<>( - config, MeasurementKey.class); - - MeasurementKey key = new MeasurementKey("UserID_0", "SourceID_0"); - - try (CsvWriter writer = new CsvWriter(file, generator.getHeader())) { - writer.writeRows(generator.iterateRawValues(key, duration)); - } + generate(new RecordGenerator<>(config, MeasurementKey.class), duration, file); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | ClassNotFoundException ex) { throw new IOException("Failed to generate data", ex); } } + + /** + * Generates new CSV file to simulation a single user with a single device. + * + * @param generator generator to generate data + * @param duration simulation duration expressed in milliseconds + * @param csvFile CSV file to write data to + * @throws IOException if the CSV file cannot be written to + */ + public void generate(RecordGenerator generator, long duration, File csvFile) + throws IOException { + MeasurementKey key = new MeasurementKey("UserID_0", "SourceID_0"); + + try (CsvWriter writer = new CsvWriter(csvFile, generator.getHeader())) { + writer.writeRows(generator.iterateRawValues(key, duration)); + } + } } diff --git a/testing/src/main/java/org/radarcns/mock/MockFile.java b/testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java similarity index 89% rename from testing/src/main/java/org/radarcns/mock/MockFile.java rename to testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java index 13f6409e..3f7c7ef3 100644 --- a/testing/src/main/java/org/radarcns/mock/MockFile.java +++ b/testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.mock; +package org.radarcns.mock.data; import java.io.BufferedReader; import java.io.Closeable; @@ -31,6 +31,7 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; +import org.radarcns.mock.config.MockDataConfig; import org.radarcns.topic.AvroTopic; import org.radarcns.util.CsvParser; @@ -38,7 +39,7 @@ * Parse mock data from a CSV file * @param key type. */ -public class MockFile implements Closeable { +public class MockCsvParser implements Closeable { private static final char ARRAY_SEPARATOR = ';'; private static final char ARRAY_START = '['; private static final char ARRAY_END = ']'; @@ -53,16 +54,17 @@ public class MockFile implements Closeable { /** * Base constructor. - * @param baseFile parent directory of the data file. * @param config configuration of the stream. + * @param root parent directory of the data file. + * @throws IllegalArgumentException if the second row has the wrong number of columns */ - public MockFile(File baseFile, MockDataConfig config) + public MockCsvParser(MockDataConfig config, File root) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException { //noinspection unchecked topic = (AvroTopic) config.parseAvroTopic(); - fileReader = new FileReader(config.getDataFile(baseFile)); + fileReader = new FileReader(config.getDataFile(root)); bufferedReader = new BufferedReader(fileReader); csvReader = new CsvParser(bufferedReader); List header = csvReader.parseLine(); @@ -80,6 +82,10 @@ public AvroTopic getTopic() { /** * Read the next record in the file. + * @throws NullPointerException if a field from the Avro schema is missing as a column + * @throws IllegalArgumentException if the row has the wrong number of columns + * @throws IllegalStateException if a next row is not available + * @throws IOException if the next row could not be read */ public Record next() throws IOException { if (!hasNext()) { diff --git a/testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java b/testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java new file mode 100644 index 00000000..687d34fe --- /dev/null +++ b/testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java @@ -0,0 +1,130 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.data; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import org.apache.avro.specific.SpecificRecord; +import org.radarcns.data.Record; +import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.MockDataConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CSV files must be validate before using since MockAggregator can handle only files containing + * unique User_ID and Source_ID and having increasing timestamp at each raw. + */ +public class MockRecordValidator { + private static final Logger logger = LoggerFactory.getLogger(MockRecordValidator.class); + private final MockDataConfig config; + private final long duration; + private final File root; + private int timePos; + private double time; + private double startTime; + + /** Create a new validator for given configuration. */ + public MockRecordValidator(MockDataConfig config, long duration, File root) { + this.config = config; + this.duration = duration; + this.root = root; + this.time = Double.NaN; + this.startTime = Double.NaN; + } + + /** + * Verify whether the CSV file can be used or not. + * @throws IllegalArgumentException if the CSV file does not respect the constraints. + */ + public void validate() { + try (MockCsvParser parser = new MockCsvParser<>(config, root)) { + if (!parser.hasNext()) { + throw new IllegalArgumentException("CSV file is empty"); + } + + timePos = config.parseAvroTopic().getValueSchema() + .getField("timeReceived").pos(); + + Record last = null; + long line = 1L; + + while (parser.hasNext()) { + Record record = parser.next(); + checkRecord(record, last, line++); + last = record; + } + + checkDuration(); + checkFrequency(line); + } catch (IOException e) { + error("Cannot open file", -1, e); + } catch (InvocationTargetException e) { + error("Cannot instantiate mock type", -1, e); + } catch (NoSuchMethodException e) { + error("Mock type is not a SpecificRecord", -1, e); + } catch (IllegalAccessException e) { + error("Mock type is not accessible", -1, e); + } catch (ClassNotFoundException e) { + error("Mock type class does not exist", -1, e); + } + } + + private void checkFrequency(long line) { + if (line != config.getFrequency() * duration / 1000L + 1L) { + error("CSV contains fewer messages than expected.", -1L, null); + } + } + + private void checkRecord(Record record, + Record last, long line) { + double previousTime = time; + time = (Double) record.value.get(timePos); + + if (last == null) { + // no checks, only update initial time stamp + startTime = time; + } else if (!last.key.equals(record.key)) { + error("It is possible to test only one user/source at time.", line, null); + } else if (time < previousTime) { + error("Time must increase row by row.", line, null); + } + } + + private void error(String message, long line, Exception ex) { + String mex = config.getDataFile() + " with topic " + config.getTopic() + " is invalid"; + if (line > 0L) { + mex += " on line " + line; + } + mex += ". " + message; + logger.error(mex); + throw new IllegalArgumentException(mex, ex); + } + + private void checkDuration() { + long interval = (long)(time * 1000d) - (long)(startTime * 1000d); + + // add a margin of 50 for clock error purposes + long margin = 50L; + + if (duration <= interval - margin || duration > interval + 1000L + margin) { + error("Data does not cover " + duration + " milliseconds but " + + interval + " instead.", -1L, null); + } + } +} diff --git a/testing/src/main/java/org/radarcns/mock/RecordGenerator.java b/testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java similarity index 97% rename from testing/src/main/java/org/radarcns/mock/RecordGenerator.java rename to testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java index 68e68435..da9df1b5 100644 --- a/testing/src/main/java/org/radarcns/mock/RecordGenerator.java +++ b/testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.radarcns.mock; +package org.radarcns.mock.data; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -30,6 +30,7 @@ import org.apache.avro.Schema.Type; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; +import org.radarcns.mock.config.MockDataConfig; import org.radarcns.topic.AvroTopic; import org.radarcns.util.Metronome; @@ -128,7 +129,7 @@ private Field forceGetField(Schema schema, String name) { /** * Simulates data of a sensor with the given frequency for a time interval specified by * duration. The data is converted to lists of strings. - * @param duration in seconds for the simulation + * @param duration in milliseconds for the simulation * @param key key to generate data with * @return list containing simulated values */ @@ -141,7 +142,7 @@ public Iterator> iterateRawValues(K key, long duration) { /** * Simulates data of a sensor with the given frequency for a time interval specified by * duration. - * @param duration in seconds for the simulation + * @param duration in milliseconds for the simulation * @param key key to generate data with * @return list containing simulated values */ @@ -219,7 +220,7 @@ private class RecordIterator implements public RecordIterator(long duration, K key) { this.key = key; - timestamps = new Metronome(duration * config.getFrequency(), + timestamps = new Metronome(duration * config.getFrequency() / 1000L, config.getFrequency()); offset = 0; } diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java b/testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java new file mode 100644 index 00000000..1e0e8cc3 --- /dev/null +++ b/testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.model; + +import java.util.List; +import org.apache.avro.Schema; +import org.radarcns.stream.collector.DoubleArrayCollector; + +/** + * {@code ExpectedValue} represented as {@code Double[]}. + * + * {@link ExpectedValue} + */ +public class ExpectedArrayValue extends ExpectedValue { + /** + * Constructor. + */ + public ExpectedArrayValue(Schema valueSchema, List valueFields) { + super(valueSchema, valueFields); + } + + public ExpectedArrayValue() { + super(); + } + + @Override + protected DoubleArrayCollector createValue() { + return new DoubleArrayCollector(); + } + + @Override + protected void addToValue(DoubleArrayCollector collector, Object[] rawValues) { + double[] values = new double[rawValues.length]; + for (int i = 0; i < values.length; i++) { + values[i] = Double.parseDouble(rawValues[i].toString()); + } + collector.add(values); + } +} diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java b/testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java new file mode 100644 index 00000000..2b3207ff --- /dev/null +++ b/testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.model; + +import java.util.List; +import org.apache.avro.Schema; +import org.radarcns.stream.collector.DoubleValueCollector; + +/** + * {@code ExpectedValue} represented as {@code Double}. + * + * {@link ExpectedValue} + */ +public class ExpectedDoubleValue extends ExpectedValue { + /** + * Constructor. + */ + public ExpectedDoubleValue(Schema valueSchema, List valueFields) { + super(valueSchema, valueFields); + } + + public ExpectedDoubleValue() { + super(); + } + + @Override + protected DoubleValueCollector createValue() { + return new DoubleValueCollector(); + } + + @Override + protected void addToValue(DoubleValueCollector collector, Object[] values) { + collector.add(Double.parseDouble(values[0].toString())); + } +} diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java b/testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java new file mode 100644 index 00000000..3b779f54 --- /dev/null +++ b/testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java @@ -0,0 +1,132 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecord; +import org.radarcns.data.Record; +import org.radarcns.key.MeasurementKey; + +/** + * It computes the expected value for a test case. + */ +public abstract class ExpectedValue { + // Timewindow length in milliseconds + public static final long DURATION = TimeUnit.SECONDS.toMillis(10); + private final int timeReceivedPos; + private final int[] valuePos; + + private Long lastTimestamp; + private MeasurementKey lastKey; + private V lastValue; + private final Map series; + + /** + * Constructor. + **/ + public ExpectedValue(Schema valueSchema, List valueFields) { + timeReceivedPos = valueSchema.getField("timeReceived").pos(); + + valuePos = new int[valueFields.size()]; + for (int i = 0; i < valuePos.length; i++) { + valuePos[i] = valueSchema.getField(valueFields.get(i)).pos(); + } + + series = new HashMap<>(); + lastTimestamp = 0L; + lastValue = null; + } + + /** + * Constructor. + **/ + public ExpectedValue() { + timeReceivedPos = -1; + valuePos = null; + + series = new HashMap<>(); + lastTimestamp = 0L; + lastValue = null; + } + + /** + * Create a new value for the series. This is called when the time window of a record does not + * match a previous value in the time series. + */ + protected abstract V createValue(); + + /** + * Add record to a single time series value. + * @param value value to add record to. + * @param values values of the record to add + */ + protected abstract void addToValue(V value, Object[] values); + + /** Number of value fields of the records in this expected value. */ + public int getValueFieldLength() { + return valuePos.length; + } + + protected Object getValue(SpecificRecord record, int i) { + return record.get(valuePos[i]); + } + + public Map getSeries() { + return series; + } + + /** + * Add a new record to the series of expected values. + * @param record record to add + */ + public void add(Record record) { + if (timeReceivedPos == -1) { + throw new IllegalStateException("Cannot parse record without a schema."); + } + long timeMillis = (long) ((Double) record.value.get(timeReceivedPos) * 1000d); + Object[] obj = new Object[valuePos.length]; + for (int i = 0; i < valuePos.length; i++) { + obj[i] = record.value.get(valuePos[i]); + } + + add(record.key, timeMillis, obj); + } + + /** + * Add a new record to the series of expected values. + * @param key measurement key + * @param timeMillis time the record is received + * @param values values to add + */ + public void add(MeasurementKey key, long timeMillis, Object... values) { + this.lastKey = key; + if (timeMillis >= lastTimestamp + DURATION || lastValue == null) { + lastTimestamp = timeMillis - (timeMillis % DURATION); + lastValue = createValue(); + getSeries().put(lastTimestamp, lastValue); + } + addToValue(lastValue, values); + } + + public MeasurementKey getLastKey() { + return lastKey; + } +} diff --git a/testing/src/main/java/org/radarcns/mock/model/MockAggregator.java b/testing/src/main/java/org/radarcns/mock/model/MockAggregator.java new file mode 100644 index 00000000..51108d09 --- /dev/null +++ b/testing/src/main/java/org/radarcns/mock/model/MockAggregator.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.model; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.MockCsvParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The MockAggregator simulates the behaviour of a Kafka Streams application based on time window. + * It supported accumulators are
    + *
  • array of {@code Double} + *
  • singleton {@code Double} + *
+ */ +public final class MockAggregator { + private static final Logger logger = LoggerFactory.getLogger(MockAggregator.class); + + /** + * Default constructor. + */ + private MockAggregator() { + // utility class + } + + /** + * Simulates all possible test case scenarios configured in mock-configuration. + * + * @return {@code Map} of key {@code MockDataConfig} and value {@code ExpectedValue}. {@link + * ExpectedDoubleValue}. + **/ + public static Map getSimulations( + List mockDataConfigs, File root) throws IOException { + + Map expectedValue = new HashMap<>(); + + for (MockDataConfig config : mockDataConfigs) { + if (config.getValueFields() == null || config.getValueFields().isEmpty()) { + logger.warn("No value fields specified for {}. Skipping.", config.getTopic()); + continue; + } + + try (MockCsvParser parser = new MockCsvParser<>(config, root)) { + Schema valueSchema = config.parseAvroTopic().getValueSchema(); + List valueFields = config.getValueFields(); + + ExpectedValue value; + if (config.getValueFields().size() == 1) { + value = new ExpectedDoubleValue(valueSchema, valueFields); + } else { + value = new ExpectedArrayValue(valueSchema, valueFields); + } + + while (parser.hasNext()) { + value.add(parser.next()); + } + + expectedValue.put(config, value); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException + | InvocationTargetException ex) { + throw new IllegalArgumentException("Could not read topic", ex); + } + } + + return expectedValue; + } +} \ No newline at end of file diff --git a/testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java b/testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java new file mode 100644 index 00000000..bf4d2911 --- /dev/null +++ b/testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock; + +import static org.junit.Assert.*; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.CsvGenerator; +import org.radarcns.mock.data.RecordGenerator; +import org.radarcns.phone.PhoneLight; +import org.radarcns.util.CsvParser; + +public class CsvGeneratorTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private MockDataConfig makeConfig() throws IOException { + MockDataConfig config = new MockDataConfig(); + config.setDataFile(folder.newFile().getAbsolutePath()); + config.setValueSchema(PhoneLight.class.getName()); + config.setValueField("light"); + config.setTopic("test"); + return config; + } + + @Test + public void generateMockConfig() throws IOException { + CsvGenerator generator = new CsvGenerator(); + + MockDataConfig config = makeConfig(); + generator.generate(config, 100_000L, folder.getRoot()); + + CsvParser parser = new CsvParser(new BufferedReader(new FileReader(config.getDataFile()))); + List headers = Arrays.asList( + "userId", "sourceId", "time", "timeReceived", "light"); + assertEquals(headers, parser.parseLine()); + + int n = 0; + List line; + while ((line = parser.parseLine()) != null) { + String value = line.get(4); + assertNotEquals("NaN", value); + assertNotEquals("Infinity", value); + assertNotEquals("-Infinity", value); + // no decimals lost or appended + assertEquals(value, Float.valueOf(value).toString()); + n++; + } + assertEquals(100, n); + } + + @Test + public void generateGenerator() + throws IOException, ClassNotFoundException, NoSuchMethodException, + IllegalAccessException, InvocationTargetException { + CsvGenerator generator = new CsvGenerator(); + + MockDataConfig config = makeConfig(); + + final String time = Double.toString(System.currentTimeMillis() / 1000d); + + RecordGenerator recordGenerator = new RecordGenerator( + config, MeasurementKey.class) { + @Override + public Iterator> iterateRawValues(MeasurementKey key, long duration) { + return Collections.singletonList( + Arrays.asList("UserID_0", "SourceID_0", time, time, + Float.valueOf((float)0.123112412410423518).toString())) + .iterator(); + } + }; + + generator.generate(recordGenerator, 1000L, new File(config.getDataFile())); + + CsvParser parser = new CsvParser(new BufferedReader(new FileReader(config.getDataFile()))); + assertEquals(recordGenerator.getHeader(), parser.parseLine()); + // float will cut off a lot of decimals + assertEquals(Arrays.asList("UserID_0", "SourceID_0", time, time, "0.12311241"), + parser.parseLine()); + } +} \ No newline at end of file diff --git a/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java b/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java index 652f05d0..f4c95d09 100644 --- a/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java +++ b/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java @@ -25,6 +25,8 @@ import org.radarcns.data.Record; import org.radarcns.empatica.EmpaticaE4Acceleration; import org.radarcns.key.MeasurementKey; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.RecordGenerator; /** * Created by joris on 17/05/2017. diff --git a/testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java b/testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java new file mode 100644 index 00000000..190d8c9c --- /dev/null +++ b/testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java @@ -0,0 +1,180 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarcns.mock.data; + +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.phone.PhoneAcceleration; +import org.radarcns.phone.PhoneLight; + +public class MockRecordValidatorTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private MockDataConfig makeConfig() throws IOException { + MockDataConfig config = new MockDataConfig(); + config.setDataFile(folder.newFile().getAbsolutePath()); + config.setValueSchema(PhoneLight.class.getName()); + config.setValueField("light"); + config.setTopic("test"); + return config; + } + + @Test + public void validate() throws Exception { + CsvGenerator generator = new CsvGenerator(); + + MockDataConfig config = makeConfig(); + generator.generate(config, 100_000L, folder.getRoot()); + + // doesn't throw + new MockRecordValidator(config, 100_000L, folder.getRoot()).validate(); + } + + @Test + public void validateWrongDuration() throws Exception { + CsvGenerator generator = new CsvGenerator(); + + MockDataConfig config = makeConfig(); + generator.generate(config, 100_000L, folder.getRoot()); + + exception.expect(IllegalArgumentException.class); + new MockRecordValidator(config, 10_000L, folder.getRoot()).validate(); + } + + @Test + public void validateCustom() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,light\n"); + writer.append("a,b,1,1,1\n"); + writer.append("a,b,1,2,1\n"); + } + + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateWrongKey() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,light\n"); + writer.append("a,b,1,1,1\n"); + writer.append("a,c,1,2,1\n"); + } + + exception.expect(IllegalArgumentException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateWrongTime() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,light\n"); + writer.append("a,b,1,1,1\n"); + writer.append("a,b,1,0,1\n"); + } + + exception.expect(IllegalArgumentException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + + @Test + public void validateMissingKeyField() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,time,timeReceived,light\n"); + writer.append("a,1,1,1\n"); + writer.append("a,1,2,1\n"); + } + + exception.expect(NullPointerException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateMissingValueField() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,light\n"); + writer.append("a,b,1,1\n"); + writer.append("a,b,1,2\n"); + } + + exception.expect(NullPointerException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateMissingValue() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,light\n"); + writer.append("a,b,1,1\n"); + writer.append("a,b,1,2,1\n"); + } + + exception.expect(IllegalArgumentException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateWrongValueType() throws Exception { + MockDataConfig config = makeConfig(); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,light\n"); + writer.append("a,b,1,1,a\n"); + writer.append("a,b,1,2,b\n"); + } + + exception.expect(NumberFormatException.class); + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } + + @Test + public void validateMultipleFields() throws Exception { + MockDataConfig config = makeConfig(); + config.setValueSchema(PhoneAcceleration.class.getName()); + config.setValueFields(Arrays.asList("x", "y", "z")); + + try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { + writer.append("userId,sourceId,time,timeReceived,x,y,z\n"); + writer.append("a,b,1,1,1,1,1\n"); + writer.append("a,b,1,2,1,1,1\n"); + } + + new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); + } +} \ No newline at end of file