diff --git a/build.gradle b/build.gradle index 0b2b501b..2a494b41 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ subprojects { apply plugin: 'java-library' group = 'org.radarbase' - version = '0.3.2-SNAPSHOT' + version = '0.3.2' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/docker-compose.yml b/docker-compose.yml index feb68935..cc5a01c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: # Zookeeper Cluster # #---------------------------------------------------------------------------# zookeeper-1: - image: confluentinc/cp-zookeeper:5.1.0 + image: confluentinc/cp-zookeeper:5.5.1 environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 2181 @@ -19,7 +19,7 @@ services: ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 zookeeper-2: - image: confluentinc/cp-zookeeper:5.1.0 + image: confluentinc/cp-zookeeper:5.5.1 environment: ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_CLIENT_PORT: 2181 @@ -29,7 +29,7 @@ services: ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 zookeeper-3: - image: confluentinc/cp-zookeeper:5.1.0 + image: confluentinc/cp-zookeeper:5.5.1 environment: ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_CLIENT_PORT: 2181 @@ -42,7 +42,7 @@ services: # Kafka Cluster # #---------------------------------------------------------------------------# kafka-1: - image: confluentinc/cp-kafka:5.1.0 + image: confluentinc/cp-kafka:5.5.1 depends_on: - zookeeper-1 - zookeeper-2 @@ -61,7 +61,7 @@ services: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false" kafka-2: - image: confluentinc/cp-kafka:5.1.0 + image: confluentinc/cp-kafka:5.5.1 depends_on: - zookeeper-1 - zookeeper-2 @@ -80,7 +80,7 @@ services: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false" kafka-3: - image: confluentinc/cp-kafka:5.1.0 + image: confluentinc/cp-kafka:5.5.1 depends_on: - zookeeper-1 - zookeeper-2 @@ -102,7 +102,7 @@ services: # Schema Registry # #---------------------------------------------------------------------------# schema-registry-1: - image: confluentinc/cp-schema-registry:5.1.0 + image: confluentinc/cp-schema-registry:5.5.1 depends_on: - zookeeper-1 - zookeeper-2 @@ -124,7 +124,7 @@ services: # REST proxy # #---------------------------------------------------------------------------# rest-proxy-1: - image: confluentinc/cp-kafka-rest:5.1.0 + image: confluentinc/cp-kafka-rest:5.5.1 depends_on: - zookeeper-1 - zookeeper-2 diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitAvroConverter.java index 097434c3..c48b4abe 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitAvroConverter.java @@ -27,18 +27,14 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.OptionalDouble; -import java.util.OptionalLong; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import okhttp3.Response; -import okhttp3.ResponseBody; +import okhttp3.Headers; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.source.SourceRecord; @@ -74,19 +70,18 @@ public FitbitAvroConverter(AvroData avroData) { @Override public Collection convert( - RestRequest restRequest, Response response) throws IOException { - ResponseBody body = response.body(); - if (body == null) { + RestRequest restRequest, Headers headers, byte[] data) throws IOException { + if (data == null) { throw new IOException("Failed to read body"); } - JsonNode activities = JSON_READER.readTree(body.charStream()); + JsonNode activities = JSON_READER.readTree(data); User user = ((FitbitRestRequest) restRequest).getUser(); final SchemaAndValue key = user.getObservationKey(avroData); double timeReceived = System.currentTimeMillis() / 1000d; return processRecords((FitbitRestRequest)restRequest, activities, timeReceived) - .filter(Objects::nonNull) + .filter(t -> validateRecord((FitbitRestRequest)restRequest, t)) .map(t -> { SchemaAndValue avro = avroData.toConnectData(t.value.getSchema(), t.value); Map offset = Collections.singletonMap( @@ -98,6 +93,22 @@ public Collection convert( .collect(Collectors.toList()); } + private boolean validateRecord(FitbitRestRequest request, TopicData record) { + if (record == null) { + return false; + } + Instant endDate = request.getUser().getEndDate(); + if (endDate == null) { + return true; + } + Field timeField = record.value.getSchema().getField("time"); + if (timeField != null) { + long time = (long) (((Double)record.value.get(timeField.pos()) * 1000.0)); + return Instant.ofEpochMilli(time).isBefore(endDate); + } + return true; + } + /** Process the JSON records generated by given request. */ protected abstract Stream processRecords( FitbitRestRequest request, diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index 7913493a..4a108491 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -312,7 +312,7 @@ protected Duration getLookbackTime() { */ protected Instant nextPoll(User user) { Instant offset = getOffset(user); - if (offset.isAfter(user.getEndDate())) { + if (offset.isAfter(user.getEndDate().minus(getEndDateThreshold()))) { return nearFuture(); } else { Instant nextPoll = lastPollPerUser.getOrDefault(user.getId(), MIN_INSTANT) @@ -321,6 +321,10 @@ protected Instant nextPoll(User user) { } } + private TemporalAmount getEndDateThreshold() { + return Duration.ofHours(1); + } + /** * Generate one date per day, using UTC time zone. The first date will have the time from the * given startDate. Following time stamps will start at 00:00. This will not up to the date of diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java index cc0119a7..b37c8393 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java @@ -45,6 +45,9 @@ public class LocalUser implements User { @JsonProperty("oauth2") private OAuth2UserCredentials oauth2Credentials = new OAuth2UserCredentials(); + @JsonProperty("authorized") + private Boolean isAuthorized; + @JsonIgnore private SchemaAndValue observationKey; @@ -95,6 +98,15 @@ public void setFitbitUserId(String id) { this.externalUserId = id; } + @Override + public boolean isAuthorized() { + if(isAuthorized == null) { + return !oauth2Credentials.isAccessTokenExpired() + || oauth2Credentials.hasRefreshToken(); + } + return isAuthorized; + } + @Override public String getVersion() { return version; @@ -115,6 +127,7 @@ public LocalUser copy() { copy.endDate = endDate; copy.sourceId = sourceId; copy.oauth2Credentials = oauth2Credentials; + copy.isAuthorized = isAuthorized; return copy; } @@ -133,6 +146,7 @@ public String toString() { + ", projectId='" + projectId + '\'' + ", userId='" + userId + '\'' + ", sourceId='" + sourceId + '\'' + + ", isAuthorized='" + isAuthorized() + '\'' + '}'; } @@ -152,7 +166,8 @@ public boolean equals(Object o) { && Objects.equals(userId, localUser.userId) && Objects.equals(sourceId, localUser.sourceId) && Objects.equals(startDate, localUser.startDate) - && Objects.equals(endDate, localUser.endDate); + && Objects.equals(endDate, localUser.endDate) + && Objects.equals(isAuthorized(), localUser.isAuthorized()); } @Override diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java index 49e9e90b..f76aef70 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java @@ -45,6 +45,8 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) { String getSourceId(); + boolean isAuthorized(); + default String getVersionedId() { String version = getVersion(); if (version == null) { @@ -60,6 +62,7 @@ default Boolean isComplete() { return getEndDate() != null && getStartDate() != null && getProjectId() != null - && getUserId() != null; + && getUserId() != null + && isAuthorized(); } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java index 4dfb50d5..71bed3b7 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java @@ -132,6 +132,9 @@ private void forceUpdateUsers() { @Override public Stream stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + applyPendingUpdates(); + } Stream users = this.users.values().stream() .filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken())); if (!configuredUsers.isEmpty()) { diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java index bb61431d..2102edc8 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java @@ -70,6 +70,12 @@ public String getSourceId() { return fitbitAuthDetails.getSourceId(); } + @Override + public boolean isAuthorized() { + return !fitbitAuthDetails.getOauth2Credentials().isAccessTokenExpired() + || fitbitAuthDetails.getOauth2Credentials().hasRefreshToken(); + } + public FirebaseUserDetails getFirebaseUserDetails() { return firebaseUserDetails; } diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java index 3448079b..891ae478 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/BytesPayloadConverter.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import okhttp3.Headers; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.kafka.connect.data.Schema; @@ -36,15 +37,13 @@ public class BytesPayloadConverter implements PayloadToSourceRecordConverter { // Just bytes for incoming messages @Override - public Collection convert(RestRequest request, Response response) throws IOException { + public Collection convert(RestRequest request, Headers headers, byte[] data) { Map sourceOffset = Collections.singletonMap( TIMESTAMP_OFFSET_KEY, currentTimeMillis()); - ResponseBody body = response.body(); - byte[] result = body != null ? body.bytes() : null; - String topic = topicSelector.getTopic(request, result); + String topic = topicSelector.getTopic(request, data); return Collections.singleton( new SourceRecord(request.getPartition(), sourceOffset, - topic, Schema.BYTES_SCHEMA, result)); + topic, Schema.BYTES_SCHEMA, data)); } @Override diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/PayloadToSourceRecordConverter.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/PayloadToSourceRecordConverter.java index 2c64eacb..1993618f 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/PayloadToSourceRecordConverter.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/PayloadToSourceRecordConverter.java @@ -22,7 +22,7 @@ import java.time.Instant; import java.time.temporal.TemporalAmount; import java.util.Collection; -import okhttp3.Response; +import okhttp3.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.radarbase.connect.rest.config.RestSourceTool; import org.radarbase.connect.rest.request.RestRequest; @@ -33,7 +33,7 @@ public interface PayloadToSourceRecordConverter extends RestSourceTool { TemporalAmount NEAR_FUTURE = Duration.ofDays(31L); Collection convert( - RestRequest request, Response response) throws IOException; + RestRequest request, Headers headers, byte[] data) throws IOException; static Instant nearFuture() { return Instant.now().plus(NEAR_FUTURE); diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/StringPayloadConverter.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/StringPayloadConverter.java index f9e36750..3690c8f5 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/StringPayloadConverter.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/converter/StringPayloadConverter.java @@ -19,12 +19,11 @@ import static java.lang.System.currentTimeMillis; -import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; import java.util.Map; -import okhttp3.Response; -import okhttp3.ResponseBody; +import okhttp3.Headers; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; import org.radarbase.connect.rest.RestSourceConnectorConfig; @@ -35,10 +34,9 @@ public class StringPayloadConverter implements PayloadToSourceRecordConverter { private TopicSelector topicSelector; @Override - public Collection convert(RestRequest request, Response response) throws IOException { + public Collection convert(RestRequest request, Headers headers, byte[] data) { Map sourceOffset = Collections.singletonMap(TIMESTAMP_OFFSET_KEY, currentTimeMillis()); - ResponseBody body = response.body(); - String result = body == null ? null : body.string(); + String result = data == null ? null : new String(data, StandardCharsets.UTF_8); String topic = topicSelector.getTopic(request, result); return Collections.singleton( new SourceRecord(request.getPartition(), sourceOffset, topic, diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java index ace28b8c..0abc06d2 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java @@ -22,9 +22,11 @@ import java.util.Map; import java.util.function.Predicate; import java.util.stream.Stream; +import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import okhttp3.ResponseBody; import org.apache.kafka.connect.source.SourceRecord; /** @@ -82,18 +84,24 @@ public Stream handleRequest() throws IOException { Collection records; + byte[] data; + Headers headers; try (Response response = client.newCall(request).execute()) { if (!response.isSuccessful()) { route.requestFailed(this, response); return Stream.empty(); } - records = route.converter().convert(this, response); + headers = response.headers(); + ResponseBody body = response.body(); + data = body != null ? body.bytes() : null; } catch (IOException ex) { route.requestFailed(this, null); throw ex; } + records = route.converter().convert(this, headers, data); + if (records.isEmpty()) { route.requestEmpty(this); } else { diff --git a/scripts/REDCAP-FITBIT-AUTH-AUTO/requirements.txt b/scripts/REDCAP-FITBIT-AUTH-AUTO/requirements.txt index 5b42682c..4e31460f 100644 --- a/scripts/REDCAP-FITBIT-AUTH-AUTO/requirements.txt +++ b/scripts/REDCAP-FITBIT-AUTH-AUTO/requirements.txt @@ -2,8 +2,8 @@ certifi==2018.10.15 chardet==3.0.4 idna==2.7 PyCap==1.0.2 -PyYAML==5.2 +PyYAML==5.3.1 requests==2.22.0 selenium==3.14.1 semantic-version==2.3.1 -urllib3==1.25.8 +urllib3==1.25.9