diff --git a/.gitignore b/.gitignore index 65064a7c..ba935722 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ log/ # Gradle .gradle/ .gradletasknamecache +local.properties # Build build/ @@ -22,3 +23,4 @@ build/ # Generated compile files /out/ backend.log +local.properties diff --git a/README.md b/README.md index 4a635df0..d09d19c3 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.5' + compile group: 'org.radarcns', name: 'radar-commons', version: '0.6-alpha.1' } ``` @@ -26,7 +26,7 @@ repositories { } dependencies { - testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.5' + testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6-alpha.1' } ``` @@ -51,7 +51,7 @@ configurations.all { } dependencies { - compile group: 'org.radarcns', name: 'radar-commons', version: '0.5.1-SNAPSHOT', changing: true + compile group: 'org.radarcns', name: 'radar-commons', version: '0.6-SNAPSHOT', changing: true } ``` diff --git a/build.gradle b/build.gradle index 68a99669..46ce5047 100644 --- a/build.gradle +++ b/build.gradle @@ -16,12 +16,10 @@ plugins { // Get bintray version - id "com.jfrog.bintray" version "1.7.3" - id "com.jfrog.artifactory" version "4.4.18" + id 'com.jfrog.bintray' version '1.7.3' + id 'com.jfrog.artifactory' version '4.4.18' } -apply plugin: "com.jfrog.artifactory" - allprojects { // Apply the plugins apply plugin: 'java' @@ -32,29 +30,30 @@ allprojects { apply plugin: 'com.jfrog.bintray' apply plugin: 'maven-publish' apply plugin: 'jacoco' + apply plugin: 'com.jfrog.artifactory' //---------------------------------------------------------------------------// // Configuration // //---------------------------------------------------------------------------// - version = '0.5' + version = '0.6-alpha.1' group = 'org.radarcns' ext.githubRepoName = 'RADAR-CNS/RADAR-Commons' ext.slf4jVersion = '1.7.21' - ext.kafkaVersion = '0.10.2.1' - ext.avroVersion = '1.8.1' - ext.confluentVersion = '3.1.2' + ext.kafkaVersion = '0.11.0.1' + ext.avroVersion = '1.8.2' + ext.confluentVersion = '3.3.0' ext.log4jVersion = '2.7' ext.jacksonVersion = '2.8.5' - ext.okhttpVersion = '3.6.0' - ext.okioVersion = '1.11.0' + ext.okhttpVersion = '3.8.0' ext.junitVersion = '4.12' ext.mockitoVersion = '2.2.29' ext.mathVersion = '3.0' ext.hamcrestVersion = '1.3' ext.codacyVersion = '1.0.10' - ext.radarSchemasVersion = '0.1' + ext.radarSchemasVersion = '0.2-alpha.3-SNAPSHOT' + ext.jsonVersion = '20170516' ext.githubUrl = 'https://github.com/' + githubRepoName + '.git' ext.issueUrl = 'https://github.com/' + githubRepoName + '/issues' @@ -67,6 +66,9 @@ allprojects { jcenter() maven { url 'http://packages.confluent.io/maven/' } maven { url 'http://dl.bintray.com/typesafe/maven-releases' } + flatDir { + dirs "${project.rootDir}/libs" + } } ext.pomConfig = { @@ -191,14 +193,24 @@ configurations { codacy } +configurations.compile { + resolutionStrategy.cacheChangingModulesFor 0, 'SECONDS' +} + // In this section you declare where to find the dependencies of your project repositories { maven { url 'https://jitpack.io' } + maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' } } // In this section you declare the dependencies for your production and test code dependencies { - api group: 'org.apache.avro', name: 'avro', version: avroVersion + api (group: 'org.apache.avro', name: 'avro', version: avroVersion) { + exclude group: 'org.xerial.snappy', module: 'snappy-java' + exclude group: 'com.thoughtworks.paranamer', module: 'paranamer' + exclude group: 'org.apache.commons', module: 'commons-compress' + exclude group: 'org.tukaani', module: 'xz' + } // to implement producers and consumers api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion @@ -207,9 +219,7 @@ dependencies { // For POJO classes and ConfigLoader implementation group: 'com.fasterxml.jackson.core' , name: 'jackson-databind' , version: jacksonVersion implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion - - // The REST data is serialized with okio - implementation group: 'com.squareup.okio', name: 'okio', version: okioVersion + implementation group: 'org.json', name: 'json', version: jsonVersion // The production code uses the SLF4J logging API at compile time implementation group: 'org.slf4j', name:'slf4j-api', version: slf4jVersion @@ -352,10 +362,6 @@ artifactory { artifactoryPublish { publications('RadarCommonsPublication') - publishBuildInfo = true //Publish build-info to Artifactory (true by default) - publishArtifacts = true //Publish artifacts to Artifactory (true by default) - publishPom = true //Publish generated POM files to Artifactory (true by default). - publishIvy = false //Publish generated Ivy descriptor files to Artifactory (true by default). } task wrapper(type: Wrapper) { diff --git a/libs/.gitignore b/libs/.gitignore new file mode 100644 index 00000000..d392f0e8 --- /dev/null +++ b/libs/.gitignore @@ -0,0 +1 @@ +*.jar diff --git a/testing/.gitignore b/radar-commons-testing/.gitignore similarity index 100% rename from testing/.gitignore rename to radar-commons-testing/.gitignore diff --git a/testing/build.gradle b/radar-commons-testing/build.gradle similarity index 87% rename from testing/build.gradle rename to radar-commons-testing/build.gradle index fccab482..f421ba37 100644 --- a/testing/build.gradle +++ b/radar-commons-testing/build.gradle @@ -32,6 +32,10 @@ ext.description = 'RADAR Common testing library mocking code and utilities.' targetCompatibility = '1.7' sourceCompatibility = '1.7' +repositories { + maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' } +} + dependencies { api rootProject api group: 'org.apache.avro', name: 'avro', version: avroVersion @@ -136,4 +140,21 @@ bintray { released = new Date() } } -} \ No newline at end of file +} + + +artifactory { + contextUrl = 'https://oss.jfrog.org/artifactory' + publish { + repository { + repoKey = 'oss-snapshot-local' + username = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') + password = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') + maven = true + } + } +} + +artifactoryPublish { + publications('RadarCommonsTestingPublication') +} diff --git a/testing/mock.yml.template b/radar-commons-testing/mock.yml.template similarity index 100% rename from testing/mock.yml.template rename to radar-commons-testing/mock.yml.template diff --git a/testing/src/main/java/org/radarcns/mock/MockDevice.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java similarity index 100% rename from testing/src/main/java/org/radarcns/mock/MockDevice.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/MockDevice.java diff --git a/testing/src/main/java/org/radarcns/mock/MockFileSender.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java similarity index 94% rename from testing/src/main/java/org/radarcns/mock/MockFileSender.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java index 1a38160d..3bf5926a 100644 --- a/testing/src/main/java/org/radarcns/mock/MockFileSender.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/MockFileSender.java @@ -19,7 +19,7 @@ import java.io.IOException; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.data.MockCsvParser; import org.radarcns.producer.KafkaSender; import org.radarcns.producer.KafkaTopicSender; @@ -33,7 +33,7 @@ public class MockFileSender { private final KafkaSender sender; private final MockCsvParser parser; - public MockFileSender(KafkaSender sender, + public MockFileSender(KafkaSender sender, MockCsvParser parser) { this.parser = parser; this.sender = sender; diff --git a/testing/src/main/java/org/radarcns/mock/MockProducer.java b/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java similarity index 87% rename from testing/src/main/java/org/radarcns/mock/MockProducer.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java index 70e224df..0ab94ed7 100644 --- a/testing/src/main/java/org/radarcns/mock/MockProducer.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/MockProducer.java @@ -34,13 +34,13 @@ import org.radarcns.config.ServerConfig; import org.radarcns.config.YamlConfigLoader; import org.radarcns.data.SpecificRecordEncoder; -import org.radarcns.empatica.EmpaticaE4Acceleration; -import org.radarcns.empatica.EmpaticaE4BatteryLevel; -import org.radarcns.empatica.EmpaticaE4BloodVolumePulse; -import org.radarcns.empatica.EmpaticaE4ElectroDermalActivity; -import org.radarcns.empatica.EmpaticaE4InterBeatInterval; -import org.radarcns.empatica.EmpaticaE4Temperature; -import org.radarcns.key.MeasurementKey; +import org.radarcns.passive.empatica.EmpaticaE4Acceleration; +import org.radarcns.passive.empatica.EmpaticaE4BatteryLevel; +import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse; +import org.radarcns.passive.empatica.EmpaticaE4ElectroDermalActivity; +import org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval; +import org.radarcns.passive.empatica.EmpaticaE4Temperature; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.config.BasicMockConfig; import org.radarcns.mock.config.MockDataConfig; import org.radarcns.mock.data.MockCsvParser; @@ -64,9 +64,9 @@ public class MockProducer { private static final Logger logger = LoggerFactory.getLogger(MockProducer.class); - private final List> devices; + private final List> devices; private final List files; - private final List> senders; + private final List> senders; private final SchemaRetriever retriever; /** @@ -89,7 +89,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { int numDevices = mockConfig.getNumberOfDevices(); retriever = new SchemaRetriever(mockConfig.getSchemaRegistry(), 10); - List> tmpSenders = null; + List> tmpSenders = null; try { devices = new ArrayList<>(numDevices); @@ -100,8 +100,8 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { dataConfigs = defaultDataConfig(); } - List> generators; - List> mockFiles; + List> generators; + List> mockFiles; try { generators = createGenerators(dataConfigs); mockFiles = createMockFiles(dataConfigs, root); @@ -117,7 +117,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { String sourceId = "SourceID_"; for (int i = 0; i < numDevices; i++) { - MeasurementKey key = new MeasurementKey(userId + i, sourceId + i); + ObservationKey key = new ObservationKey("test", userId + i, sourceId + i); devices.add(new MockDevice<>(tmpSenders.get(i), key, generators)); } } @@ -138,7 +138,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException { senders = tmpSenders; } - private List> createSenders( + private List> createSenders( BasicMockConfig mockConfig, int numDevices) { if (mockConfig.isDirectProducer()) { @@ -150,9 +150,9 @@ private List> createSenders( } /** Create senders that directly produce data to Kafka. */ - private List> createDirectSenders(int numDevices, + private List> createDirectSenders(int numDevices, SchemaRetriever retriever, String brokerPaths) { - List> result = new ArrayList<>(numDevices); + List> result = new ArrayList<>(numDevices); for (int i = 0; i < numDevices; i++) { Properties properties = new Properties(); properties.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); @@ -160,18 +160,18 @@ private List> createDirectSenders(in properties.put(SCHEMA_REGISTRY_CONFIG, retriever); properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerPaths); - result.add(new DirectSender(properties)); + result.add(new DirectSender(properties)); } return result; } /** Create senders that produce data to Kafka via the REST proxy. */ - private List> createRestSenders(int numDevices, + private List> createRestSenders(int numDevices, SchemaRetriever retriever, ServerConfig restProxy, boolean useCompression) { - List> result = new ArrayList<>(numDevices); + List> result = new ArrayList<>(numDevices); ConnectionState sharedState = new ConnectionState(10, TimeUnit.SECONDS); - RestSender.Builder restBuilder = - new RestSender.Builder() + RestSender.Builder restBuilder = + new RestSender.Builder() .server(restProxy) .schemaRetriever(retriever) .useCompression(useCompression) @@ -181,7 +181,7 @@ private List> createRestSenders(int .connectionTimeout(10, TimeUnit.SECONDS); for (int i = 0; i < numDevices; i++) { - RestSender firstSender = restBuilder + RestSender firstSender = restBuilder .connectionPool(new ManagedConnectionPool()) .build(); @@ -214,7 +214,7 @@ public void shutdown() throws IOException, InterruptedException { device.join(5_000L); } logger.info("Closing channels"); - for (KafkaSender sender : senders) { + for (KafkaSender sender : senders) { sender.close(); } retriever.close(); @@ -344,27 +344,27 @@ private List defaultDataConfig() { return Arrays.asList(acceleration, battery, bvp, eda, ibi, temperature); } - private List> createGenerators(List configs) + private List> createGenerators(List configs) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { - List> result = new ArrayList<>(configs.size()); + List> result = new ArrayList<>(configs.size()); for (MockDataConfig config : configs) { if (config.getDataFile() == null) { - result.add(new RecordGenerator<>(config, MeasurementKey.class)); + result.add(new RecordGenerator<>(config, ObservationKey.class)); } } return result; } - private List> createMockFiles(List configs, + private List> createMockFiles(List configs, File dataRoot) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, IOException { - List> result = new ArrayList<>(configs.size()); + List> result = new ArrayList<>(configs.size()); File parent = dataRoot; if (parent == null) { @@ -373,7 +373,7 @@ private List> createMockFiles(List for (MockDataConfig config : configs) { if (config.getDataFile() != null) { - result.add(new MockCsvParser(config, parent)); + result.add(new MockCsvParser(config, parent)); } } diff --git a/testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java b/radar-commons-testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java similarity index 100% rename from testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/config/BasicMockConfig.java diff --git a/testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java b/radar-commons-testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java similarity index 91% rename from testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java index be3f6af6..018bc4da 100644 --- a/testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/config/MockDataConfig.java @@ -17,15 +17,15 @@ package org.radarcns.mock.config; import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.File; -import java.lang.reflect.InvocationTargetException; -import java.util.Collections; -import java.util.List; import org.apache.avro.specific.SpecificRecord; import org.radarcns.config.AvroTopicConfig; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.topic.AvroTopic; +import java.io.File; +import java.util.Collections; +import java.util.List; + public class MockDataConfig extends AvroTopicConfig { @JsonProperty("file") private String dataFile; @@ -46,15 +46,13 @@ public class MockDataConfig extends AvroTopicConfig { private double maximum = 1e5; /** - * Parse an AvroTopic from the values in this class. If keySchema is not set, MeasurementKey + * Parse an AvroTopic from the values in this class. If keySchema is not set, ObservationKey * will be used as a key schema. */ @Override - public AvroTopic parseAvroTopic() - throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, - IllegalAccessException { + public AvroTopic parseAvroTopic() { if (getKeySchema() == null) { - setKeySchema(MeasurementKey.class.getName()); + setKeySchema(ObservationKey.class.getName()); } return super.parseAvroTopic(); } diff --git a/testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java similarity index 90% rename from testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java index 2f968769..7e143662 100644 --- a/testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/CsvGenerator.java @@ -19,7 +19,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.config.MockDataConfig; import org.radarcns.util.CsvWriter; @@ -41,7 +41,7 @@ public void generate(MockDataConfig config, long duration, File root) File file = config.getDataFile(root); try { - generate(new RecordGenerator<>(config, MeasurementKey.class), duration, file); + generate(new RecordGenerator<>(config, ObservationKey.class), duration, file); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | ClassNotFoundException ex) { throw new IOException("Failed to generate data", ex); @@ -56,9 +56,9 @@ public void generate(MockDataConfig config, long duration, File root) * @param csvFile CSV file to write data to * @throws IOException if the CSV file cannot be written to */ - public void generate(RecordGenerator generator, long duration, File csvFile) + public void generate(RecordGenerator generator, long duration, File csvFile) throws IOException { - MeasurementKey key = new MeasurementKey("UserID_0", "SourceID_0"); + ObservationKey key = new ObservationKey("test", "UserID_0", "SourceID_0"); try (CsvWriter writer = new CsvWriter(csvFile, generator.getHeader())) { writer.writeRows(generator.iterateRawValues(key, duration)); diff --git a/testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java similarity index 86% rename from testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java index 3f7c7ef3..25556e68 100644 --- a/testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockCsvParser.java @@ -139,12 +139,40 @@ private static Object parseValue(Schema schema, String fieldString) { return fieldString; case ARRAY: return parseArray(schema, fieldString); + case UNION: + return parseUnion(schema, fieldString); default: throw new IllegalArgumentException("Cannot handle schemas of type " + schema.getType()); } } + private static Object parseUnion(Schema schema, String fieldString) { + if (schema.getTypes().size() != 2) { + throw new IllegalArgumentException( + "Cannot handle UNION types with other than two internal types: " + + schema.getTypes()); + } + Schema schema0 = schema.getTypes().get(0); + Schema schema1 = schema.getTypes().get(1); + + Schema nonNullSchema; + if (schema0.getType() == Schema.Type.NULL) { + nonNullSchema = schema1; + } else if (schema1.getType() == Schema.Type.NULL) { + nonNullSchema = schema0; + } else { + throw new IllegalArgumentException("Cannot handle non-nullable UNION types: " + + schema.getTypes()); + } + + if (fieldString.isEmpty() || fieldString.equals("null")) { + return null; + } else { + return parseValue(nonNullSchema, fieldString); + } + } + private static List parseArray(Schema schema, String fieldString) { if (fieldString.charAt(0) != ARRAY_START || fieldString.charAt(fieldString.length() - 1) != ARRAY_END) { diff --git a/testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java similarity index 92% rename from testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java index 687d34fe..0c09160a 100644 --- a/testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/MockRecordValidator.java @@ -21,7 +21,7 @@ import java.lang.reflect.InvocationTargetException; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.config.MockDataConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ public MockRecordValidator(MockDataConfig config, long duration, File root) { * @throws IllegalArgumentException if the CSV file does not respect the constraints. */ public void validate() { - try (MockCsvParser parser = new MockCsvParser<>(config, root)) { + try (MockCsvParser parser = new MockCsvParser<>(config, root)) { if (!parser.hasNext()) { throw new IllegalArgumentException("CSV file is empty"); } @@ -61,11 +61,11 @@ public void validate() { timePos = config.parseAvroTopic().getValueSchema() .getField("timeReceived").pos(); - Record last = null; + Record last = null; long line = 1L; while (parser.hasNext()) { - Record record = parser.next(); + Record record = parser.next(); checkRecord(record, last, line++); last = record; } @@ -91,8 +91,8 @@ private void checkFrequency(long line) { } } - private void checkRecord(Record record, - Record last, long line) { + private void checkRecord(Record record, + Record last, long line) { double previousTime = time; time = (Double) record.value.get(timePos); diff --git a/testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java b/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java similarity index 98% rename from testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java index da9df1b5..8d7d270b 100644 --- a/testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/data/RecordGenerator.java @@ -69,7 +69,7 @@ public RecordGenerator(MockDataConfig config, Class keyClass) topic = (AvroTopic) config.parseAvroTopic(); if (!topic.getKeyClass().equals(keyClass)) { throw new IllegalArgumentException( - "RecordGenerator only generates MeasurementKey keys, not " + "RecordGenerator only generates ObservationKey keys, not " + topic.getKeyClass() + " in topic " + topic); } if (!SpecificRecord.class.isAssignableFrom(topic.getValueClass())) { @@ -78,7 +78,7 @@ public RecordGenerator(MockDataConfig config, Class keyClass) + topic.getValueClass() + " in topic " + topic); } header = new ArrayList<>(); - header.addAll(Arrays.asList("userId", "sourceId")); + header.addAll(Arrays.asList("projectId", "userId", "sourceId")); // cache key and value fields Schema valueSchema = topic.getValueSchema(); diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java b/radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java similarity index 100% rename from testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedArrayValue.java diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java b/radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java similarity index 100% rename from testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedDoubleValue.java diff --git a/testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java b/radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java similarity index 93% rename from testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java index 3b779f54..820f28ea 100644 --- a/testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/model/ExpectedValue.java @@ -23,7 +23,7 @@ import org.apache.avro.Schema; import org.apache.avro.specific.SpecificRecord; import org.radarcns.data.Record; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; /** * It computes the expected value for a test case. @@ -35,7 +35,7 @@ public abstract class ExpectedValue { private final int[] valuePos; private Long lastTimestamp; - private MeasurementKey lastKey; + private ObservationKey lastKey; private V lastValue; private final Map series; @@ -97,7 +97,7 @@ public Map getSeries() { * Add a new record to the series of expected values. * @param record record to add */ - public void add(Record record) { + public void add(Record record) { if (timeReceivedPos == -1) { throw new IllegalStateException("Cannot parse record without a schema."); } @@ -116,7 +116,7 @@ public void add(Record record) { * @param timeMillis time the record is received * @param values values to add */ - public void add(MeasurementKey key, long timeMillis, Object... values) { + public void add(ObservationKey key, long timeMillis, Object... values) { this.lastKey = key; if (timeMillis >= lastTimestamp + DURATION || lastValue == null) { lastTimestamp = timeMillis - (timeMillis % DURATION); @@ -126,7 +126,7 @@ public void add(MeasurementKey key, long timeMillis, Object... values) { addToValue(lastValue, values); } - public MeasurementKey getLastKey() { + public ObservationKey getLastKey() { return lastKey; } } diff --git a/testing/src/main/java/org/radarcns/mock/model/MockAggregator.java b/radar-commons-testing/src/main/java/org/radarcns/mock/model/MockAggregator.java similarity index 96% rename from testing/src/main/java/org/radarcns/mock/model/MockAggregator.java rename to radar-commons-testing/src/main/java/org/radarcns/mock/model/MockAggregator.java index 51108d09..7f6e8bb8 100644 --- a/testing/src/main/java/org/radarcns/mock/model/MockAggregator.java +++ b/radar-commons-testing/src/main/java/org/radarcns/mock/model/MockAggregator.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.avro.Schema; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.config.MockDataConfig; import org.radarcns.mock.data.MockCsvParser; import org.slf4j.Logger; @@ -63,7 +63,7 @@ public static Map getSimulations( continue; } - try (MockCsvParser parser = new MockCsvParser<>(config, root)) { + try (MockCsvParser parser = new MockCsvParser<>(config, root)) { Schema valueSchema = config.parseAvroTopic().getValueSchema(); List valueFields = config.getValueFields(); diff --git a/testing/src/main/java/org/radarcns/util/CsvParser.java b/radar-commons-testing/src/main/java/org/radarcns/util/CsvParser.java similarity index 100% rename from testing/src/main/java/org/radarcns/util/CsvParser.java rename to radar-commons-testing/src/main/java/org/radarcns/util/CsvParser.java diff --git a/testing/src/main/java/org/radarcns/util/CsvWriter.java b/radar-commons-testing/src/main/java/org/radarcns/util/CsvWriter.java similarity index 100% rename from testing/src/main/java/org/radarcns/util/CsvWriter.java rename to radar-commons-testing/src/main/java/org/radarcns/util/CsvWriter.java diff --git a/testing/src/main/java/org/radarcns/util/Metronome.java b/radar-commons-testing/src/main/java/org/radarcns/util/Metronome.java similarity index 100% rename from testing/src/main/java/org/radarcns/util/Metronome.java rename to radar-commons-testing/src/main/java/org/radarcns/util/Metronome.java diff --git a/testing/src/main/java/org/radarcns/util/Oscilloscope.java b/radar-commons-testing/src/main/java/org/radarcns/util/Oscilloscope.java similarity index 100% rename from testing/src/main/java/org/radarcns/util/Oscilloscope.java rename to radar-commons-testing/src/main/java/org/radarcns/util/Oscilloscope.java diff --git a/testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java b/radar-commons-testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java similarity index 77% rename from testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java rename to radar-commons-testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java index bf4d2911..972727eb 100644 --- a/testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java +++ b/radar-commons-testing/src/test/java/org/radarcns/mock/CsvGeneratorTest.java @@ -16,7 +16,15 @@ package org.radarcns.mock; -import static org.junit.Assert.*; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.radarcns.kafka.ObservationKey; +import org.radarcns.mock.config.MockDataConfig; +import org.radarcns.mock.data.CsvGenerator; +import org.radarcns.mock.data.MockRecordValidatorTest; +import org.radarcns.mock.data.RecordGenerator; +import org.radarcns.util.CsvParser; import java.io.BufferedReader; import java.io.File; @@ -27,27 +35,16 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.radarcns.key.MeasurementKey; -import org.radarcns.mock.config.MockDataConfig; -import org.radarcns.mock.data.CsvGenerator; -import org.radarcns.mock.data.RecordGenerator; -import org.radarcns.phone.PhoneLight; -import org.radarcns.util.CsvParser; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; public class CsvGeneratorTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); private MockDataConfig makeConfig() throws IOException { - MockDataConfig config = new MockDataConfig(); - config.setDataFile(folder.newFile().getAbsolutePath()); - config.setValueSchema(PhoneLight.class.getName()); - config.setValueField("light"); - config.setTopic("test"); - return config; + return MockRecordValidatorTest.makeConfig(folder); } @Test @@ -59,13 +56,13 @@ public void generateMockConfig() throws IOException { CsvParser parser = new CsvParser(new BufferedReader(new FileReader(config.getDataFile()))); List headers = Arrays.asList( - "userId", "sourceId", "time", "timeReceived", "light"); + "projectId", "userId", "sourceId", "time", "timeReceived", "light"); assertEquals(headers, parser.parseLine()); int n = 0; List line; while ((line = parser.parseLine()) != null) { - String value = line.get(4); + String value = line.get(5); assertNotEquals("NaN", value); assertNotEquals("Infinity", value); assertNotEquals("-Infinity", value); @@ -86,12 +83,12 @@ public void generateGenerator() final String time = Double.toString(System.currentTimeMillis() / 1000d); - RecordGenerator recordGenerator = new RecordGenerator( - config, MeasurementKey.class) { + RecordGenerator recordGenerator = new RecordGenerator( + config, ObservationKey.class) { @Override - public Iterator> iterateRawValues(MeasurementKey key, long duration) { + public Iterator> iterateRawValues(ObservationKey key, long duration) { return Collections.singletonList( - Arrays.asList("UserID_0", "SourceID_0", time, time, + Arrays.asList("test", "UserID_0", "SourceID_0", time, time, Float.valueOf((float)0.123112412410423518).toString())) .iterator(); } @@ -102,7 +99,7 @@ public Iterator> iterateRawValues(MeasurementKey key, long duration CsvParser parser = new CsvParser(new BufferedReader(new FileReader(config.getDataFile()))); assertEquals(recordGenerator.getHeader(), parser.parseLine()); // float will cut off a lot of decimals - assertEquals(Arrays.asList("UserID_0", "SourceID_0", time, time, "0.12311241"), + assertEquals(Arrays.asList("test", "UserID_0", "SourceID_0", time, time, "0.12311241"), parser.parseLine()); } } \ No newline at end of file diff --git a/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java b/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java similarity index 75% rename from testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java rename to radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java index f4c95d09..73201b48 100644 --- a/testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java +++ b/radar-commons-testing/src/test/java/org/radarcns/mock/RecordGeneratorTest.java @@ -23,8 +23,8 @@ import org.apache.avro.specific.SpecificRecord; import org.junit.Test; import org.radarcns.data.Record; -import org.radarcns.empatica.EmpaticaE4Acceleration; -import org.radarcns.key.MeasurementKey; +import org.radarcns.passive.empatica.EmpaticaE4Acceleration; +import org.radarcns.kafka.ObservationKey; import org.radarcns.mock.config.MockDataConfig; import org.radarcns.mock.data.RecordGenerator; @@ -43,13 +43,13 @@ public void generate() throws Exception { config.setValueFields(Arrays.asList("x", "y", "z")); config.setValueSchema(EmpaticaE4Acceleration.class.getName()); - RecordGenerator generator = new RecordGenerator<>(config, - MeasurementKey.class); - Iterator> iter = generator - .iterateValues(new MeasurementKey("a", "b"), 0); - Record record = iter.next(); + RecordGenerator generator = new RecordGenerator<>(config, + ObservationKey.class); + Iterator> iter = generator + .iterateValues(new ObservationKey("test", "a", "b"), 0); + Record record = iter.next(); assertEquals(0, record.offset); - assertEquals(new MeasurementKey("a", "b"), record.key); + assertEquals(new ObservationKey("test", "a", "b"), record.key); float x = ((EmpaticaE4Acceleration)record.value).getX(); assertTrue(x >= 0.1f && x < 9.9f); float y = ((EmpaticaE4Acceleration)record.value).getX(); @@ -60,7 +60,7 @@ public void generate() throws Exception { assertTrue(time > System.currentTimeMillis() / 1000d - 1d && time <= System.currentTimeMillis() / 1000d); - Record nextRecord = iter.next(); + Record nextRecord = iter.next(); assertEquals(1, nextRecord.offset); assertEquals(time + 0.1d, (Double)nextRecord.value.get(0), 1e-6); } @@ -71,10 +71,11 @@ public void getHeaders() throws Exception { config.setTopic("test"); config.setValueSchema(EmpaticaE4Acceleration.class.getName()); - RecordGenerator generator = new RecordGenerator<>(config, - MeasurementKey.class); + RecordGenerator generator = new RecordGenerator<>(config, + ObservationKey.class); assertEquals( - Arrays.asList("userId", "sourceId", "time", "timeReceived", "x", "y", "z"), + Arrays.asList("projectId", "userId", "sourceId", + "time", "timeReceived", "x", "y", "z"), generator.getHeader()); } } \ No newline at end of file diff --git a/testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java b/radar-commons-testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java similarity index 75% rename from testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java rename to radar-commons-testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java index 190d8c9c..ae0d15a3 100644 --- a/testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java +++ b/radar-commons-testing/src/test/java/org/radarcns/mock/data/MockRecordValidatorTest.java @@ -24,8 +24,8 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.radarcns.mock.config.MockDataConfig; -import org.radarcns.phone.PhoneAcceleration; -import org.radarcns.phone.PhoneLight; +import org.radarcns.passive.phone.PhoneAcceleration; +import org.radarcns.passive.phone.PhoneLight; public class MockRecordValidatorTest { @Rule @@ -35,6 +35,10 @@ public class MockRecordValidatorTest { public ExpectedException exception = ExpectedException.none(); private MockDataConfig makeConfig() throws IOException { + return makeConfig(folder); + } + + public static MockDataConfig makeConfig(TemporaryFolder folder) throws IOException { MockDataConfig config = new MockDataConfig(); config.setDataFile(folder.newFile().getAbsolutePath()); config.setValueSchema(PhoneLight.class.getName()); @@ -70,9 +74,9 @@ public void validateCustom() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,light\n"); - writer.append("a,b,1,1,1\n"); - writer.append("a,b,1,2,1\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,light\n"); + writer.append("test,a,b,1,1,1\n"); + writer.append("test,a,b,1,2,1\n"); } new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); @@ -83,9 +87,9 @@ public void validateWrongKey() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,light\n"); - writer.append("a,b,1,1,1\n"); - writer.append("a,c,1,2,1\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,light\n"); + writer.append("test,a,b,1,1,1\n"); + writer.append("test,a,c,1,2,1\n"); } exception.expect(IllegalArgumentException.class); @@ -97,9 +101,9 @@ public void validateWrongTime() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,light\n"); - writer.append("a,b,1,1,1\n"); - writer.append("a,b,1,0,1\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,light\n"); + writer.append("test,a,b,1,1,1\n"); + writer.append("test,a,b,1,0,1\n"); } exception.expect(IllegalArgumentException.class); @@ -112,9 +116,9 @@ public void validateMissingKeyField() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,time,timeReceived,light\n"); - writer.append("a,1,1,1\n"); - writer.append("a,1,2,1\n"); + writer.append("projectId,userId,time,timeReceived,light\n"); + writer.append("test,a,1,1,1\n"); + writer.append("test,a,1,2,1\n"); } exception.expect(NullPointerException.class); @@ -126,9 +130,9 @@ public void validateMissingValueField() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,light\n"); - writer.append("a,b,1,1\n"); - writer.append("a,b,1,2\n"); + writer.append("projectId,userId,sourceId,time,light\n"); + writer.append("test,a,b,1,1\n"); + writer.append("test,a,b,1,2\n"); } exception.expect(NullPointerException.class); @@ -140,9 +144,9 @@ public void validateMissingValue() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,light\n"); - writer.append("a,b,1,1\n"); - writer.append("a,b,1,2,1\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,light\n"); + writer.append("test,a,b,1,1\n"); + writer.append("test,a,b,1,2,1\n"); } exception.expect(IllegalArgumentException.class); @@ -154,9 +158,9 @@ public void validateWrongValueType() throws Exception { MockDataConfig config = makeConfig(); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,light\n"); - writer.append("a,b,1,1,a\n"); - writer.append("a,b,1,2,b\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,light\n"); + writer.append("test,a,b,1,1,a\n"); + writer.append("test,a,b,1,2,b\n"); } exception.expect(NumberFormatException.class); @@ -170,9 +174,9 @@ public void validateMultipleFields() throws Exception { config.setValueFields(Arrays.asList("x", "y", "z")); try (FileWriter writer = new FileWriter(config.getDataFile(folder.getRoot()))) { - writer.append("userId,sourceId,time,timeReceived,x,y,z\n"); - writer.append("a,b,1,1,1,1,1\n"); - writer.append("a,b,1,2,1,1,1\n"); + writer.append("projectId,userId,sourceId,time,timeReceived,x,y,z\n"); + writer.append("test,a,b,1,1,1,1,1\n"); + writer.append("test,a,b,1,2,1,1,1\n"); } new MockRecordValidator(config, 2_000L, folder.getRoot()).validate(); diff --git a/testing/src/test/java/org/radarcns/util/CsvParserTest.java b/radar-commons-testing/src/test/java/org/radarcns/util/CsvParserTest.java similarity index 100% rename from testing/src/test/java/org/radarcns/util/CsvParserTest.java rename to radar-commons-testing/src/test/java/org/radarcns/util/CsvParserTest.java diff --git a/testing/src/test/java/org/radarcns/util/CsvWriterTest.java b/radar-commons-testing/src/test/java/org/radarcns/util/CsvWriterTest.java similarity index 100% rename from testing/src/test/java/org/radarcns/util/CsvWriterTest.java rename to radar-commons-testing/src/test/java/org/radarcns/util/CsvWriterTest.java diff --git a/testing/src/test/java/org/radarcns/util/MetronomeTest.java b/radar-commons-testing/src/test/java/org/radarcns/util/MetronomeTest.java similarity index 100% rename from testing/src/test/java/org/radarcns/util/MetronomeTest.java rename to radar-commons-testing/src/test/java/org/radarcns/util/MetronomeTest.java diff --git a/testing/src/test/java/org/radarcns/util/OscilloscopeTest.java b/radar-commons-testing/src/test/java/org/radarcns/util/OscilloscopeTest.java similarity index 100% rename from testing/src/test/java/org/radarcns/util/OscilloscopeTest.java rename to radar-commons-testing/src/test/java/org/radarcns/util/OscilloscopeTest.java diff --git a/settings.gradle b/settings.gradle index 798e1360..2aa63da0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,5 +16,5 @@ rootProject.name = 'radar-commons' -include ':testing' +include ':radar-commons-testing' diff --git a/src/main/java/org/radarcns/config/AvroTopicConfig.java b/src/main/java/org/radarcns/config/AvroTopicConfig.java index 63e19f50..3408b7d8 100644 --- a/src/main/java/org/radarcns/config/AvroTopicConfig.java +++ b/src/main/java/org/radarcns/config/AvroTopicConfig.java @@ -17,14 +17,11 @@ package org.radarcns.config; import com.fasterxml.jackson.annotation.JsonProperty; -import java.lang.reflect.InvocationTargetException; -import java.util.List; -import java.util.Objects; -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecord; import org.radarcns.topic.AvroTopic; +import java.util.List; + /** * Specifies an Avro topic */ @@ -38,29 +35,17 @@ public class AvroTopicConfig { /** * Parse an AvroTopic from the values in this class. + * + * @throws IllegalStateException if the key_schema or value_schema properties are not valid + * Avro SpecificRecord classes */ - @SuppressWarnings("unchecked") - public AvroTopic parseAvroTopic() - throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, - IllegalAccessException { - - Objects.requireNonNull(this.topic, "topic needs to be specified"); - Objects.requireNonNull(this.keySchema, "key_schema needs to be specified"); - Objects.requireNonNull(this.valueSchema, "value_schema needs to be specified"); - - Class keyClass = (Class) Class.forName(this.keySchema); - Schema keyAvroSchema = (Schema) keyClass - .getMethod("getClassSchema").invoke(null); - // check instantiation - SpecificData.newInstance(keyClass, keyAvroSchema); - - Class valueClass = (Class) Class.forName(this.valueSchema); - Schema valueAvroSchema = (Schema) valueClass - .getMethod("getClassSchema").invoke(null); - // check instantiation - SpecificData.newInstance(valueClass, valueAvroSchema); - - return new AvroTopic<>(topic, keyAvroSchema, valueAvroSchema, keyClass, valueClass); + public AvroTopic parseAvroTopic() { + try { + return AvroTopic.parse(topic, keySchema, valueSchema); + } catch (IllegalArgumentException ex) { + throw new IllegalStateException("Topic " + topic + + " schema cannot be instantiated", ex); + } } public String getTopic() { diff --git a/src/main/java/org/radarcns/producer/rest/RestClient.java b/src/main/java/org/radarcns/producer/rest/RestClient.java index edac7055..7986806e 100644 --- a/src/main/java/org/radarcns/producer/rest/RestClient.java +++ b/src/main/java/org/radarcns/producer/rest/RestClient.java @@ -16,6 +16,19 @@ package org.radarcns.producer.rest; +import okhttp3.OkHttpClient; +import okhttp3.OkHttpClient.Builder; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.radarcns.config.ServerConfig; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import java.io.Closeable; import java.io.IOException; import java.net.MalformedURLException; @@ -25,17 +38,6 @@ import java.security.cert.CertificateException; import java.util.Objects; import java.util.concurrent.TimeUnit; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import okhttp3.OkHttpClient; -import okhttp3.OkHttpClient.Builder; -import okhttp3.Request; -import okhttp3.Response; -import org.radarcns.config.ServerConfig; /** REST client using OkHttp3. This class is not thread-safe. */ public class RestClient implements Closeable { @@ -194,6 +196,32 @@ public Response request(String relativePath) throws IOException { return request(requestBuilder(relativePath).build()); } + /** + * Make a blocking request and return the body. + * @param request request to make. + * @return response body string. + * @throws RestException if no body was returned or an HTTP status code indicating error was + * returned. + * @throws IOException if the request cannot be completed or the response cannot be read. + * + */ + public String requestString(Request request) throws IOException { + try (Response response = request(request)) { + ResponseBody body = response.body(); + + String bodyString = null; + + if (body != null) { + bodyString = body.string(); + } + if (!response.isSuccessful() || bodyString == null) { + throw new RestException(response.code(), bodyString); + } + + return bodyString; + } + } + /** * Create a OkHttp3 request builder with {@link Request.Builder#url(URL)} set. * Call{@link Request.Builder#build()} to make the actual request with diff --git a/src/main/java/org/radarcns/producer/rest/RestException.java b/src/main/java/org/radarcns/producer/rest/RestException.java new file mode 100644 index 00000000..58bd7052 --- /dev/null +++ b/src/main/java/org/radarcns/producer/rest/RestException.java @@ -0,0 +1,27 @@ +package org.radarcns.producer.rest; + +import java.io.IOException; + +public class RestException extends IOException { + private final int statusCode; + private final String body; + + public RestException(int statusCode, String body) { + this(statusCode, body, null); + } + + public RestException(int statusCode, String body, Throwable cause) { + super("REST call failed (HTTP code " + statusCode + "): " + + body.substring(0, Math.min(512, body.length())), cause); + this.statusCode = statusCode; + this.body = body; + } + + public int getStatusCode() { + return statusCode; + } + + public String getBody() { + return body; + } +} diff --git a/src/main/java/org/radarcns/producer/rest/RestSender.java b/src/main/java/org/radarcns/producer/rest/RestSender.java index 92f74d52..9512bd81 100644 --- a/src/main/java/org/radarcns/producer/rest/RestSender.java +++ b/src/main/java/org/radarcns/producer/rest/RestSender.java @@ -16,7 +16,6 @@ package org.radarcns.producer.rest; -import com.fasterxml.jackson.core.JsonFactory; import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -72,7 +71,6 @@ public class RestSender implements KafkaSender { private final AvroEncoder keyEncoder; private final AvroEncoder valueEncoder; - private final JsonFactory jsonFactory; private HttpUrl schemalessKeyUrl; private HttpUrl schemalessValueUrl; @@ -101,7 +99,6 @@ private RestSender(RestClient httpClient, SchemaRetriever schemaRetriever, this.schemaRetriever = schemaRetriever; this.keyEncoder = keyEncoder; this.valueEncoder = valueEncoder; - this.jsonFactory = new JsonFactory(); this.useCompression = useCompression; this.acceptType = KAFKA_REST_ACCEPT_ENCODING; this.contentType = KAFKA_REST_AVRO_ENCODING; @@ -195,7 +192,7 @@ private RestTopicSender(AvroTopic topic) throws IOException { if (url == null) { throw new MalformedURLException("Cannot parse " + rawUrl); } - requestData = new TopicRequestData<>(topic, keyEncoder, valueEncoder, jsonFactory); + requestData = new TopicRequestData<>(topic, keyEncoder, valueEncoder); } @Override @@ -451,6 +448,12 @@ public Builder connectionPool(ManagedConnectionPool pool) { return this; } + public Builder headers(Headers headers) { + additionalHeaders = headers.newBuilder(); + return this; + } + + @Deprecated public Builder headers(List> headers) { additionalHeaders = new Headers.Builder(); for (Entry header : headers) { diff --git a/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java b/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java index 7d3477c8..a5f29e12 100644 --- a/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java +++ b/src/main/java/org/radarcns/producer/rest/SchemaRetriever.java @@ -16,33 +16,31 @@ package org.radarcns.producer.rest; -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import java.io.Closeable; -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import okhttp3.MediaType; import okhttp3.Request; import okhttp3.RequestBody; -import okhttp3.Response; import okio.BufferedSink; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericContainer; +import org.json.JSONObject; import org.radarcns.config.ServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Retriever of an Avro Schema */ +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** Retriever of an Avro Schema. + * + * Internally, only {@link JSONObject} is used to manage JSON data, to keep the class as lean as + * possible. + */ public class SchemaRetriever implements Closeable { private static final Logger logger = LoggerFactory.getLogger(SchemaRetriever.class); private static final MediaType CONTENT_TYPE = MediaType.parse( @@ -61,15 +59,11 @@ public class SchemaRetriever implements Closeable { } private final ConcurrentMap cache; - private final ObjectReader reader; - private final JsonFactory jsonFactory; private RestClient httpClient; public SchemaRetriever(ServerConfig config, long connectionTimeout) { Objects.requireNonNull(config); cache = new ConcurrentHashMap<>(); - jsonFactory = new JsonFactory(); - reader = new ObjectMapper(jsonFactory).readerFor(JsonNode.class); httpClient = new RestClient(config, connectionTimeout, ManagedConnectionPool.GLOBAL_POOL); } @@ -105,18 +99,12 @@ protected ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int versio .addHeader("Accept", "application/json") .build(); - try (Response response = restClient.request(request)) { - if (!response.isSuccessful()) { - throw new IOException("Cannot retrieve metadata " + request.url() - + " (HTTP " + response.code() + ": " + response.message() - + ") -> " + response.body().string()); - } - JsonNode node = reader.readTree(response.body().byteStream()); - int newVersion = version < 1 ? node.get("version").asInt() : version; - int schemaId = node.get("id").asInt(); - Schema schema = parseSchema(node.get("schema").asText()); - return new ParsedSchemaMetadata(schemaId, newVersion, schema); - } + String response = restClient.requestString(request); + JSONObject node = new JSONObject(response); + int newVersion = version < 1 ? node.getInt("version") : version; + int schemaId = node.getInt("id"); + Schema schema = parseSchema(node.getString("schema")); + return new ParsedSchemaMetadata(schemaId, newVersion, schema); } public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) @@ -155,16 +143,10 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat .post(new SchemaRequestBody(metadata.getSchema())) .build(); - try (Response response = restClient.request(request)) { - if (!response.isSuccessful()) { - throw new IOException("Cannot post schema to " + request.url() - + " (HTTP " + response.code() + ": " + response.message() - + ") -> " + response.body().string()); - } - JsonNode node = reader.readTree(response.body().byteStream()); - int schemaId = node.get("id").asInt(); - metadata.setId(schemaId); - } + String response = restClient.requestString(request); + JSONObject node = new JSONObject(response); + int schemaId = node.getInt("id"); + metadata.setId(schemaId); } cache.put(subject, metadata); } @@ -210,13 +192,9 @@ public MediaType contentType() { @Override public void writeTo(BufferedSink sink) throws IOException { - try (OutputStream out = sink.outputStream(); - JsonGenerator writer = jsonFactory.createGenerator(out, JsonEncoding.UTF8)) { - writer.writeStartObject(); - writer.writeFieldName("schema"); - writer.writeString(schema.toString()); - writer.writeEndObject(); - } + sink.writeUtf8("{\"schema\":"); + sink.writeUtf8(JSONObject.quote(schema.toString())); + sink.writeUtf8("}"); } } diff --git a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java index 387121db..2ca4eacd 100644 --- a/src/main/java/org/radarcns/producer/rest/TopicRequestData.java +++ b/src/main/java/org/radarcns/producer/rest/TopicRequestData.java @@ -16,23 +16,23 @@ package org.radarcns.producer.rest; -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; +import org.json.JSONObject; import org.radarcns.data.AvroEncoder; import org.radarcns.data.Record; import org.radarcns.topic.AvroTopic; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.List; + /** * Request data to submit records to the Kafka REST proxy. */ class TopicRequestData { private final AvroEncoder.AvroWriter keyWriter; private final AvroEncoder.AvroWriter valueWriter; - private final JsonFactory jsonFactory; private Integer keySchemaId; private Integer valueSchemaId; @@ -41,40 +41,58 @@ class TopicRequestData { private List> records; - TopicRequestData(AvroTopic topic, AvroEncoder keyEncoder, AvroEncoder valueEncoder, - JsonFactory jsonFactory) throws IOException { + TopicRequestData(AvroTopic topic, AvroEncoder keyEncoder, AvroEncoder valueEncoder) + throws IOException { keyWriter = keyEncoder.writer(topic.getKeySchema(), topic.getKeyClass()); valueWriter = valueEncoder.writer(topic.getValueSchema(), topic.getValueClass()); - this.jsonFactory = jsonFactory; } + /** + * Writes the current topic to a stream. This implementation does not use any JSON writers to + * write the data, but writes it directly to a stream. {@link JSONObject#quote(String, Writer)} + * is used to get the correct formatting. This makes the method as lean as possible. + * @param out OutputStream to write to. It is assumed to be buffered. + * @throws IOException if a superimposing stream could not be created + */ void writeToStream(OutputStream out) throws IOException { - try (JsonGenerator writer = jsonFactory.createGenerator(out, JsonEncoding.UTF8)) { - writer.writeStartObject(); + try (OutputStreamWriter writer = new OutputStreamWriter(out)) { + writer.append('{'); if (keySchemaId != null) { - writer.writeNumberField("key_schema_id", keySchemaId); + writer.append("\"key_schema_id\":").append(keySchemaId.toString()); } else { - writer.writeStringField("key_schema", keySchemaString); + writer.append("\"key_schema\":"); + JSONObject.quote(keySchemaString, writer); } - if (valueSchemaId != null) { - writer.writeNumberField("value_schema_id", valueSchemaId); + writer.append(",\"value_schema_id\":").append(valueSchemaId.toString()); } else { - writer.writeStringField("value_schema", valueSchemaString); + writer.append(",\"value_schema\":"); + JSONObject.quote(valueSchemaString, writer); } - - writer.writeArrayFieldStart("records"); - - for (Record record : records) { - writer.writeStartObject(); - writer.writeFieldName("key"); - writer.writeRawValue(new String(keyWriter.encode(record.key))); - writer.writeFieldName("value"); - writer.writeRawValue(new String(valueWriter.encode(record.value))); - writer.writeEndObject(); + writer.append(",\"records\":["); + + for (int i = 0; i < records.size(); i++) { + Record record = records.get(i); + + if (i == 0) { + writer.append("{\"key\":"); + } else { + writer.append(",{\"key\":"); + } + + // flush writer and write raw bytes to underlying stream + // flush so the data do not overlap. + writer.flush(); + out.write(keyWriter.encode(record.key)); + + writer.append(",\"value\":"); + // flush writer and write raw bytes to underlying stream + // flush so the data do not overlap. + writer.flush(); + out.write(valueWriter.encode(record.value)); + writer.append('}'); } - writer.writeEndArray(); - writer.writeEndObject(); + writer.append("]}"); } } diff --git a/src/main/java/org/radarcns/topic/AvroTopic.java b/src/main/java/org/radarcns/topic/AvroTopic.java index 3178a1ac..bc02b725 100644 --- a/src/main/java/org/radarcns/topic/AvroTopic.java +++ b/src/main/java/org/radarcns/topic/AvroTopic.java @@ -19,10 +19,13 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import java.lang.reflect.InvocationTargetException; import java.util.List; +import java.util.Objects; -/** AvroTopic with schema. */ +/** Kafka topic with schema. */ public class AvroTopic extends KafkaTopic { private final Schema valueSchema; private final Schema keySchema; @@ -30,6 +33,14 @@ public class AvroTopic extends KafkaTopic { private final Class valueClass; private final Class keyClass; + /** + * Kafka topic with Avro schema. + * @param name topic name + * @param keySchema Avro schema for keys + * @param valueSchema Avro schema for values + * @param keyClass Java class for keys + * @param valueClass Java class for values + */ public AvroTopic(String name, Schema keySchema, Schema valueSchema, Class keyClass, Class valueClass) { @@ -55,22 +66,26 @@ public AvroTopic(String name, } } + /** Avro schema used for keys. */ public Schema getKeySchema() { return keySchema; } + /** Avro schema used for values. */ public Schema getValueSchema() { return valueSchema; } - public Class getValueClass() { - return valueClass; - } - + /** Java class used for keys. */ public Class getKeyClass() { return keyClass; } + /** Java class used for values. */ + public Class getValueClass() { + return valueClass; + } + /** * Tries to construct a new SpecificData instance of the value. * @return new empty SpecificData class @@ -85,6 +100,43 @@ public Schema.Type[] getValueFieldTypes() { return valueFieldTypes; } + /** + * Parse an AvroTopic. + * + * @throws IllegalArgumentException if the key_schema or value_schema properties are not valid + * Avro SpecificRecord classes + */ + @SuppressWarnings({"unchecked", "JavaReflectionMemberAccess"}) + public static AvroTopic parse( + String topic, String keySchema, String valueSchema) { + + try { + Objects.requireNonNull(topic, "topic needs to be specified"); + Objects.requireNonNull(keySchema, "key_schema needs to be specified"); + Objects.requireNonNull(valueSchema, "value_schema needs to be specified"); + + Class keyClass = (Class) Class.forName(keySchema); + Schema keyAvroSchema = (Schema) keyClass + .getMethod("getClassSchema").invoke(null); + // check instantiation + SpecificData.newInstance(keyClass, keyAvroSchema); + + Class valueClass = (Class) Class.forName(valueSchema); + Schema valueAvroSchema = (Schema) valueClass + .getMethod("getClassSchema").invoke(null); + // check instantiation + SpecificData.newInstance(valueClass, valueAvroSchema); + + return new AvroTopic<>(topic, keyAvroSchema, valueAvroSchema, keyClass, valueClass); + } catch (ClassNotFoundException + | NoSuchMethodException + | InvocationTargetException + | IllegalAccessException ex) { + throw new IllegalArgumentException("Topic " + topic + + " schema cannot be instantiated", ex); + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/org/radarcns/topic/KafkaTopic.java b/src/main/java/org/radarcns/topic/KafkaTopic.java index 49ec605f..d1b6fc17 100644 --- a/src/main/java/org/radarcns/topic/KafkaTopic.java +++ b/src/main/java/org/radarcns/topic/KafkaTopic.java @@ -21,7 +21,7 @@ /** * A topic that used by Apache Kafka. */ -public class KafkaTopic { +public class KafkaTopic implements Comparable { private final String name; private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9_]*"); @@ -74,4 +74,9 @@ public int hashCode() { public String toString() { return getClass().getSimpleName() + "<" + name + ">"; } + + @Override + public int compareTo(KafkaTopic o) { + return name.compareTo(o.name); + } } diff --git a/src/main/java/org/radarcns/topic/SensorTopic.java b/src/main/java/org/radarcns/topic/SensorTopic.java index 4a2492f8..61c63b43 100644 --- a/src/main/java/org/radarcns/topic/SensorTopic.java +++ b/src/main/java/org/radarcns/topic/SensorTopic.java @@ -18,11 +18,13 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.apache.avro.specific.SpecificRecord; /** * AvroTopic used by sensors. This has additional verification on the schemas that are used compared * to AvroTopic. */ +@SuppressWarnings("PMD.UseUtilityClass") public class SensorTopic extends AvroTopic { public SensorTopic(String name, Schema keySchema, Schema valueSchema, Class keyClass, Class valueClass) { @@ -35,6 +37,9 @@ public SensorTopic(String name, Schema keySchema, Schema valueSchema, throw new IllegalArgumentException("Sensors must send records as values"); } + if (keySchema.getField("projectId") == null) { + throw new IllegalArgumentException("Key schema must have a project ID"); + } if (keySchema.getField("userId") == null) { throw new IllegalArgumentException("Key schema must have a user ID"); } @@ -48,4 +53,18 @@ public SensorTopic(String name, Schema keySchema, Schema valueSchema, throw new IllegalArgumentException("Schema must have timeReceived as a field"); } } + + /** + * Parse a SensorTopic. + * + * @throws IllegalArgumentException if the key_schema or value_schema properties are not valid + * Avro SpecificRecord classes + */ + public static SensorTopic parse( + String topic, String keySchema, String valueSchema) { + AvroTopic parseAvro = AvroTopic.parse(topic, keySchema, valueSchema); + return new SensorTopic<>(parseAvro.getName(), + parseAvro.getKeySchema(), parseAvro.getValueSchema(), + parseAvro.getKeyClass(), parseAvro.getValueClass()); + } } diff --git a/src/test/java/org/radarcns/data/SpecificRecordDecorderTest.java b/src/test/java/org/radarcns/data/SpecificRecordDecorderTest.java index d1cbe946..698e29b0 100644 --- a/src/test/java/org/radarcns/data/SpecificRecordDecorderTest.java +++ b/src/test/java/org/radarcns/data/SpecificRecordDecorderTest.java @@ -20,8 +20,8 @@ import java.io.IOException; import org.junit.Test; -import org.radarcns.empatica.EmpaticaE4BloodVolumePulse; -import org.radarcns.key.MeasurementKey; +import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse; +import org.radarcns.kafka.ObservationKey; import org.radarcns.topic.AvroTopic; /** @@ -30,13 +30,14 @@ public class SpecificRecordDecorderTest { @Test - public void decordJson() throws IOException { + public void decodeJson() throws IOException { SpecificRecordDecoder decoder = new SpecificRecordDecoder(false); - AvroTopic topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class); - AvroDecoder.AvroReader keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass()); + AvroTopic topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class); + AvroDecoder.AvroReader keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass()); AvroDecoder.AvroReader valueDecoder = decoder.reader(topic.getValueSchema(), topic.getValueClass()); - MeasurementKey key = keyDecoder.decode("{\"userId\":\"a\",\"sourceId\":\"b\"}".getBytes()); + ObservationKey key = keyDecoder.decode("{\"projectId\":{\"string\":\"test\"},\"userId\":\"a\",\"sourceId\":\"b\"}".getBytes()); + assertEquals(key.get("projectId"), "test"); assertEquals(key.get("userId"), "a"); assertEquals(key.get("sourceId"), "b"); @@ -47,15 +48,20 @@ public void decordJson() throws IOException { } @Test - public void decordBinary() throws IOException { + public void decodeBinary() throws IOException { SpecificRecordDecoder decoder = new SpecificRecordDecoder(true); - AvroTopic topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class); - AvroDecoder.AvroReader keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass()); + AvroTopic topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class); + AvroDecoder.AvroReader keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass()); AvroDecoder.AvroReader valueDecoder = decoder.reader(topic.getValueSchema(), topic.getValueClass()); - byte[] inputKey = {2, 97, 2, 98}; - MeasurementKey key = keyDecoder.decode( inputKey); + // note that positive numbers are multiplied by two in avro binary encoding, due to the + // zig-zag encoding schema used. + // See http://avro.apache.org/docs/1.8.1/spec.html#binary_encoding + // type index 1, length 4, char t, char e, char s, char t, length 1, char a, length 1, char b + byte[] inputKey = {2, 8, 116, 101, 115, 116, 2, 97, 2, 98}; + ObservationKey key = keyDecoder.decode( inputKey); + assertEquals(key.get("projectId"), "test"); assertEquals(key.get("userId"), "a"); assertEquals(key.get("sourceId"), "b"); diff --git a/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java b/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java index 2174cace..8139aabd 100644 --- a/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java +++ b/src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java @@ -18,8 +18,8 @@ import junit.framework.TestCase; -import org.radarcns.empatica.EmpaticaE4BloodVolumePulse; -import org.radarcns.key.MeasurementKey; +import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse; +import org.radarcns.kafka.ObservationKey; import java.io.IOException; import java.util.Arrays; @@ -28,25 +28,28 @@ public class SpecificRecordEncoderTest extends TestCase { public void testJson() throws IOException { SpecificRecordEncoder encoder = new SpecificRecordEncoder(false); - AvroTopic topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class); - AvroEncoder.AvroWriter keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass()); + AvroTopic topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class); + AvroEncoder.AvroWriter keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass()); AvroEncoder.AvroWriter valueEncoder = encoder.writer(topic.getValueSchema(), topic.getValueClass()); - byte[] key = keyEncoder.encode(new MeasurementKey("a", "b")); + byte[] key = keyEncoder.encode(new ObservationKey("test", "a", "b")); byte[] value = valueEncoder.encode(new EmpaticaE4BloodVolumePulse(0d, 0d, 0f)); - assertEquals("{\"userId\":\"a\",\"sourceId\":\"b\"}", new String(key)); + assertEquals("{\"projectId\":{\"string\":\"test\"},\"userId\":\"a\",\"sourceId\":\"b\"}", new String(key)); assertEquals("{\"time\":0.0,\"timeReceived\":0.0,\"bloodVolumePulse\":0.0}", new String(value)); } public void testBinary() throws IOException { SpecificRecordEncoder encoder = new SpecificRecordEncoder(true); - AvroTopic topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class); - AvroEncoder.AvroWriter keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass()); + AvroTopic topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class); + AvroEncoder.AvroWriter keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass()); AvroEncoder.AvroWriter valueEncoder = encoder.writer(topic.getValueSchema(), topic.getValueClass()); - byte[] key = keyEncoder.encode(new MeasurementKey("a", "b")); - // start string, char a, start string, char b - byte[] expectedKey = {2, 97, 2, 98}; + byte[] key = keyEncoder.encode(new ObservationKey("test", "a", "b")); + // note that positive numbers are multiplied by two in avro binary encoding, due to the + // zig-zag encoding schema used. + // See http://avro.apache.org/docs/1.8.1/spec.html#binary_encoding + // type index 1, length 4, char t, char e, char s, char t, length 1, char a, length 1, char b + byte[] expectedKey = {2, 8, 116, 101, 115, 116, 2, 97, 2, 98}; System.out.println("key: 0x" + byteArrayToHex(key)); System.out.println("expected: 0x" + byteArrayToHex(expectedKey)); assertTrue(Arrays.equals(expectedKey, key)); diff --git a/src/test/java/org/radarcns/producer/rest/RestClientTest.java b/src/test/java/org/radarcns/producer/rest/RestClientTest.java index bb56521f..00a4f551 100644 --- a/src/test/java/org/radarcns/producer/rest/RestClientTest.java +++ b/src/test/java/org/radarcns/producer/rest/RestClientTest.java @@ -56,7 +56,7 @@ public void request() throws Exception { } @Test - public void requestString() throws Exception { + public void requestStringPath() throws Exception { server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); try (Response response = client.request("myPath")) { assertTrue(response.isSuccessful()); @@ -67,6 +67,22 @@ public void requestString() throws Exception { assertEquals("/base/myPath", recordedRequest.getPath()); } + @Test + public void requestString() throws Exception { + server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); + String response = client.requestString(client.requestBuilder("myPath").build()); + assertEquals("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}", response); + RecordedRequest recordedRequest = server.takeRequest(); + assertEquals("GET", recordedRequest.getMethod()); + assertEquals("/base/myPath", recordedRequest.getPath()); + } + + @Test(expected = RestException.class) + public void requestStringEmpty() throws Exception { + server.enqueue(new MockResponse().setResponseCode(500)); + client.requestString(client.requestBuilder("myPath").build()); + } + @Test public void requestBuilder() throws Exception { Request.Builder builder = client.requestBuilder("myPath"); diff --git a/src/test/java/org/radarcns/producer/rest/RestSenderTest.java b/src/test/java/org/radarcns/producer/rest/RestSenderTest.java index a9b88ef0..cfbd32c0 100644 --- a/src/test/java/org/radarcns/producer/rest/RestSenderTest.java +++ b/src/test/java/org/radarcns/producer/rest/RestSenderTest.java @@ -32,8 +32,8 @@ import org.radarcns.config.ServerConfig; import org.radarcns.data.Record; import org.radarcns.data.SpecificRecordEncoder; -import org.radarcns.key.MeasurementKey; -import org.radarcns.phone.PhoneLight; +import org.radarcns.kafka.ObservationKey; +import org.radarcns.passive.phone.PhoneLight; import org.radarcns.producer.KafkaTopicSender; import org.radarcns.topic.AvroTopic; @@ -47,7 +47,7 @@ public class RestSenderTest { private SchemaRetriever retriever; - private RestSender sender; + private RestSender sender; @Rule public MockWebServer webServer = new MockWebServer(); @@ -58,7 +58,7 @@ public void setUp() { SpecificRecordEncoder encoder = new SpecificRecordEncoder(false); ServerConfig config = new ServerConfig(webServer.url("/").url()); - this.sender = new RestSender.Builder() + this.sender = new RestSender.Builder() .server(config) .schemaRetriever(retriever) .encoders(encoder, encoder) @@ -68,18 +68,18 @@ public void setUp() { @Test public void sender() throws Exception { - Schema keySchema = MeasurementKey.getClassSchema(); + Schema keySchema = ObservationKey.getClassSchema(); Schema valueSchema = PhoneLight.getClassSchema(); - AvroTopic topic = new AvroTopic<>("test", - keySchema, valueSchema, MeasurementKey.class, PhoneLight.class); + AvroTopic topic = new AvroTopic<>("test", + keySchema, valueSchema, ObservationKey.class, PhoneLight.class); Headers headers = new Headers.Builder() .add("Cookie", "ab") .add("Cookie", "bc") .build(); sender.setHeaders(headers); - KafkaTopicSender topicSender = sender.sender(topic); + KafkaTopicSender topicSender = sender.sender(topic); - MeasurementKey key = new MeasurementKey("a", "b"); + ObservationKey key = new ObservationKey("test","a", "b"); PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f); ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema); ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema); @@ -118,13 +118,13 @@ public void sender() throws Exception { @Test public void sendTwo() throws Exception { - Schema keySchema = MeasurementKey.getClassSchema(); + Schema keySchema = ObservationKey.getClassSchema(); Schema valueSchema = PhoneLight.getClassSchema(); - AvroTopic topic = new AvroTopic<>("test", - keySchema, valueSchema, MeasurementKey.class, PhoneLight.class); - KafkaTopicSender topicSender = sender.sender(topic); + AvroTopic topic = new AvroTopic<>("test", + keySchema, valueSchema, ObservationKey.class, PhoneLight.class); + KafkaTopicSender topicSender = sender.sender(topic); - MeasurementKey key = new MeasurementKey("a", "b"); + ObservationKey key = new ObservationKey("test", "a", "b"); PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f); ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema); ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema); @@ -193,13 +193,13 @@ public void withCompression() throws IOException, InterruptedException { webServer.enqueue(new MockResponse() .setHeader("Content-Type", "application/json; charset=utf-8") .setBody("{\"offset\": 100}")); - Schema keySchema = MeasurementKey.getClassSchema(); + Schema keySchema = ObservationKey.getClassSchema(); Schema valueSchema = PhoneLight.getClassSchema(); - AvroTopic topic = new AvroTopic<>("test", - keySchema, valueSchema, MeasurementKey.class, PhoneLight.class); - KafkaTopicSender topicSender = sender.sender(topic); + AvroTopic topic = new AvroTopic<>("test", + keySchema, valueSchema, ObservationKey.class, PhoneLight.class); + KafkaTopicSender topicSender = sender.sender(topic); - MeasurementKey key = new MeasurementKey("a", "b"); + ObservationKey key = new ObservationKey("test", "a", "b"); PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f); ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema); ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema); diff --git a/src/test/java/org/radarcns/topic/KafkaTopicTest.java b/src/test/java/org/radarcns/topic/KafkaTopicTest.java new file mode 100644 index 00000000..b9adc804 --- /dev/null +++ b/src/test/java/org/radarcns/topic/KafkaTopicTest.java @@ -0,0 +1,49 @@ +package org.radarcns.topic; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.*; + +public class KafkaTopicTest { + @Test(expected = IllegalArgumentException.class) + public void nullArguments() { + new KafkaTopic(null); + } + + @Test(expected = IllegalArgumentException.class) + public void invalidTopicName() { + new KafkaTopic("bla$"); + } + + + @Test + public void getName() { + KafkaTopic topic = new KafkaTopic("aba"); + assertEquals("aba", topic.getName()); + } + + + @Test + public void compare() throws Exception { + final int randomSize = 100; + List randomString = new ArrayList<>(randomSize); + List randomTopic = new ArrayList<>(randomSize); + for (int i = 0; i < randomSize; i++) { + String str = 'a' + UUID.randomUUID().toString().replace('-', '_'); + randomString.add(str); + randomTopic.add(new KafkaTopic(str)); + } + + Collections.sort(randomString); + Collections.sort(randomTopic); + + for (int i = 0; i < randomSize; i++) { + assertEquals(randomString.get(i), randomTopic.get(i).getName()); + } + } +} diff --git a/src/test/java/org/radarcns/topic/SensorTopicTest.java b/src/test/java/org/radarcns/topic/SensorTopicTest.java index fc3f8a9a..845fabdb 100644 --- a/src/test/java/org/radarcns/topic/SensorTopicTest.java +++ b/src/test/java/org/radarcns/topic/SensorTopicTest.java @@ -16,13 +16,16 @@ package org.radarcns.topic; -import static org.junit.Assert.*; - +import com.fasterxml.jackson.databind.annotation.JsonTypeResolver; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.junit.Test; +import org.radarcns.kafka.ObservationKey; +import org.radarcns.passive.phone.PhoneAcceleration; + +import static org.junit.Assert.assertEquals; /** * Created by joris on 05/07/2017. @@ -32,6 +35,7 @@ public class SensorTopicTest { @Test public void workingConstructor() { Schema keySchema = SchemaBuilder.record("key").fields() + .name("projectId").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))).withDefault(null) .name("userId").type(Schema.create(Type.STRING)).noDefault() .name("sourceId").type(Schema.create(Type.STRING)).noDefault() .endRecord(); @@ -118,4 +122,31 @@ public void notARecord() { keySchema, valueSchema, GenericRecord.class, GenericRecord.class); } + + @Test + public void parseTopic() { + SensorTopic topic = SensorTopic.parse("test", + ObservationKey.class.getName(), PhoneAcceleration.class.getName()); + + SensorTopic expected = new SensorTopic<>("test", + ObservationKey.getClassSchema(), PhoneAcceleration.getClassSchema(), + ObservationKey.class, PhoneAcceleration.class); + + assertEquals(expected, topic); + } + + @Test(expected = IllegalArgumentException.class) + public void parseUnexistingKey() { + SensorTopic.parse("test", + "unexisting." + ObservationKey.class.getName(), + PhoneAcceleration.class.getName()); + } + + + @Test(expected = IllegalArgumentException.class) + public void parseUnexistingValue() { + SensorTopic.parse("test", + ObservationKey.class.getName(), + "unexisting." + PhoneAcceleration.class.getName()); + } } diff --git a/src/test/java/org/radarcns/util/serde/KafkaAvroSerializerTest.java b/src/test/java/org/radarcns/util/serde/KafkaAvroSerializerTest.java index b682fa50..6c937f48 100644 --- a/src/test/java/org/radarcns/util/serde/KafkaAvroSerializerTest.java +++ b/src/test/java/org/radarcns/util/serde/KafkaAvroSerializerTest.java @@ -33,7 +33,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.Test; -import org.radarcns.key.MeasurementKey; +import org.radarcns.kafka.ObservationKey; import org.radarcns.producer.rest.SchemaRetriever; import org.radarcns.producer.rest.ParsedSchemaMetadata; @@ -44,7 +44,7 @@ public void serialize() throws Exception { testSerialization("this", Schema.create(Type.STRING)); testSerialization(10, Schema.create(Type.INT)); - MeasurementKey key = new MeasurementKey("a", "b"); + ObservationKey key = new ObservationKey("test", "a", "b"); testSerialization(key, key.getSchema()); Schema genericSchema = Schema.createRecord(Arrays.asList( diff --git a/src/test/resources/integration.yml b/src/test/resources/integration.yml index f62cb69c..c1b4f5e8 100644 --- a/src/test/resources/integration.yml +++ b/src/test/resources/integration.yml @@ -26,7 +26,7 @@ number_of_devices: 1 data: - file: integration_test.csv topic: integration_test - key_schema: org.radarcns.key.MeasurementKey + key_schema: org.radarcns.kafka.ObservationKey value_schema: org.radarcns.aggregator.DoubleArrayAggregator frequency: 32 sensor: TEMPERATURE \ No newline at end of file