Merge pull request #71 from RADAR-base/release-0.3.2
Release 0.3.2
yatharthranjan committed Oct 20, 2020
2 parents fb42fc4 + a3f38c0 commit 9cb55d8
Showing 13 changed files with 86 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -11,7 +11,7 @@ subprojects {
apply plugin: 'java-library'

group = 'org.radarbase'
version = '0.3.2-SNAPSHOT'
version = '0.3.2'

sourceCompatibility = 1.8
targetCompatibility = 1.8
16 changes: 8 additions & 8 deletions docker-compose.yml
@@ -9,7 +9,7 @@ services:
# Zookeeper Cluster #
#---------------------------------------------------------------------------#
zookeeper-1:
image: confluentinc/cp-zookeeper:5.1.0
image: confluentinc/cp-zookeeper:5.5.1
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
@@ -19,7 +19,7 @@
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-2:
image: confluentinc/cp-zookeeper:5.1.0
image: confluentinc/cp-zookeeper:5.5.1
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
@@ -29,7 +29,7 @@
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-3:
image: confluentinc/cp-zookeeper:5.1.0
image: confluentinc/cp-zookeeper:5.5.1
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
@@ -42,7 +42,7 @@
# Kafka Cluster #
#---------------------------------------------------------------------------#
kafka-1:
image: confluentinc/cp-kafka:5.1.0
image: confluentinc/cp-kafka:5.5.1
depends_on:
- zookeeper-1
- zookeeper-2
@@ -61,7 +61,7 @@
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"

kafka-2:
image: confluentinc/cp-kafka:5.1.0
image: confluentinc/cp-kafka:5.5.1
depends_on:
- zookeeper-1
- zookeeper-2
@@ -80,7 +80,7 @@
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"

kafka-3:
image: confluentinc/cp-kafka:5.1.0
image: confluentinc/cp-kafka:5.5.1
depends_on:
- zookeeper-1
- zookeeper-2
@@ -102,7 +102,7 @@
# Schema Registry #
#---------------------------------------------------------------------------#
schema-registry-1:
image: confluentinc/cp-schema-registry:5.1.0
image: confluentinc/cp-schema-registry:5.5.1
depends_on:
- zookeeper-1
- zookeeper-2
@@ -124,7 +124,7 @@
# REST proxy #
#---------------------------------------------------------------------------#
rest-proxy-1:
image: confluentinc/cp-kafka-rest:5.1.0
image: confluentinc/cp-kafka-rest:5.5.1
depends_on:
- zookeeper-1
- zookeeper-2
@@ -27,18 +27,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.Headers;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
@@ -74,19 +70,18 @@ public FitbitAvroConverter(AvroData avroData) {

@Override
public Collection<SourceRecord> convert(
RestRequest restRequest, Response response) throws IOException {
ResponseBody body = response.body();
if (body == null) {
RestRequest restRequest, Headers headers, byte[] data) throws IOException {
if (data == null) {
throw new IOException("Failed to read body");
}
JsonNode activities = JSON_READER.readTree(body.charStream());
JsonNode activities = JSON_READER.readTree(data);

User user = ((FitbitRestRequest) restRequest).getUser();
final SchemaAndValue key = user.getObservationKey(avroData);
double timeReceived = System.currentTimeMillis() / 1000d;

return processRecords((FitbitRestRequest)restRequest, activities, timeReceived)
.filter(Objects::nonNull)
.filter(t -> validateRecord((FitbitRestRequest)restRequest, t))
.map(t -> {
SchemaAndValue avro = avroData.toConnectData(t.value.getSchema(), t.value);
Map<String, ?> offset = Collections.singletonMap(
@@ -98,6 +93,22 @@ public Collection<SourceRecord> convert(
.collect(Collectors.toList());
}

private boolean validateRecord(FitbitRestRequest request, TopicData record) {
if (record == null) {
return false;
}
Instant endDate = request.getUser().getEndDate();
if (endDate == null) {
return true;
}
Field timeField = record.value.getSchema().getField("time");
if (timeField != null) {
long time = (long) (((Double)record.value.get(timeField.pos()) * 1000.0));
return Instant.ofEpochMilli(time).isBefore(endDate);
}
return true;
}

/** Process the JSON records generated by given request. */
protected abstract Stream<TopicData> processRecords(
FitbitRestRequest request,
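The validateRecord method added above drops any record whose "time" field (seconds since the epoch, as a double) falls on or after the user's configured end date. A minimal, runnable sketch of that filter logic — class and method names here are illustrative, not part of the connector:

```java
import java.time.Instant;

// Sketch of the end-date filter added in validateRecord (names are illustrative).
public class EndDateFilterSketch {
    static boolean isValid(double timeSeconds, Instant endDate) {
        if (endDate == null) {
            return true; // no end date configured: keep every record
        }
        long timeMillis = (long) (timeSeconds * 1000.0);
        return Instant.ofEpochMilli(timeMillis).isBefore(endDate);
    }

    public static void main(String[] args) {
        Instant endDate = Instant.parse("2020-10-20T00:00:00Z");
        System.out.println(isValid(1_603_065_600.0, endDate)); // 2020-10-19T00:00Z -> true
        System.out.println(isValid(1_603_238_400.0, endDate)); // 2020-10-21T00:00Z -> false
    }
}
```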
@@ -312,7 +312,7 @@ protected Duration getLookbackTime() {
*/
protected Instant nextPoll(User user) {
Instant offset = getOffset(user);
if (offset.isAfter(user.getEndDate())) {
if (offset.isAfter(user.getEndDate().minus(getEndDateThreshold()))) {
return nearFuture();
} else {
Instant nextPoll = lastPollPerUser.getOrDefault(user.getId(), MIN_INSTANT)
@@ -321,6 +321,10 @@
}
}

private TemporalAmount getEndDateThreshold() {
return Duration.ofHours(1);
}

/**
* Generate one date per day, using UTC time zone. The first date will have the time from the
* given startDate. Following time stamps will start at 00:00. This will not up to the date of
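Together with the nextPoll change above, the new one-hour getEndDateThreshold means a user stops being scheduled slightly before their end date: once the stored offset passes endDate minus one hour, nextPoll returns nearFuture() (about 31 days out, per PayloadToSourceRecordConverter's NEAR_FUTURE). A runnable sketch of the cutoff, with illustrative names:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the end-date polling cutoff (names are illustrative).
public class EndDateCutoffSketch {
    static final Duration END_DATE_THRESHOLD = Duration.ofHours(1);

    // Mirrors the condition in nextPoll: stop polling once the offset is
    // within the threshold of the user's end date.
    static boolean shouldStopPolling(Instant offset, Instant endDate) {
        return offset.isAfter(endDate.minus(END_DATE_THRESHOLD));
    }

    public static void main(String[] args) {
        Instant endDate = Instant.parse("2020-10-20T12:00:00Z");
        System.out.println(shouldStopPolling(endDate.minus(Duration.ofMinutes(30)), endDate)); // true
        System.out.println(shouldStopPolling(endDate.minus(Duration.ofHours(2)), endDate));    // false
    }
}
```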
@@ -45,6 +45,9 @@ public class LocalUser implements User {
@JsonProperty("oauth2")
private OAuth2UserCredentials oauth2Credentials = new OAuth2UserCredentials();

@JsonProperty("authorized")
private Boolean isAuthorized;

@JsonIgnore
private SchemaAndValue observationKey;

@@ -95,6 +98,15 @@ public void setFitbitUserId(String id) {
this.externalUserId = id;
}

@Override
public boolean isAuthorized() {
if (isAuthorized == null) {
return !oauth2Credentials.isAccessTokenExpired()
|| oauth2Credentials.hasRefreshToken();
}
return isAuthorized;
}

@Override
public String getVersion() {
return version;
@@ -115,6 +127,7 @@ public LocalUser copy() {
copy.endDate = endDate;
copy.sourceId = sourceId;
copy.oauth2Credentials = oauth2Credentials;
copy.isAuthorized = isAuthorized;
return copy;
}

@@ -133,6 +146,7 @@ public String toString() {
+ ", projectId='" + projectId + '\''
+ ", userId='" + userId + '\''
+ ", sourceId='" + sourceId + '\''
+ ", isAuthorized='" + isAuthorized() + '\''
+ '}';
}

@@ -152,7 +166,8 @@ public boolean equals(Object o) {
&& Objects.equals(userId, localUser.userId)
&& Objects.equals(sourceId, localUser.sourceId)
&& Objects.equals(startDate, localUser.startDate)
&& Objects.equals(endDate, localUser.endDate);
&& Objects.equals(endDate, localUser.endDate)
&& Objects.equals(isAuthorized(), localUser.isAuthorized());
}

@Override
@@ -45,6 +45,8 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) {

String getSourceId();

boolean isAuthorized();

default String getVersionedId() {
String version = getVersion();
if (version == null) {
@@ -60,6 +62,7 @@ default Boolean isComplete() {
return getEndDate() != null
&& getStartDate() != null
&& getProjectId() != null
&& getUserId() != null;
&& getUserId() != null
&& isAuthorized();
}
}
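isComplete now additionally requires isAuthorized(), which — as implemented in LocalUser above and the Firebase-backed user below — holds when the access token has not expired or a refresh token is available to obtain a new one. A self-contained sketch of the combined check, with simplified signatures and illustrative names:

```java
import java.time.Instant;

// Sketch of the extended completeness check (simplified; names are illustrative).
public class UserCompletenessSketch {
    // Mirrors LocalUser.isAuthorized's fallback when no explicit flag is set.
    static boolean isAuthorized(boolean accessTokenExpired, boolean hasRefreshToken) {
        return !accessTokenExpired || hasRefreshToken;
    }

    static boolean isComplete(Instant startDate, Instant endDate,
            String projectId, String userId, boolean authorized) {
        return endDate != null && startDate != null
                && projectId != null && userId != null
                && authorized;
    }

    public static void main(String[] args) {
        // Expired access token but a refresh token on hand: still authorized.
        System.out.println(isAuthorized(true, true));  // true
        // Expired token and no refresh token: not authorized, hence never complete.
        System.out.println(isComplete(Instant.MIN, Instant.MAX, "project", "user",
                isAuthorized(true, false)));           // false
    }
}
```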
@@ -132,6 +132,9 @@ private void forceUpdateUsers() {

@Override
public Stream<LocalUser> stream() {
if (nextFetch.get().equals(MIN_INSTANT)) {
applyPendingUpdates();
}
Stream<LockedUser> users = this.users.values().stream()
.filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken()));
if (!configuredUsers.isEmpty()) {
@@ -70,6 +70,12 @@ public String getSourceId() {
return fitbitAuthDetails.getSourceId();
}

@Override
public boolean isAuthorized() {
return !fitbitAuthDetails.getOauth2Credentials().isAccessTokenExpired()
|| fitbitAuthDetails.getOauth2Credentials().hasRefreshToken();
}

public FirebaseUserDetails getFirebaseUserDetails() {
return firebaseUserDetails;
}
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import okhttp3.Headers;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.kafka.connect.data.Schema;
@@ -36,15 +37,13 @@ public class BytesPayloadConverter implements PayloadToSourceRecordConverter {

// Just bytes for incoming messages
@Override
public Collection<SourceRecord> convert(RestRequest request, Response response) throws IOException {
public Collection<SourceRecord> convert(RestRequest request, Headers headers, byte[] data) {
Map<String, Long> sourceOffset = Collections.singletonMap(
TIMESTAMP_OFFSET_KEY, currentTimeMillis());
ResponseBody body = response.body();
byte[] result = body != null ? body.bytes() : null;
String topic = topicSelector.getTopic(request, result);
String topic = topicSelector.getTopic(request, data);
return Collections.singleton(
new SourceRecord(request.getPartition(), sourceOffset,
topic, Schema.BYTES_SCHEMA, result));
topic, Schema.BYTES_SCHEMA, data));
}

@Override
@@ -22,7 +22,7 @@
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import okhttp3.Response;
import okhttp3.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.radarbase.connect.rest.config.RestSourceTool;
import org.radarbase.connect.rest.request.RestRequest;
@@ -33,7 +33,7 @@ public interface PayloadToSourceRecordConverter extends RestSourceTool {
TemporalAmount NEAR_FUTURE = Duration.ofDays(31L);

Collection<SourceRecord> convert(
RestRequest request, Response response) throws IOException;
RestRequest request, Headers headers, byte[] data) throws IOException;

static Instant nearFuture() {
return Instant.now().plus(NEAR_FUTURE);
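The converter contract changes here from consuming a live okhttp3.Response to receiving the response headers plus the already-read body bytes, so converters no longer hold the HTTP connection open (see the RestRequest change below). A sketch of the new shape — the RestSourceTool plumbing is omitted and RestRequest is replaced by a plain partition map to keep it self-contained; the topic name and offset key are placeholders:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import okhttp3.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Sketch of a converter against the new (request, Headers, byte[]) shape.
public class ExamplePayloadConverter {
    public Collection<SourceRecord> convert(
            Map<String, ?> partition, Headers headers, byte[] data) throws IOException {
        if (data == null) {
            throw new IOException("Failed to read body");
        }
        // The body has already been read for us; no ResponseBody handling needed.
        // (headers are available for converters that need them.)
        String value = new String(data, StandardCharsets.UTF_8);
        Map<String, Long> sourceOffset =
                Collections.singletonMap("timestamp", System.currentTimeMillis());
        return Collections.singleton(new SourceRecord(
                partition, sourceOffset, "example-topic", Schema.STRING_SCHEMA, value));
    }
}
```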
@@ -19,12 +19,11 @@

import static java.lang.System.currentTimeMillis;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.radarbase.connect.rest.RestSourceConnectorConfig;
@@ -35,10 +34,9 @@ public class StringPayloadConverter implements PayloadToSourceRecordConverter {
private TopicSelector topicSelector;

@Override
public Collection<SourceRecord> convert(RestRequest request, Response response) throws IOException {
public Collection<SourceRecord> convert(RestRequest request, Headers headers, byte[] data) {
Map<String, Long> sourceOffset = Collections.singletonMap(TIMESTAMP_OFFSET_KEY, currentTimeMillis());
ResponseBody body = response.body();
String result = body == null ? null : body.string();
String result = data == null ? null : new String(data, StandardCharsets.UTF_8);
String topic = topicSelector.getTopic(request, result);
return Collections.singleton(
new SourceRecord(request.getPartition(), sourceOffset, topic,
@@ -22,9 +22,11 @@
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.kafka.connect.source.SourceRecord;

/**
@@ -82,18 +84,24 @@ public Stream<SourceRecord> handleRequest() throws IOException {

Collection<SourceRecord> records;

byte[] data;
Headers headers;
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
route.requestFailed(this, response);
return Stream.empty();
}

records = route.converter().convert(this, response);
headers = response.headers();
ResponseBody body = response.body();
data = body != null ? body.bytes() : null;
} catch (IOException ex) {
route.requestFailed(this, null);
throw ex;
}

records = route.converter().convert(this, headers, data);

if (records.isEmpty()) {
route.requestEmpty(this);
} else {
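The reworked handleRequest reads the whole body into a byte array inside the try-with-resources and only invokes the converter after the Response — and with it the pooled HTTP connection — has been closed. A minimal okhttp sketch of that fetch-then-convert pattern; the URL is a placeholder:

```java
import java.io.IOException;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

// Sketch: fully read the response inside try-with-resources, convert afterwards.
public class FetchThenConvertSketch {
    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url("https://example.com/").build();

        byte[] data;
        Headers headers;
        try (Response response = client.newCall(request).execute()) {
            headers = response.headers();
            ResponseBody body = response.body();
            data = body != null ? body.bytes() : null;
        } // connection is released here, before any conversion work

        // Conversion would happen here, e.g. route.converter().convert(request, headers, data).
        System.out.println(headers.size() + " headers, "
                + (data == null ? 0 : data.length) + " body bytes");
    }
}
```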
4 changes: 2 additions & 2 deletions scripts/REDCAP-FITBIT-AUTH-AUTO/requirements.txt
@@ -2,8 +2,8 @@ certifi==2018.10.15
chardet==3.0.4
idna==2.7
PyCap==1.0.2
PyYAML==5.2
PyYAML==5.3.1
requests==2.22.0
selenium==3.14.1
semantic-version==2.3.1
urllib3==1.25.8
urllib3==1.25.9
