Skip to content

Commit

Permalink
Merge pull request #44 from RADAR-CNS/release-0.7
Browse files Browse the repository at this point in the history
Release 0.7
  • Loading branch information
blootsvoets committed Dec 14, 2017
2 parents c6d439c + 1101d55 commit 63ef700
Show file tree
Hide file tree
Showing 25 changed files with 591 additions and 821 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.3'
compile group: 'org.radarcns', name: 'radar-commons', version: '0.7'
}
```

Expand All @@ -26,7 +26,7 @@ repositories {
}
dependencies {
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.6.3'
testCompile group: 'org.radarcns', name: 'radar-commons-testing', version: '0.7'
}
```

Expand All @@ -51,7 +51,7 @@ configurations.all {
}
dependencies {
compile group: 'org.radarcns', name: 'radar-commons', version: '0.6.4-SNAPSHOT', changing: true
compile group: 'org.radarcns', name: 'radar-commons', version: '0.7.1-SNAPSHOT', changing: true
}
```

Expand Down
24 changes: 11 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

plugins {
// Get bintray version
id 'com.jfrog.bintray' version '1.7.3'
id 'com.jfrog.artifactory' version '4.4.18'
id 'com.jfrog.bintray' version '1.8.0'
id 'com.jfrog.artifactory' version '4.5.4'
}

allprojects {
Expand All @@ -36,23 +36,21 @@ allprojects {
// Configuration //
//---------------------------------------------------------------------------//

version = '0.6.3'
version = '0.7'
group = 'org.radarcns'
ext.githubRepoName = 'RADAR-CNS/RADAR-Commons'

ext.slf4jVersion = '1.7.21'
ext.slf4jVersion = '1.7.25'
ext.kafkaVersion = '0.11.0.1'
ext.avroVersion = '1.8.2'
ext.confluentVersion = '3.3.0'
ext.log4jVersion = '2.7'
ext.jacksonVersion = '2.8.5'
ext.okhttpVersion = '3.8.0'
ext.confluentVersion = '3.3.1'
ext.jacksonVersion = '2.9.3'
ext.okhttpVersion = '3.9.1'
ext.junitVersion = '4.12'
ext.mockitoVersion = '2.2.29'
ext.mathVersion = '3.0'
ext.mockitoVersion = '2.13.0'
ext.hamcrestVersion = '1.3'
ext.codacyVersion = '1.0.10'
ext.radarSchemasVersion = '0.2'
ext.codacyVersion = '2.0.1'
ext.radarSchemasVersion = '0.2.3'
ext.orgJsonVersion = '20170516'

ext.githubUrl = 'https://github.com/' + githubRepoName + '.git'
Expand Down Expand Up @@ -366,6 +364,6 @@ artifactoryPublish {
}

task wrapper(type: Wrapper) {
gradleVersion = '4.1'
gradleVersion = '4.4'
distributionType 'all'
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class MockDevice<K extends SpecificRecord> extends Thread {
private static final Logger logger = LoggerFactory.getLogger(MockDevice.class);
private final int baseFrequency;
private final KafkaSender<K, SpecificRecord> sender;
private final KafkaSender sender;
private final AtomicBoolean stopping;
private final List<RecordGenerator<K>> generators;
private final K key;
Expand All @@ -52,8 +52,7 @@ public class MockDevice<K extends SpecificRecord> extends Thread {
* @param key key to send all messages with
* @param generators data generators that produce the data we send
*/
public MockDevice(KafkaSender<K, SpecificRecord> sender, K key,
List<RecordGenerator<K>> generators) {
public MockDevice(KafkaSender sender, K key, List<RecordGenerator<K>> generators) {
this.generators = generators;
this.key = key;
baseFrequency = computeBaseFrequency(generators);
Expand Down Expand Up @@ -86,7 +85,7 @@ public void run() {
int frequency = generators.get(i).getConfig().getFrequency();
if (frequency > 0 && beat % (baseFrequency / frequency) == 0) {
Record<K, SpecificRecord> record = recordIterators.get(i).next();
topicSenders.get(i).send(record.offset, record.key, record.value);
topicSenders.get(i).send(record.key, record.value);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package org.radarcns.mock;

import java.io.IOException;
import org.apache.avro.specific.SpecificRecord;
import org.radarcns.data.AvroRecordData;
import org.radarcns.data.Record;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.mock.data.MockCsvParser;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

/**
* Send mock data from a CSV file.
*
Expand All @@ -33,8 +35,7 @@ public class MockFileSender {
private final KafkaSender sender;
private final MockCsvParser parser;

public MockFileSender(KafkaSender<ObservationKey, SpecificRecord> sender,
MockCsvParser parser) {
public MockFileSender(KafkaSender sender, MockCsvParser parser) {
this.parser = parser;
this.sender = sender;
}
Expand All @@ -46,10 +47,11 @@ public MockFileSender(KafkaSender<ObservationKey, SpecificRecord> sender,
@SuppressWarnings("unchecked")
public void send() throws IOException {
try (KafkaTopicSender topicSender = sender.sender(parser.getTopic())) {
Collection<Record> records = new ArrayList<>();
while (parser.hasNext()) {
Record<SpecificRecord, SpecificRecord> record = parser.next();
topicSender.send(record.offset, record.key, record.value);
records.add(parser.next());
}
topicSender.send(new AvroRecordData(parser.getTopic(), records));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,45 @@

package org.radarcns.mock;

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.radarcns.util.serde.AbstractKafkaAvroSerde.SCHEMA_REGISTRY_CONFIG;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.specific.SpecificRecord;
import org.radarcns.config.ServerConfig;
import org.radarcns.config.YamlConfigLoader;
import org.radarcns.data.SpecificRecordEncoder;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.mock.config.BasicMockConfig;
import org.radarcns.mock.config.MockDataConfig;
import org.radarcns.mock.data.MockCsvParser;
import org.radarcns.mock.data.RecordGenerator;
import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
import org.radarcns.passive.empatica.EmpaticaE4BatteryLevel;
import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.passive.empatica.EmpaticaE4ElectroDermalActivity;
import org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval;
import org.radarcns.passive.empatica.EmpaticaE4Temperature;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.mock.config.BasicMockConfig;
import org.radarcns.mock.config.MockDataConfig;
import org.radarcns.mock.data.MockCsvParser;
import org.radarcns.mock.data.RecordGenerator;
import org.radarcns.producer.BatchedKafkaSender;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.producer.direct.DirectSender;
import org.radarcns.producer.BatchedKafkaSender;
import org.radarcns.producer.rest.ConnectionState;
import org.radarcns.producer.rest.ManagedConnectionPool;
import org.radarcns.producer.rest.RestSender;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.util.serde.KafkaAvroSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.radarcns.util.serde.AbstractKafkaAvroSerde.SCHEMA_REGISTRY_CONFIG;

/**
* A Mock Producer class that can be used to stream data. It can use MockFileSender and MockDevice
* for testing purposes, with direct or indirect streaming.
Expand All @@ -66,7 +65,7 @@ public class MockProducer {

private final List<MockDevice<ObservationKey>> devices;
private final List<MockFileSender> files;
private final List<KafkaSender<ObservationKey, SpecificRecord>> senders;
private final List<KafkaSender> senders;
private final SchemaRetriever retriever;

/**
Expand All @@ -89,7 +88,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException {
int numDevices = mockConfig.getNumberOfDevices();

retriever = new SchemaRetriever(mockConfig.getSchemaRegistry(), 10);
List<KafkaSender<ObservationKey, SpecificRecord>> tmpSenders = null;
List<KafkaSender> tmpSenders = null;

try {
devices = new ArrayList<>(numDevices);
Expand Down Expand Up @@ -127,7 +126,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException {
}
} catch (Exception ex) {
if (tmpSenders != null) {
for (KafkaSender<?, ?> sender : tmpSenders) {
for (KafkaSender sender : tmpSenders) {
sender.close();
}
}
Expand All @@ -138,7 +137,7 @@ public MockProducer(BasicMockConfig mockConfig, File root) throws IOException {
senders = tmpSenders;
}

private List<KafkaSender<ObservationKey, SpecificRecord>> createSenders(
private List<KafkaSender> createSenders(
BasicMockConfig mockConfig, int numDevices) {

if (mockConfig.isDirectProducer()) {
Expand All @@ -150,42 +149,39 @@ private List<KafkaSender<ObservationKey, SpecificRecord>> createSenders(
}

/** Create senders that directly produce data to Kafka. */
private List<KafkaSender<ObservationKey, SpecificRecord>> createDirectSenders(int numDevices,
private List<KafkaSender> createDirectSenders(int numDevices,
SchemaRetriever retriever, String brokerPaths) {
List<KafkaSender<ObservationKey, SpecificRecord>> result = new ArrayList<>(numDevices);
List<KafkaSender> result = new ArrayList<>(numDevices);
for (int i = 0; i < numDevices; i++) {
Properties properties = new Properties();
properties.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(SCHEMA_REGISTRY_CONFIG, retriever);
properties.put(BOOTSTRAP_SERVERS_CONFIG, brokerPaths);

result.add(new DirectSender<ObservationKey, SpecificRecord>(properties));
result.add(new DirectSender(properties));
}
return result;
}

/** Create senders that produce data to Kafka via the REST proxy. */
private List<KafkaSender<ObservationKey, SpecificRecord>> createRestSenders(int numDevices,
private List<KafkaSender> createRestSenders(int numDevices,
SchemaRetriever retriever, ServerConfig restProxy, boolean useCompression) {
List<KafkaSender<ObservationKey, SpecificRecord>> result = new ArrayList<>(numDevices);
List<KafkaSender> result = new ArrayList<>(numDevices);
ConnectionState sharedState = new ConnectionState(10, TimeUnit.SECONDS);
RestSender.Builder<ObservationKey, SpecificRecord> restBuilder =
new RestSender.Builder<ObservationKey, SpecificRecord>()
RestSender.Builder restBuilder =
new RestSender.Builder()
.server(restProxy)
.schemaRetriever(retriever)
.useCompression(useCompression)
.encoders(new SpecificRecordEncoder(false),
new SpecificRecordEncoder(false))
.connectionState(sharedState)
.connectionTimeout(10, TimeUnit.SECONDS);

for (int i = 0; i < numDevices; i++) {
RestSender<ObservationKey, SpecificRecord> firstSender = restBuilder
RestSender firstSender = restBuilder
.connectionPool(new ManagedConnectionPool())
.build();

result.add(new BatchedKafkaSender<>(firstSender, 1_000, 1000));
result.add(new BatchedKafkaSender(firstSender, 1000, 1000));
}
return result;
}
Expand Down Expand Up @@ -214,7 +210,7 @@ public void shutdown() throws IOException, InterruptedException {
device.join(5_000L);
}
logger.info("Closing channels");
for (KafkaSender<ObservationKey, SpecificRecord> sender : senders) {
for (KafkaSender sender : senders) {
sender.close();
}
retriever.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class MockCsvParser<K extends SpecificRecord> implements Closeable {
private final BufferedReader bufferedReader;
private final FileReader fileReader;
private List<String> currentLine;
private long offset;

/**
* Base constructor.
Expand All @@ -62,7 +61,7 @@ public MockCsvParser(MockDataConfig config, File root)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException, IOException {
//noinspection unchecked
topic = (AvroTopic<K, SpecificRecord>) config.parseAvroTopic();
topic = config.parseAvroTopic();

fileReader = new FileReader(config.getDataFile(root));
bufferedReader = new BufferedReader(fileReader);
Expand All @@ -73,7 +72,6 @@ public MockCsvParser(MockDataConfig config, File root)
headerMap.put(header.get(i), i);
}
currentLine = csvReader.parseLine();
offset = 0;
}

public AvroTopic getTopic() {
Expand All @@ -99,7 +97,7 @@ public Record<K, SpecificRecord> next() throws IOException {

currentLine = csvReader.parseLine();

return new Record<>(offset++, key, value);
return new Record<>(key, value);
}

/**
Expand Down
Loading

0 comments on commit 63ef700

Please sign in to comment.