diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index a3a8250e..fcdf9826 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,6 +1,6 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - const val project = "0.5.0" + const val project = "0.5.1" const val java = 11 const val kotlin = "1.9.10" @@ -11,6 +11,8 @@ object Versions { const val kafka = "$confluent-ce" const val avro = "1.11.0" + const val managementPortal = "2.0.0" + // From image const val jackson = "2.14.2" diff --git a/kafka-connect-fitbit-source/build.gradle.kts b/kafka-connect-fitbit-source/build.gradle.kts index 932ccfef..f6994a25 100644 --- a/kafka-connect-fitbit-source/build.gradle.kts +++ b/kafka-connect-fitbit-source/build.gradle.kts @@ -6,7 +6,9 @@ dependencies { api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}") api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}") + implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}") + api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}")) implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index 36017b23..ceafad5a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -17,10 +17,11 @@ package org.radarbase.connect.rest.fitbit; -import static io.ktor.http.URLUtilsKt.URLBuilder; import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; @@ -30,9 +31,9 @@ import java.util.List; import java.util.Map; -import io.ktor.http.URLParserException; -import io.ktor.http.Url; import okhttp3.Headers; +import okhttp3.HttpUrl; + import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.NonEmptyString; @@ -331,6 +332,39 @@ public String toString() { Width.SHORT, FITBIT_INTRADAY_HEART_RATE_TOPIC_DISPLAY) + .define(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG, + Type.STRING, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY) + + .define(FITBIT_BREATHING_RATE_TOPIC_CONFIG, + Type.STRING, + FITBIT_BREATHING_RATE_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_BREATHING_RATE_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_BREATHING_RATE_TOPIC_DISPLAY) + + .define(FITBIT_SKIN_TEMPERATURE_TOPIC_CONFIG, + Type.STRING, + FITBIT_SKIN_TEMPERATURE_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_SKIN_TEMPERATURE_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_SKIN_TEMPERATURE_TOPIC_DISPLAY) + .define(FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG, Type.STRING, FITBIT_RESTING_HEART_RATE_TOPIC_DEFAULT, @@ -497,18 +531,18 @@ public Path getFitbitUserCredentialsPath() { return Paths.get(getString(FITBIT_USER_CREDENTIALS_DIR_CONFIG)); } - public Url getFitbitUserRepositoryUrl() { + public HttpUrl getFitbitUserRepositoryUrl() { String urlString = getString(FITBIT_USER_REPOSITORY_URL_CONFIG).trim(); if (urlString.charAt(urlString.length() - 1) != '/') { urlString += '/'; } - try { - return URLBuilder(urlString).build(); - } catch (URLParserException ex) { + HttpUrl url = HttpUrl.parse(urlString); + if (url == null) { throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG, getString(FITBIT_USER_REPOSITORY_URL_CONFIG), - "User repository URL " + urlString + " cannot be parsed as URL: " + ex); + "User repository URL " + urlString + " cannot be parsed as URL."); } + return url; } public Headers getClientCredentials() { @@ -551,17 +585,15 @@ public String getFitbitUserRepositoryClientSecret() { return getPassword(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value(); } - public Url getFitbitUserRepositoryTokenUrl() { + public URL getFitbitUserRepositoryTokenUrl() { String value = getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG); if (value == null || value.isEmpty()) { return null; } else { try { - return URLBuilder(value).build(); - } catch (URLParserException ex) { - throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG, - getString(FITBIT_USER_REPOSITORY_URL_CONFIG), - "Fitbit user repository token URL " + value + " cannot be parsed as URL: " + ex); + return new URL(getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG)); + } catch (MalformedURLException ex) { + throw new ConfigException("Fitbit user repository token URL is invalid."); } } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt index d89a7dc3..c26c2af5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt @@ -37,6 +37,7 @@ import io.ktor.client.statement.request import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode +import io.ktor.http.URLBuilder import io.ktor.http.Url import io.ktor.http.contentLength import io.ktor.http.contentType @@ -85,8 +86,8 @@ class ServiceUserRepository : UserRepository { val containedUsers = config.fitbitUsers.toHashSet() client = createClient( - baseUrl = config.fitbitUserRepositoryUrl, - tokenUrl = config.fitbitUserRepositoryTokenUrl, + baseUrl = URLBuilder(config.fitbitUserRepositoryUrl.toString()).build(), + tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(), clientId = config.fitbitUserRepositoryClientId, clientSecret = config.fitbitUserRepositoryClientSecret, ) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java new file mode 100644 index 00000000..2f5086d4 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java @@ -0,0 +1,249 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + package org.radarbase.connect.rest.fitbit.user; + + import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT; + import static org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator.JSON_READER; + + import com.fasterxml.jackson.core.JsonProcessingException; + import com.fasterxml.jackson.databind.ObjectReader; + import java.io.IOException; + import java.net.ProtocolException; + import java.net.URL; + import java.time.Duration; + import java.time.Instant; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Set; + import java.util.concurrent.atomic.AtomicReference; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import okhttp3.Credentials; + import okhttp3.HttpUrl; + import okhttp3.MediaType; + import okhttp3.OkHttpClient; + import okhttp3.Request; + import okhttp3.RequestBody; + import okhttp3.Response; + import okhttp3.ResponseBody; + import org.apache.kafka.common.config.ConfigException; + import org.radarbase.connect.rest.RestSourceConnectorConfig; + import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; + import org.radarbase.exception.TokenException; + import org.radarbase.oauth.OAuth2Client; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + @SuppressWarnings("unused") + public class ServiceUserRepositoryLegacy implements UserRepository { + private static final Logger logger = LoggerFactory.getLogger(ServiceUserRepositoryLegacy.class); + + private static final ObjectReader USER_LIST_READER = JSON_READER.forType(Users.class); + private static final ObjectReader USER_READER = JSON_READER.forType(User.class); + private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class); + private static final RequestBody EMPTY_BODY = + RequestBody.create("", MediaType.parse("application/json; charset=utf-8")); + private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60); + private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90); + + private final OkHttpClient client; + private final Map cachedCredentials; + private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); + + private HttpUrl baseUrl; + private final HashSet containedUsers; + private Set timedCachedUsers = new HashSet<>(); + private OAuth2Client repositoryClient; + private String basicCredentials; + + public ServiceUserRepositoryLegacy() { + this.client = new OkHttpClient.Builder() + .connectTimeout(CONNECTION_TIMEOUT) + .readTimeout(CONNECTION_READ_TIMEOUT) + .build(); + this.cachedCredentials = new HashMap<>(); + this.containedUsers = new HashSet<>(); + } + + @Override + public User get(String key) throws IOException { + Request request = requestFor("users/" + key).build(); + return makeRequest(request, USER_READER); + } + + @Override + public void initialize(RestSourceConnectorConfig config) { + FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config; + this.baseUrl = fitbitConfig.getFitbitUserRepositoryUrl(); + this.containedUsers.addAll(fitbitConfig.getFitbitUsers()); + + URL tokenUrl = fitbitConfig.getFitbitUserRepositoryTokenUrl(); + String clientId = fitbitConfig.getFitbitUserRepositoryClientId(); + String clientSecret = fitbitConfig.getFitbitUserRepositoryClientSecret(); + + if (tokenUrl != null) { + if (clientId.isEmpty()) { + throw new ConfigException("Client ID for user repository is not set."); + } + this.repositoryClient = new OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ MEASUREMENT.CREATE") + .httpClient(client) + .build(); + } else if (clientId != null) { + basicCredentials = Credentials.basic(clientId, clientSecret); + } + } + + @Override + public Stream stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + try { + applyPendingUpdates(); + } catch (IOException ex) { + logger.error("Failed to initially get users from repository", ex); + } + } + return this.timedCachedUsers.stream() + .filter(User::isComplete); + } + + @Override + public String getAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + OAuth2UserCredentials credentials = cachedCredentials.get(user.getId()); + if (credentials != null && !credentials.isAccessTokenExpired()) { + return credentials.getAccessToken(); + } else { + Request request = requestFor("users/" + user.getId() + "/token").build(); + return requestAccessToken(user, request); + } + } + + @Override + public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + Request request = requestFor("users/" + user.getId() + "/token") + .post(EMPTY_BODY) + .build(); + return requestAccessToken(user, request); + } + + private String requestAccessToken(User user, Request request) + throws UserNotAuthorizedException, IOException { + try { + OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + return credentials.getAccessToken(); + } catch (HttpResponseException ex) { + if (ex.getStatusCode() == 407) { + cachedCredentials.remove(user.getId()); + if (user instanceof LocalUser) { + ((LocalUser) user).setIsAuthorized(false); + } + throw new UserNotAuthorizedException(ex.getMessage()); + } + throw ex; + } + } + + @Override + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + @Override + public void applyPendingUpdates() throws IOException { + logger.info("Requesting user information from webservice"); + Request request = requestFor("users?source-type=FitBit").build(); + this.timedCachedUsers = + this.makeRequest(request, USER_LIST_READER).getUsers().stream() + .filter(u -> u.isComplete() + && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) + .collect(Collectors.toSet()); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + + private Request.Builder requestFor(String relativeUrl) throws IOException { + HttpUrl url = baseUrl.resolve(relativeUrl); + if (url == null) { + throw new IllegalArgumentException("Relative URL is invalid"); + } + Request.Builder builder = new Request.Builder().url(url); + String authorization = requestAuthorization(); + if (authorization != null) { + builder.addHeader("Authorization", authorization); + } + + return builder; + } + + private String requestAuthorization() throws IOException { + if (repositoryClient != null) { + try { + return "Bearer " + repositoryClient.getValidToken().getAccessToken(); + } catch (TokenException ex) { + throw new IOException(ex); + } + } else if (basicCredentials != null) { + return basicCredentials; + } else { + return null; + } + } + + private T makeRequest(Request request, ObjectReader reader) throws IOException { + logger.info("Requesting info from {}", request.url()); + try (Response response = client.newCall(request).execute()) { + ResponseBody body = response.body(); + + if (response.code() == 404) { + throw new NoSuchElementException("URL " + request.url() + " does not exist"); + } else if (!response.isSuccessful() || body == null) { + String message = "Failed to make request"; + if (response.code() > 0) { + message += " (HTTP status code " + response.code() + ')'; + } + if (body != null) { + message += body.string(); + } + throw new HttpResponseException(message, response.code()); + } + String bodyString = body.string(); + try { + return reader.readValue(bodyString); + } catch (JsonProcessingException ex) { + logger.error("Failed to parse JSON: {}\n{}", ex, bodyString); + throw ex; + } + } catch (ProtocolException ex) { + throw new IOException("Failed to make request to user repository", ex); + } + } + } \ No newline at end of file diff --git a/kafka-connect-oura-source/build.gradle.kts b/kafka-connect-oura-source/build.gradle.kts index b80d4801..78149a71 100644 --- a/kafka-connect-oura-source/build.gradle.kts +++ b/kafka-connect-oura-source/build.gradle.kts @@ -5,6 +5,7 @@ dependencies { api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}") api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}") + implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}") api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}")) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java index 41ae3f0e..c21c038f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java @@ -19,7 +19,6 @@ import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; -import static io.ktor.http.URLUtilsKt.URLBuilder; import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; @@ -44,9 +43,8 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; -import io.ktor.http.URLParserException; -import io.ktor.http.Url; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; +import org.radarbase.connect.rest.oura.user.OuraServiceUserRepositoryLegacy; public class OuraRestSourceConnectorConfig extends AbstractConfig { public static final Pattern COLON_PATTERN = Pattern.compile(":"); @@ -104,7 +102,7 @@ public class OuraRestSourceConnectorConfig extends AbstractConfig { private static final String OURA_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials."; private static final String OURA_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL."; - private OuraServiceUserRepository userRepository; + private OuraUserRepository userRepository; private final Headers clientCredentials; public OuraRestSourceConnectorConfig(ConfigDef config, Map parsedConfig, boolean doLog) { @@ -195,7 +193,7 @@ public String toString() { .define(OURA_USER_REPOSITORY_CONFIG, Type.CLASS, - OuraServiceUserRepository.class, + OuraServiceUserRepositoryLegacy.class, Importance.MEDIUM, OURA_USER_REPOSITORY_DOC, group, @@ -257,7 +255,7 @@ public String getOuraClientSecret() { return getPassword(OURA_API_SECRET_CONFIG).value(); } - public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reuse) { + public OuraUserRepository getUserRepository(OuraUserRepository reuse) { if (reuse != null && reuse.getClass().equals(getClass(OURA_USER_REPOSITORY_CONFIG))) { userRepository = reuse; } else { @@ -267,15 +265,15 @@ public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reu return userRepository; } - public OuraServiceUserRepository getUserRepository() { + public OuraUserRepository getUserRepository() { userRepository.initialize(this); return userRepository; } @SuppressWarnings("unchecked") - public OuraServiceUserRepository createUserRepository() { + public OuraUserRepository createUserRepository() { try { - return ((Class) + return ((Class) getClass(OURA_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance(); } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { @@ -283,18 +281,18 @@ public OuraServiceUserRepository createUserRepository() { } } - public Url getOuraUserRepositoryUrl() { + public HttpUrl getOuraUserRepositoryUrl() { String urlString = getString(OURA_USER_REPOSITORY_URL_CONFIG).trim(); if (urlString.charAt(urlString.length() - 1) != '/') { urlString += '/'; } - try { - return URLBuilder(urlString).build(); - } catch (URLParserException ex) { + HttpUrl url = HttpUrl.parse(urlString); + if (url == null) { throw new ConfigException(OURA_USER_REPOSITORY_URL_CONFIG, getString(OURA_USER_REPOSITORY_URL_CONFIG), - "User repository URL " + urlString + " cannot be parsed as URL: " + ex); + "User repository URL " + urlString + " cannot be parsed as URL."); } + return url; } public Headers getClientCredentials() { @@ -317,18 +315,16 @@ public String getOuraUserRepositoryClientSecret() { return getPassword(OURA_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value(); } - public Url getOuraUserRepositoryTokenUrl() { + public URL getOuraUserRepositoryTokenUrl() { String value = getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG); if (value == null || value.isEmpty()) { return null; } else { try { - return URLBuilder(value).build(); - } catch (URLParserException ex) { - throw new ConfigException(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG, - getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG), - "Oura user repository token URL " + value + " cannot be parsed as URL: " + ex); + return new URL(getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG)); + } catch (MalformedURLException e) { + throw new ConfigException("Oura user repository token URL is invalid."); } } } -} +} \ No newline at end of file diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java index f6959ee6..9bf6b865 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.radarbase.oura.user.User; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class OuraSourceConnector extends AbstractRestSourceConnector { private static final Logger logger = LoggerFactory.getLogger(OuraSourceConnector.class); private ScheduledExecutorService executor; private Set configuredUsers; - private OuraServiceUserRepository repository; + private OuraUserRepository repository; @Override public void start(Map props) { diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java index 3cd47a93..293a792f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java @@ -35,7 +35,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.radarbase.connect.rest.oura.offset.KafkaOffsetManager; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; import org.radarbase.connect.rest.oura.util.VersionUtil; import org.radarbase.oura.converter.TopicData; import org.radarbase.oura.request.OuraRequestGenerator; @@ -57,7 +57,7 @@ public class OuraSourceTask extends SourceTask { private static final Logger logger = LoggerFactory.getLogger(OuraSourceTask.class); private OkHttpClient baseClient; - private OuraServiceUserRepository userRepository; + private OuraUserRepository userRepository; private List routes; private OuraRequestGenerator ouraRequestGenerator; private AvroData avroData = new AvroData(20); diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java index 360d5f82..fbb4329b 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java @@ -31,11 +31,15 @@ public KafkaOffsetManager( } public void initialize(List> partitions) { - this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream() + if (this.offsetStorageReader != null) { + this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream() .filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY)) .collect(Collectors.toMap( e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"), e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); + } else { + logger.warn("Offset storage reader is null, will resume from an empty state."); + } } @Override diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt index d22ad285..ef20736f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt @@ -37,6 +37,7 @@ import io.ktor.client.statement.request import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode +import io.ktor.http.URLBuilder import io.ktor.http.Url import io.ktor.http.contentLength import io.ktor.http.contentType @@ -56,7 +57,6 @@ import org.radarbase.ktor.auth.ClientCredentialsConfig import org.radarbase.ktor.auth.clientCredentials import org.radarbase.oura.user.OuraUser import org.radarbase.oura.user.User -import org.radarbase.oura.user.UserRepository import org.slf4j.LoggerFactory import java.io.IOException import java.util.concurrent.ConcurrentHashMap @@ -67,7 +67,7 @@ import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @Suppress("unused") -class OuraServiceUserRepository : UserRepository { +class OuraServiceUserRepository : OuraUserRepository() { private lateinit var userCache: CachedSet private lateinit var client: HttpClient private val credentialCaches = ConcurrentHashMap>() @@ -76,33 +76,34 @@ class OuraServiceUserRepository : UserRepository { private val mapper = ObjectMapper().registerKotlinModule().registerModule(JavaTimeModule()) @Throws(IOException::class) - override fun get(key: String): User = runBlocking(Dispatchers.Default) { - makeRequest { url("users/$key") } - } + override fun get(key: String): User = + runBlocking(Dispatchers.Default) { + makeRequest { url("users/$key") } + } - fun initialize( - config: OuraRestSourceConnectorConfig, - ) { + override fun initialize(config: OuraRestSourceConnectorConfig) { val containedUsers = config.ouraUsers.toHashSet() - client = createClient( - baseUrl = config.ouraUserRepositoryUrl, - tokenUrl = config.ouraUserRepositoryTokenUrl, - clientId = config.ouraUserRepositoryClientId, - clientSecret = config.ouraUserRepositoryClientSecret, - ) + client = + createClient( + baseUrl = URLBuilder(config.ouraUserRepositoryUrl.toString()).build(), + tokenUrl = URLBuilder(config.ouraUserRepositoryTokenUrl.toString()).build(), + clientId = config.ouraUserRepositoryClientId, + clientSecret = config.ouraUserRepositoryClientSecret, + ) - userCache = CachedSet( - CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes), - ) { - makeRequest { url("users?source-type=Oura") } - .users - .toHashSet() - .filterTo(HashSet()) { u -> - u.isComplete() && - (containedUsers.isEmpty() || u.versionedId in containedUsers) - } - } + userCache = + CachedSet( + CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes), + ) { + makeRequest { url("users?source-type=Oura") } + .users + .toHashSet() + .filterTo(HashSet()) { u -> + u.isComplete() && + (containedUsers.isEmpty() || u.versionedId in containedUsers) + } + } } private fun createClient( @@ -110,65 +111,68 @@ class OuraServiceUserRepository : UserRepository { tokenUrl: Url?, clientId: String?, clientSecret: String?, - ): HttpClient = HttpClient(CIO) { - if (tokenUrl != null) { - install(Auth) { - clientCredentials( - ClientCredentialsConfig( - tokenUrl.toString(), - clientId, - clientSecret, - ).copyWithEnv("MANAGEMENT_PORTAL"), - baseUrl.host, - ) - } - install(ContentNegotiation) { - json( - Json { - ignoreUnknownKeys = true - }, - ) - } - } else if (clientId != null && clientSecret != null) { - install(Auth) { - basic { - credentials { - BasicAuthCredentials(username = clientId, password = clientSecret) - } - realm = "Access to the '/' path" - sendWithoutRequest { - it.url.host == baseUrl.host + ): HttpClient = + HttpClient(CIO) { + if (tokenUrl != null) { + install(Auth) { + clientCredentials( + ClientCredentialsConfig( + tokenUrl.toString(), + clientId, + clientSecret, + ).copyWithEnv("MANAGEMENT_PORTAL"), + baseUrl.host, + ) + } + install(ContentNegotiation) { + json( + Json { + ignoreUnknownKeys = true + }, + ) + } + } else if (clientId != null && clientSecret != null) { + install(Auth) { + basic { + credentials { + BasicAuthCredentials(username = clientId, password = clientSecret) + } + realm = "Access to the '/' path" + sendWithoutRequest { + it.url.host == baseUrl.host + } } } } - } - defaultRequest { - url.takeFrom(baseUrl) - } + defaultRequest { + url.takeFrom(baseUrl) + } - install(ContentNegotiation) { - jackson { - registerModule(JavaTimeModule()) // support java.time.* types + install(ContentNegotiation) { + jackson { + registerModule(JavaTimeModule()) // support java.time.* types + } } - } - install(HttpTimeout) { - connectTimeoutMillis = 60.seconds.inWholeMilliseconds - requestTimeoutMillis = 90.seconds.inWholeMilliseconds + install(HttpTimeout) { + connectTimeoutMillis = 60.seconds.inWholeMilliseconds + requestTimeoutMillis = 90.seconds.inWholeMilliseconds + } } - } - override fun stream(): Sequence = runBlocking(Dispatchers.Default) { - val valueInCache = userCache.getFromCache() - .takeIf { it is CachedValue.CacheValue } - ?.getOrThrow() + override fun stream(): Sequence = + runBlocking(Dispatchers.Default) { + val valueInCache = + userCache.getFromCache() + .takeIf { it is CachedValue.CacheValue } + ?.getOrThrow() - (valueInCache ?: userCache.get()) - .stream() - .filter { it.isComplete() } - .asSequence() - } + (valueInCache ?: userCache.get()) + .stream() + .filter { it.isComplete() } + .asSequence() + } @Throws(IOException::class, UserNotAuthorizedException::class) override fun getAccessToken(user: User): String { @@ -184,17 +188,18 @@ class OuraServiceUserRepository : UserRepository { } @Throws(IOException::class, UserNotAuthorizedException::class) - fun refreshAccessToken(user: User): String { + override fun refreshAccessToken(user: User): String { if (!user.isAuthorized) { throw UserNotAuthorizedException("User is not authorized") } return runBlocking(Dispatchers.Default) { - val token = requestAccessToken(user) { - url("users/${user.id}/token") - method = HttpMethod.Post - setBody("{}") - contentType(ContentType.Application.Json) - } + val token = + requestAccessToken(user) { + url("users/${user.id}/token") + method = HttpMethod.Post + setBody("{}") + contentType(ContentType.Application.Json) + } credentialCache(user).set(token) token.accessToken } @@ -222,12 +227,13 @@ class OuraServiceUserRepository : UserRepository { throw ex } - fun hasPendingUpdates(): Boolean = runBlocking(Dispatchers.Default) { - userCache.isStale() - } + override fun hasPendingUpdates(): Boolean = + runBlocking(Dispatchers.Default) { + userCache.isStale() + } @Throws(IOException::class) - fun applyPendingUpdates() { + override fun applyPendingUpdates() { logger.info("Requesting user information from webservice") runBlocking(Dispatchers.Default) { @@ -237,26 +243,28 @@ class OuraServiceUserRepository : UserRepository { private suspend inline fun makeRequest( crossinline builder: HttpRequestBuilder.() -> Unit, - ): T = withContext(Dispatchers.IO) { - val response = client.request(builder) - val contentLength = response.contentLength() - val hasBody = contentLength != null && contentLength > 0 - if (response.status == HttpStatusCode.NotFound) { - throw NoSuchElementException("URL " + response.request.url + " does not exist") - } else if (!response.status.isSuccess() || !hasBody) { - val message = buildString { - append("Failed to make request (HTTP status code ") - append(response.status) - append(')') - if (hasBody) { - append(": ") - append(response.bodyAsText()) - } + ): T = + withContext(Dispatchers.IO) { + val response = client.request(builder) + val contentLength = response.contentLength() + val hasBody = contentLength != null && contentLength > 0 + if (response.status == HttpStatusCode.NotFound) { + throw NoSuchElementException("URL " + response.request.url + " does not exist") + } else if (!response.status.isSuccess() || !hasBody) { + val message = + buildString { + append("Failed to make request (HTTP status code ") + append(response.status) + append(')') + if (hasBody) { + append(": ") + append(response.bodyAsText()) + } + } + throw HttpResponseException(message, response.status.value) } - throw HttpResponseException(message, response.status.value) + mapper.readValue(response.bodyAsText()) } - mapper.readValue(response.bodyAsText()) - } companion object { private val logger = LoggerFactory.getLogger(OuraServiceUserRepository::class.java) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java new file mode 100644 index 00000000..6cf055fe --- /dev/null +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java @@ -0,0 +1,258 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + package org.radarbase.connect.rest.oura.user; + + import com.fasterxml.jackson.core.JsonFactory; + import com.fasterxml.jackson.core.JsonProcessingException; + import com.fasterxml.jackson.databind.ObjectMapper; + import com.fasterxml.jackson.databind.ObjectReader; + import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + + import kotlin.sequences.*; + + import java.io.IOException; + import java.net.ProtocolException; + import java.net.URL; + import java.time.Duration; + import java.time.Instant; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Set; + import java.util.List; + import java.util.concurrent.atomic.AtomicReference; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import okhttp3.Credentials; + import okhttp3.HttpUrl; + import okhttp3.MediaType; + import okhttp3.OkHttpClient; + import okhttp3.Request; + import okhttp3.RequestBody; + import okhttp3.Response; + import okhttp3.ResponseBody; + import org.apache.kafka.common.config.ConfigException; + import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig; + import org.radarbase.exception.TokenException; + import org.radarbase.oauth.OAuth2Client; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.radarbase.oura.user.UserRepository; + import org.radarbase.oura.user.User; + import org.radarbase.oura.user.OuraUser; + import org.radarbase.oura.user.UserNotAuthorizedException; + import static kotlin.sequences.SequencesKt.*; + + @SuppressWarnings("unused") + public class OuraServiceUserRepositoryLegacy extends OuraUserRepository { + Instant MIN_INSTANT = Instant.EPOCH; + + protected static final JsonFactory JSON_FACTORY = new JsonFactory(); + protected static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY) + .registerModule(new JavaTimeModule()) + .reader(); + private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepositoryLegacy.class); + + private static final ObjectReader USER_LIST_READER = JSON_READER.forType(OuraUsers.class); + private static final ObjectReader USER_READER = JSON_READER.forType(User.class); + private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class); + private static final RequestBody EMPTY_BODY = + RequestBody.create("", MediaType.parse("application/json; charset=utf-8")); + private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60); + private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90); + + private final OkHttpClient client; + private final Map cachedCredentials; + private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); + + private HttpUrl baseUrl; + private final HashSet containedUsers; + private Set timedCachedUsers = new HashSet<>(); + private OAuth2Client repositoryClient; + private String basicCredentials; + + public OuraServiceUserRepositoryLegacy() { + this.client = new OkHttpClient.Builder() + .connectTimeout(CONNECTION_TIMEOUT) + .readTimeout(CONNECTION_READ_TIMEOUT) + .build(); + this.cachedCredentials = new HashMap<>(); + this.containedUsers = new HashSet<>(); + } + + @Override + public User get(String key) throws IOException { + Request request = requestFor("users/" + key).build(); + return makeRequest(request, USER_READER); + } + + public void initialize(OuraRestSourceConnectorConfig config) { + OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config; + this.baseUrl = ouraConfig.getOuraUserRepositoryUrl(); + this.containedUsers.addAll(ouraConfig.getOuraUsers()); + + URL tokenUrl = ouraConfig.getOuraUserRepositoryTokenUrl(); + String clientId = ouraConfig.getOuraUserRepositoryClientId(); + String clientSecret = ouraConfig.getOuraUserRepositoryClientSecret(); + + if (tokenUrl != null) { + if (clientId.isEmpty()) { + throw new ConfigException("Client ID for user repository is not set."); + } + this.repositoryClient = new OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ MEASUREMENT.CREATE") + .httpClient(client) + .build(); + } else if (clientId != null) { + basicCredentials = Credentials.basic(clientId, clientSecret); + } + } + + @Override + public Sequence stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + try { + applyPendingUpdates(); + } catch (IOException ex) { + logger.error("Failed to initially get users from repository", ex); + } + } + return SequencesKt.asSequence(this.timedCachedUsers.stream().iterator()); + } + + @Override + public String getAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + OAuth2UserCredentials credentials = cachedCredentials.get(user.getId()); + if (credentials != null && !credentials.isAccessTokenExpired()) { + return credentials.getAccessToken(); + } else { + Request request = requestFor("users/" + user.getId() + "/token").build(); + return requestAccessToken(user, request); + } + } + + @Override + public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + Request request = requestFor("users/" + user.getId() + "/token") + .post(EMPTY_BODY) + .build(); + return requestAccessToken(user, request); + } + + private String requestAccessToken(User user, Request request) + throws UserNotAuthorizedException, IOException { + try { + OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + return credentials.getAccessToken(); + } catch (HttpResponseException ex) { + if (ex.getStatusCode() == 407) { + cachedCredentials.remove(user.getId()); + if (user instanceof User) { + // ((User) user).setIsAuthorized(false); + } + throw new UserNotAuthorizedException(ex.getMessage()); + } + throw ex; + } + } + + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + public void applyPendingUpdates() throws IOException { + logger.info("Requesting user information from webservice"); + Request request = requestFor("users?source-type=Oura").build(); + this.timedCachedUsers = + this.makeRequest(request, USER_LIST_READER).getUsers().stream() + .filter(u -> u.isComplete() && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) + .collect(Collectors.toSet()); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + + private Request.Builder requestFor(String relativeUrl) throws IOException { + HttpUrl url = baseUrl.resolve(relativeUrl); + if (url == null) { + throw new IllegalArgumentException("Relative URL is invalid"); + } + Request.Builder builder = new Request.Builder().url(url); + String authorization = requestAuthorization(); + if (authorization != null) { + builder.addHeader("Authorization", authorization); + } + + return builder; + } + + private String requestAuthorization() throws IOException { + if (repositoryClient != null) { + try { + return "Bearer " + repositoryClient.getValidToken().getAccessToken(); + } catch (TokenException ex) { + throw new IOException(ex); + } + } else if (basicCredentials != null) { + return basicCredentials; + } else { + return null; + } + } + + private T makeRequest(Request request, ObjectReader reader) throws IOException { + logger.info("Requesting info from {}", request.url()); + try (Response response = client.newCall(request).execute()) { + ResponseBody body = response.body(); + + if (response.code() == 404) { + throw new NoSuchElementException("URL " + request.url() + " does not exist"); + } else if (!response.isSuccessful() || body == null) { + String message = "Failed to make request"; + if (response.code() > 0) { + message += " (HTTP status code " + response.code() + ')'; + } + if (body != null) { + message += body.string(); + } + throw new HttpResponseException(message, response.code()); + } + String bodyString = body.string(); + try { + return reader.readValue(bodyString); + } catch (JsonProcessingException ex) { + logger.error("Failed to parse JSON: {}\n{}", ex, bodyString); + throw ex; + } + } catch (ProtocolException ex) { + throw new IOException("Failed to make request to user repository", ex); + } + } + } \ No newline at end of file diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt new file mode 100644 index 00000000..3a21cd5d --- /dev/null +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.connect.rest.oura.user + +import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig +import org.radarbase.oura.user.User +import org.radarbase.oura.user.UserNotAuthorizedException +import org.radarbase.oura.user.UserRepository +import org.slf4j.LoggerFactory +import java.io.IOException + +@Suppress("unused") +abstract class OuraUserRepository : UserRepository { + abstract fun initialize(config: OuraRestSourceConnectorConfig) + + @Throws(IOException::class, UserNotAuthorizedException::class) + abstract fun refreshAccessToken(user: User): String + + @Throws(IOException::class) + abstract fun applyPendingUpdates() + + abstract fun hasPendingUpdates(): Boolean + + companion object { + private val logger = LoggerFactory.getLogger(OuraUserRepository::class.java) + } +} diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java index ce705e7f..7832cf0b 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java @@ -17,7 +17,7 @@ package org.radarbase.connect.rest.oura.user; -public class UserNotAuthorizedException extends Exception { +public class UserNotAuthorizedException extends RuntimeException { public UserNotAuthorizedException(String message) { super(message); } diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt index 53d321fc..302ef722 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt @@ -17,12 +17,18 @@ class OuraRingConfigurationConverter( root: JsonNode, user: User, ): Sequence> { - val array = root.get("data") - ?: return emptySequence() + val array = + root.get("data") + ?: return emptySequence() return array.asSequence() .mapCatching { - val setupTime = OffsetDateTime.parse(it["set_up_at"].textValue()) - val setupTimeInstant = setupTime.toInstant() + val setUpAt = it["set_up_at"] + val setupTimeInstant = + setUpAt?.textValue()?.let { + OffsetDateTime.parse( + it, + ) + }?.toInstant() TopicData( key = user.observationKey, topic = topic, @@ -32,9 +38,7 @@ class OuraRingConfigurationConverter( } } - private fun JsonNode.toRingConfiguration( - setupTime: Instant, - ): OuraRingConfiguration { + private fun JsonNode.toRingConfiguration(setupTime: Instant?): OuraRingConfiguration { val data = this return OuraRingConfiguration.newBuilder().apply { time = System.currentTimeMillis() / 1000.0 @@ -44,7 +48,10 @@ class OuraRingConfigurationConverter( design = data.get("design").textValue()?.classifyDesign() firmwareVersion = data.get("firmware_version").textValue() hardwareType = data.get("hardware_type").textValue()?.classifyHardware() - setUpAt = setupTime.toEpochMilli() / 1000.0 + setUpAt = + setupTime?.toEpochMilli()?.let { + it / 1000.0 + } size = data.get("size").intValue() }.build() } @@ -77,6 +84,7 @@ class OuraRingConfigurationConverter( else -> OuraRingHardwareType.UNKNOWN } } + companion object { val logger = LoggerFactory.getLogger(OuraRingConfigurationConverter::class.java) } diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 9a74ef4e..d7729270 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -16,14 +16,14 @@ import java.time.Duration import java.time.Instant import kotlin.streams.asSequence -class OuraRequestGenerator @JvmOverloads +class OuraRequestGenerator +@JvmOverloads constructor( private val userRepository: UserRepository, private val defaultQueryRange: Duration = Duration.ofDays(15), private val ouraOffsetManager: OuraOffsetManager, public val routes: List = OuraRouteFactory.getRoutes(userRepository), ) : RequestGenerator { - private val userNextRequest: MutableMap = mutableMapOf() public var nextRequestTime: Instant = Instant.MIN @@ -31,7 +31,10 @@ constructor( private val shouldBackoff: Boolean get() = Instant.now() < nextRequestTime - override fun requests(user: User, max: Int): Sequence { + override fun requests( + user: User, + max: Int, + ): Sequence { return if (user.ready()) { routes.asSequence() .flatMap { route -> @@ -43,7 +46,10 @@ constructor( } } - override fun requests(route: Route, max: Int): Sequence { + override fun requests( + route: Route, + max: Int, + ): Sequence { return userRepository .stream() .flatMap { user -> @@ -56,7 +62,11 @@ constructor( .takeWhile { !shouldBackoff } } - override fun requests(route: Route, user: User, max: Int): Sequence { + override fun requests( + route: Route, + user: User, + max: Int, + ): Sequence { return if (user.ready()) { return generateRequests(route, user).takeWhile { !shouldBackoff } } else { @@ -64,16 +74,21 @@ constructor( } } - fun generateRequests(route: Route, user: User): Sequence { + fun generateRequests( + route: Route, + user: User, + ): Sequence { val offset = ouraOffsetManager.getOffset(route, user) val startDate = user.startDate - val startOffset: Instant = if (offset == null) { - logger.debug("No offsets found for $user, using the start date.") - startDate - } else { - logger.debug("Offsets found in persistence.") - offset.offset.coerceAtLeast(startDate) - } + val startOffset: Instant = + if (offset == null) { + logger.info("No offsets found for $user, using the start date.") + startDate + } else { + val offsetTime = offset.offset + logger.info("Offsets found in persistence: " + offsetTime.toString()) + offsetTime.coerceAtLeast(startDate) + } val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") @@ -84,25 +99,31 @@ constructor( return route.generateRequests(user, startOffset, endTime, USER_MAX_REQUESTS) } - fun handleResponse(req: RestRequest, response: Response): OuraResult> { + fun handleResponse( + req: RestRequest, + response: Response, + ): OuraResult> { if (response.isSuccessful) { return OuraResult.Success>(requestSuccessful(req, response)) } else { try { OuraResult.Error(requestFailed(req, response)) - } catch (e: TooManyRequestsException) {} finally { + } catch (e: TooManyRequestsException) { + } finally { return OuraResult.Success(listOf()) } } } - override fun requestSuccessful(request: RestRequest, response: Response): List { + override fun requestSuccessful( + request: RestRequest, + response: Response, + ): List { logger.debug("Request successful: {}..", request.request) val body: ResponseBody? = response.body val data = body?.bytes()!! - val records = request.route.converters.flatMap { - it.convert(request, response.headers, data) - } + val records = + request.route.converters.flatMap { it.convert(request, response.headers, data) } val offset = records.maxByOrNull { it -> it.offset }?.offset if (offset != null) { logger.info("Writing ${records.size} records to offsets...") @@ -111,6 +132,17 @@ constructor( request.user, Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), ) + val currentNextRequestTime = userNextRequest[request.user.versionedId] + val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = + currentNextRequestTime?.let { + if (currentNextRequestTime > nextRequestTime) { + currentNextRequestTime + } else { + nextRequestTime + } + } + ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { ouraOffsetManager.updateOffsets( @@ -123,7 +155,10 @@ constructor( return records } - override fun requestFailed(request: RestRequest, response: Response): OuraError { + override fun requestFailed( + request: RestRequest, + response: Response, + ): OuraError { return when (response.code) { 429 -> { logger.info("Too many requests, rate limit reached. Backing off...") @@ -132,10 +167,13 @@ constructor( } 403 -> { logger.warn( - "User ${request.user} has expired." + - "Please renew the subscription...", + "User ${request.user} has expired." + "Please renew the subscription...", ) - userNextRequest[request.user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = + Instant.now() + .plus( + USER_BACK_OFF_TIME, + ) OuraAccessForbiddenError( "Oura subscription has expired or API data not available..", IOException("Unauthorized"), @@ -145,9 +183,14 @@ constructor( 401 -> { logger.warn( "User ${request.user} access token is" + - " expired, malformed, or revoked. " + response.body?.string(), + " expired, malformed, or revoked. " + + response.body?.string(), ) - userNextRequest[request.user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = + Instant.now() + .plus( + USER_BACK_OFF_TIME, + ) OuraUnauthorizedAccessError( "Access token expired or revoked..", IOException("Unauthorized"), @@ -173,7 +216,11 @@ constructor( } 404 -> { logger.warn("Not found..") - OuraNotFoundError(response.body!!.string(), IOException("Data not found"), "404") + OuraNotFoundError( + response.body!!.string(), + IOException("Data not found"), + "404", + ) } else -> { logger.warn("Request Failed: {}, {}", request, response) @@ -196,6 +243,7 @@ constructor( private val ONE_DAY = 1L private val TIME_AFTER_REQUEST = Duration.ofDays(30) private val USER_BACK_OFF_TIME = Duration.ofMinutes(2L) + private val SUCCESS_BACK_OFF_TIME = Duration.ofSeconds(3L) private val USER_MAX_REQUESTS = 20 val JSON_FACTORY = JsonFactory() val JSON_READER = ObjectMapper(JSON_FACTORY).registerModule(JavaTimeModule()).reader()