From 1039b5733d914cc492d51fbb0e50d12bdec4957a Mon Sep 17 00:00:00 2001 From: Yatharth Ranjan Date: Thu, 21 Nov 2019 14:50:07 +0000 Subject: [PATCH 1/7] snapshot version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 99bf71d0..dfd2ccf2 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ subprojects { apply plugin: 'java-library' group = 'org.radarbase' - version = '0.2.4' + version = '0.2.5-SNAPSHOT' sourceCompatibility = 1.8 targetCompatibility = 1.8 From 7dc03687f298e5223ddbbfc3871dc0a2c5de472b Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 9 Dec 2019 16:50:24 +0100 Subject: [PATCH 2/7] Version the user so it can be reset --- .../rest/fitbit/FitbitSourceConnector.java | 2 +- .../request/FitbitRequestGenerator.java | 4 ++-- .../rest/fitbit/route/FitbitPollingRoute.java | 10 ++++----- .../connect/rest/fitbit/user/LocalUser.java | 22 ++++++++++++++----- .../fitbit/user/ServiceUserRepository.java | 2 +- .../connect/rest/fitbit/user/User.java | 11 ++++++++++ .../rest/fitbit/user/YamlUserRepository.java | 4 ++-- 7 files changed, 38 insertions(+), 17 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java index 8ef73971..af6b14c4 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java @@ -98,7 +98,7 @@ private List> configureTasks(int maxTasks) { try { List> userTasks = fitbitConfig.getUserRepository().stream() - .map(User::getId) + .map(User::getVersionedId) // group users based on their hashCode // in principle this allows for more efficient reconfigurations for a fixed number of tasks, // since that allows existing tasks to only handle small modifications users to handle. diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java index 25589057..b4b0a98f 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java @@ -105,7 +105,7 @@ public OkHttpClient getClient(User user) { public Map> getPartitions(String route) { try { return userRepository.stream() - .collect(Collectors.toMap(User::getId, u -> getPartition(route, u))); + .collect(Collectors.toMap(User::getVersionedId, u -> getPartition(route, u))); } catch (IOException e) { logger.warn("Failed to initialize user partitions for route {}: {}", route, e.toString()); return Collections.emptyMap(); @@ -114,7 +114,7 @@ public Map> getPartitions(String route) { public Map getPartition(String route, User user) { Map partition = new HashMap<>(4); - partition.put("user", user.getId()); + partition.put("user", user.getVersionedId()); partition.put("route", route); return partition; } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index df1b212f..4e636ce5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -143,7 +143,7 @@ public void initialize(RestSourceConnectorConfig config) { @Override public void requestSucceeded(RestRequest request, SourceRecord record) { lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll); - String userKey = ((FitbitRestRequest) request).getUser().getId(); + String userKey = ((FitbitRestRequest) request).getUser().getVersionedId(); Instant offset = Instant.ofEpochMilli((Long) record.sourceOffset().get(TIMESTAMP_OFFSET_KEY)); offsets.put(userKey, offset); } @@ -154,7 +154,7 @@ public void requestEmpty(RestRequest request) { FitbitRestRequest fitbitRequest = (FitbitRestRequest) request; Instant endOffset = fitbitRequest.getDateRange().end().toInstant(); if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) { - String key = fitbitRequest.getUser().getId(); + String key = fitbitRequest.getUser().getVersionedId(); offsets.put(key, endOffset); } } @@ -197,7 +197,7 @@ public Stream requests() { return userRepository.stream() .map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u))) .filter(u -> lastPoll.isAfter(u.getValue())) - .sorted(Comparator.comparing(Map.Entry::getValue)) + .sorted(Map.Entry.comparingByValue()) .flatMap(u -> this.createRequests(u.getKey())) .filter(Objects::nonNull); } catch (IOException e) { @@ -216,7 +216,7 @@ public Instant getTimeOfNextRequest() { } private Map getPartition(User user) { - return partitions.computeIfAbsent(user.getId(), + return partitions.computeIfAbsent(user.getVersionedId(), k -> generator.getPartition(routeName, user)); } @@ -279,7 +279,7 @@ public Instant getLastPoll() { } protected Instant getOffset(User user) { - return offsets.getOrDefault(user.getId(), user.getStartDate().minus(ONE_NANO)); + return offsets.getOrDefault(user.getVersionedId(), user.getStartDate().minus(ONE_NANO)); } /** diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java index 1160612a..cc0119a7 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java @@ -34,6 +34,7 @@ public class LocalUser implements User { private static final Pattern ILLEGAL_CHARACTERS_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]"); private String id; + private String version; private String externalUserId; private String projectId; private String userId; @@ -94,9 +95,19 @@ public void setFitbitUserId(String id) { this.externalUserId = id; } + @Override + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + public LocalUser copy() { LocalUser copy = new LocalUser(); copy.id = id; + copy.version = version; copy.externalUserId = externalUserId; copy.projectId = projectId; copy.userId = userId; @@ -116,7 +127,8 @@ public synchronized SchemaAndValue getObservationKey(AvroData avroData) { @Override public String toString() { - return "LocalUser{" + "id='" + id + '\'' + return "LocalUser{id='" + id + '\'' + + ", version='" + version + '\'' + ", externalUserId='" + externalUserId + '\'' + ", projectId='" + projectId + '\'' + ", userId='" + userId + '\'' @@ -134,19 +146,17 @@ public boolean equals(Object o) { } LocalUser localUser = (LocalUser) o; return Objects.equals(id, localUser.id) + && Objects.equals(version, localUser.version) && Objects.equals(externalUserId, localUser.externalUserId) && Objects.equals(projectId, localUser.projectId) && Objects.equals(userId, localUser.userId) && Objects.equals(sourceId, localUser.sourceId) && Objects.equals(startDate, localUser.startDate) - && Objects.equals(endDate, localUser.endDate) - && Objects.equals(observationKey, localUser.observationKey); + && Objects.equals(endDate, localUser.endDate); } @Override public int hashCode() { - - return Objects - .hash(id, externalUserId, projectId, userId, sourceId, startDate, endDate, observationKey); + return Objects.hash(id, version); } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java index 41bf64e4..7a8d5e16 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java @@ -99,7 +99,7 @@ public Stream stream() throws IOException { this.timedCachedUsers = this.makeRequest(request, USER_LIST_READER).getUsers().stream() .filter(u -> u.isComplete() - && (containedUsers.isEmpty() || containedUsers.contains(u.getId()))) + && (containedUsers.isEmpty() || containedUsers.contains(u.getVersionedId()))) .collect(Collectors.toSet()); return this.timedCachedUsers.stream(); diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java index da0ec3f0..49e9e90b 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java @@ -31,6 +31,8 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) { String getId(); + String getVersion(); + String getExternalUserId(); String getProjectId(); @@ -43,6 +45,15 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) { String getSourceId(); + default String getVersionedId() { + String version = getVersion(); + if (version == null) { + return getId(); + } else { + return getId() + "#" + version; + } + } + SchemaAndValue getObservationKey(AvroData avroData); default Boolean isComplete() { diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java index e59f21b6..cbcf35f4 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java @@ -134,7 +134,7 @@ public Stream stream() { Stream users = this.users.values().stream() .filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken())); if (!configuredUsers.isEmpty()) { - users = users.filter(lockedTest(u -> configuredUsers.contains(u.getId()))); + users = users.filter(lockedTest(u -> configuredUsers.contains(u.getVersionedId()))); } return users.map(lockedApply(LocalUser::copy)); } @@ -297,7 +297,7 @@ private void store(Path path, LocalUser user) { * Local user that is protected by a multi-threading lock to avoid simultaneous IO * and modifications. */ - private final class LockedUser { + private static final class LockedUser { final Lock lock = new ReentrantLock(); final LocalUser user; final Path path; From f4d6d47d748d1c87649bb737c3351ffc8e0b628c Mon Sep 17 00:00:00 2001 From: Yatharth Ranjan Date: Wed, 22 Jan 2020 19:34:18 +0000 Subject: [PATCH 3/7] Use HTTPS in repo urls maven has stopped supporting HTTP since Jan 15 2020. See https://stackoverflow.com/a/59764670/8175739 for more info --- build.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index dfd2ccf2..42d5d120 100644 --- a/build.gradle +++ b/build.gradle @@ -18,10 +18,10 @@ subprojects { repositories { mavenCentral() - maven { url "http://packages.confluent.io/maven/" } - maven { url "http://repo.maven.apache.org/maven2" } + maven { url "https://packages.confluent.io/maven/" } + maven { url "https://repo.maven.apache.org/maven2" } jcenter() - maven { url 'http://oss.jfrog.org/artifactory/oss-snapshot-local/' } + maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local/' } } } From 05cd1ca0e2e04e4a283d0b5f02c721bfe6656c43 Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Fri, 1 May 2020 20:29:21 +0100 Subject: [PATCH 4/7] Add support for pending updates from user repository --- .editorconfig | 7 +- .../FitbitRestSourceConnectorConfig.java | 71 ++++++++++++++++--- .../rest/fitbit/FitbitSourceConnector.java | 49 +++++++------ .../fitbit/request/TokenAuthenticator.java | 2 +- .../fitbit/user/ServiceUserRepository.java | 40 ++++++----- .../rest/fitbit/user/UserRepository.java | 15 ++++ .../rest/fitbit/user/YamlUserRepository.java | 18 ++++- 7 files changed, 148 insertions(+), 54 deletions(-) diff --git a/.editorconfig b/.editorconfig index 81765cc9..4801aa6a 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,8 +10,9 @@ insert_final_newline = true charset = utf-8 indent_style = space indent_size = 2 -continuation_indent_size = 4 +ij_continuation_indent_size = 4 +max_line_length=100 -[*.gradle,*.py] +[{*.gradle, *.py}] indent_size = 4 -continuation_indent_size = 8 +ij_continuation_indent_size = 8 diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index d530efc2..9611cf4a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.NonEmptyString; -import org.apache.kafka.common.config.ConfigDef.Range; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Width; @@ -123,21 +122,22 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig { private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic"; private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories"; - private final UserRepository userRepository; + public static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG = "fitbit.user.firebase.collection.fitbit.name"; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC = "Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used."; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY = "Firebase Fitbit collection name."; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT = "fitbit"; + + public static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG = "fitbit.user.firebase.collection.user.name"; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC = "Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used."; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY = "Firebase User collection name."; + private static final String FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT = "users"; + + private UserRepository userRepository; private final Headers clientCredentials; - @SuppressWarnings("unchecked") public FitbitRestSourceConnectorConfig(ConfigDef config, Map parsedConfig, boolean doLog) { super(config, parsedConfig, doLog); - try { - userRepository = ((Class) - getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance(); - } catch (IllegalAccessException | InstantiationException - | InvocationTargetException | NoSuchMethodException e) { - throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e); - } - String credentialString = getFitbitClient() + ":" + getFitbitClientSecret(); String credentialsBase64 = Base64.getEncoder().encodeToString( credentialString.getBytes(StandardCharsets.UTF_8)); @@ -318,6 +318,26 @@ public String toString() { ++orderInGroup, Width.SHORT, FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY) + + .define(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG, + Type.STRING, + FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DEFAULT, + Importance.LOW, + FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY) + + .define(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG, + Type.STRING, + FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DEFAULT, + Importance.LOW, + FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_DISPLAY) ; } @@ -333,11 +353,32 @@ public String getFitbitClientSecret() { return getPassword(FITBIT_API_SECRET_CONFIG).value(); } + public UserRepository getUserRepository(UserRepository reuse) { + if (reuse != null && reuse.getClass().equals(getClass(FITBIT_USER_REPOSITORY_CONFIG))) { + userRepository = reuse; + } else { + userRepository = createUserRepository(); + } + userRepository.initialize(this); + return userRepository; + } + public UserRepository getUserRepository() { userRepository.initialize(this); return userRepository; } + @SuppressWarnings("unchecked") + public UserRepository createUserRepository() { + try { + return ((Class) + getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance(); + } catch (IllegalAccessException | InstantiationException + | InvocationTargetException | NoSuchMethodException e) { + throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e); + } + } + public String getFitbitIntradayStepsTopic() { return getString(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG); } @@ -398,4 +439,12 @@ public Duration getTooManyRequestsCooldownInterval() { public String getFitbitIntradayCaloriesTopic() { return getString(FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG); } + + public String getFitbitUserRepositoryFirestoreFitbitCollection() { + return getString(FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG); + } + + public String getFitbitUserRepositoryFirestoreUserCollection() { + return getString(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG); + } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java index af6b14c4..3b7a6edb 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java @@ -28,12 +28,11 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; - import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.radarbase.connect.rest.AbstractRestSourceConnector; import org.radarbase.connect.rest.fitbit.user.User; +import org.radarbase.connect.rest.fitbit.user.UserRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,7 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector { private static final Logger logger = LoggerFactory.getLogger(FitbitSourceConnector.class); private ScheduledExecutorService executor; private Set configuredUsers; - + private UserRepository repository; @Override public void start(Map props) { @@ -50,18 +49,24 @@ public void start(Map props) { executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(() -> { - try { - logger.info("Requesting latest user details..."); - Set newUsers = getConfig(props, false).getUserRepository().stream() - .collect(Collectors.toSet()); - if (configuredUsers != null && !newUsers.equals(configuredUsers)) { - logger.info("User info mismatch found. Requesting reconfiguration..."); - reconfigure(); + if (repository.hasPendingUpdates()) { + try { + logger.info("Requesting latest user details..."); + repository.applyPendingUpdates(); + Set newUsers = + getConfig(props, false).getUserRepository(repository).stream() + .collect(Collectors.toSet()); + if (configuredUsers != null && !newUsers.equals(configuredUsers)) { + logger.info("User info mismatch found. Requesting reconfiguration..."); + reconfigure(); + } + } catch (IOException e) { + logger.warn("Failed to refresh users: {}", e.toString()); } - } catch (IOException e) { - logger.warn("Failed to refresh users: {}", e.toString()); + } else { + logger.info("No pending updates found. Not attempting to refresh users."); } - },0, 5, TimeUnit.MINUTES); + }, 0, 5, TimeUnit.MINUTES); } @Override @@ -78,7 +83,9 @@ private FitbitRestSourceConnectorConfig getConfig(Map conf, bool @Override public FitbitRestSourceConnectorConfig getConfig(Map conf) { - return getConfig(conf, true); + FitbitRestSourceConnectorConfig connectorConfig = getConfig(conf, true); + repository = connectorConfig.getUserRepository(repository); + return connectorConfig; } @Override @@ -94,14 +101,16 @@ public List> taskConfigs(int maxTasks) { private List> configureTasks(int maxTasks) { Map baseConfig = config.originalsStrings(); FitbitRestSourceConnectorConfig fitbitConfig = getConfig(baseConfig); + if (repository == null) { + repository = fitbitConfig.getUserRepository(null); + } // Divide the users over tasks try { - - List> userTasks = fitbitConfig.getUserRepository().stream() + List> userTasks = fitbitConfig.getUserRepository(repository).stream() .map(User::getVersionedId) - // group users based on their hashCode - // in principle this allows for more efficient reconfigurations for a fixed number of tasks, - // since that allows existing tasks to only handle small modifications users to handle. + // group users based on their hashCode, in principle, this allows for more efficient + // reconfigurations for a fixed number of tasks, since that allows existing tasks to + // only handle small modifications users to handle. .collect(Collectors.groupingBy( u -> Math.abs(u.hashCode()) % maxTasks, Collectors.joining(","))) @@ -114,7 +123,7 @@ private List> configureTasks(int maxTasks) { .collect(Collectors.toList()); this.configuredUsers = fitbitConfig.getUserRepository().stream() .collect(Collectors.toSet()); - logger.info("Received userTask Configs {}" , userTasks); + logger.info("Received userTask Configs {}", userTasks); return userTasks; } catch (IOException ex) { throw new ConfigException("Cannot read users", ex); diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/TokenAuthenticator.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/TokenAuthenticator.java index dac192d1..f8b1fd28 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/TokenAuthenticator.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/TokenAuthenticator.java @@ -55,7 +55,7 @@ public Request authenticate(Route requestRoute, Response response) throws IOExce .header("Authorization", "Bearer " + newAccessToken) .build(); } catch (NotAuthorizedException ex) { - logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user); + logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user, ex); return null; } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java index 7a8d5e16..9a7e066c 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java @@ -85,23 +85,7 @@ public void initialize(RestSourceConnectorConfig config) { } @Override - public Stream stream() throws IOException { - Instant nextFetchTime = nextFetch.get(); - Instant now = Instant.now(); - if (!now.isAfter(nextFetchTime) - || !nextFetch.compareAndSet(nextFetchTime, now.plus(FETCH_THRESHOLD))) { - logger.debug("Providing cached user information..."); - return timedCachedUsers.stream(); - } - - logger.info("Requesting user information from webservice"); - Request request = requestFor("users?source-type=FitBit").build(); - this.timedCachedUsers = - this.makeRequest(request, USER_LIST_READER).getUsers().stream() - .filter(u -> u.isComplete() - && (containedUsers.isEmpty() || containedUsers.contains(u.getVersionedId()))) - .collect(Collectors.toSet()); - + public Stream stream() { return this.timedCachedUsers.stream(); } @@ -124,6 +108,28 @@ public String refreshAccessToken(User user) throws IOException, NotAuthorizedExc return credentials.getAccessToken(); } + @Override + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + @Override + public void applyPendingUpdates() throws IOException { + logger.info("Requesting user information from webservice"); + Request request = requestFor("users?source-type=FitBit").build(); + this.timedCachedUsers = + this.makeRequest(request, USER_LIST_READER).getUsers().stream() + .filter( + u -> + u.isComplete() + && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) + .collect(Collectors.toSet()); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + private Request.Builder requestFor(String relativeUrl) { HttpUrl url = baseUrl.resolve(relativeUrl); if (url == null) { diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/UserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/UserRepository.java index 34eacec0..fc757c6a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/UserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/UserRepository.java @@ -58,4 +58,19 @@ public interface UserRepository extends RestSourceTool { * @throws java.util.NoSuchElementException if the user does not exists in this repository. */ String refreshAccessToken(User user) throws IOException, NotAuthorizedException; + + /** + * The functions allows the repository to supply when there are pending updates. + * This gives more control to the user repository in updating and caching users. + * @return {@code true} if there are new updates available, {@code false} otherwise. + */ + boolean hasPendingUpdates(); + + /** + * Apply any pending updates to users. This could include, for instance, refreshing a cache + * of users with latest information. + * This is called when {@link #hasPendingUpdates()} is {@code true}. + * @throws IOException if there was an error when applying updates. + */ + void applyPendingUpdates() throws IOException; } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java index cbcf35f4..60ca17de 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java @@ -112,7 +112,10 @@ private void updateUsers() { || !nextFetch.compareAndSet(nextFetchTime, now.plus(FETCH_THRESHOLD))) { return; } + forceUpdateUsers(); + } + private void forceUpdateUsers() { try { Map newMap = Files.walk(credentialsDir) .filter(p -> Files.isRegularFile(p) @@ -129,8 +132,6 @@ private void updateUsers() { @Override public Stream stream() { - updateUsers(); - Stream users = this.users.values().stream() .filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken())); if (!configuredUsers.isEmpty()) { @@ -177,6 +178,19 @@ public String refreshAccessToken(User user) throws IOException { return refreshAccessToken(user, NUM_RETRIES); } + @Override + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + @Override + public void applyPendingUpdates() { + forceUpdateUsers(); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + /** * Refreshes the Fitbit access token on the current host, using the locally stored refresh token. * If successful, the tokens are locally stored. From f4033b9b5bfa44f3a8ba464c51147570a205f7b1 Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Fri, 1 May 2020 20:45:13 +0100 Subject: [PATCH 5/7] Add support for firebase firestore database. - adds specific support for data structure for covid collab in firestore --- kafka-connect-fitbit-source/build.gradle | 1 + .../CovidCollabFirebaseUserRepository.java | 242 ++++++++++++++++++ .../firebase/FirebaseFitbitAuthDetails.java | 94 +++++++ .../fitbit/user/firebase/FirebaseUser.java | 149 +++++++++++ .../user/firebase/FirebaseUserDetails.java | 32 +++ .../user/firebase/FirebaseUserRepository.java | 92 +++++++ .../firebase/FitbitOAuth2UserCredentials.java | 131 ++++++++++ .../user/firebase/FitbitTokenService.java | 87 +++++++ 8 files changed, 828 insertions(+) create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseFitbitAuthDetails.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserDetails.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitOAuth2UserCredentials.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java diff --git a/kafka-connect-fitbit-source/build.gradle b/kafka-connect-fitbit-source/build.gradle index 3350f562..b05f1632 100644 --- a/kafka-connect-fitbit-source/build.gradle +++ b/kafka-connect-fitbit-source/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion + implementation 'com.google.firebase:firebase-admin:6.12.2' // Included in connector runtime compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java new file mode 100644 index 00000000..6fdf54fc --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java @@ -0,0 +1,242 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.google.cloud.firestore.CollectionReference; +import com.google.cloud.firestore.DocumentChange; +import com.google.cloud.firestore.DocumentSnapshot; +import com.google.cloud.firestore.EventListener; +import com.google.cloud.firestore.FirestoreException; +import com.google.cloud.firestore.ListenerRegistration; +import com.google.cloud.firestore.QuerySnapshot; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import javax.ws.rs.NotAuthorizedException; +import org.radarbase.connect.rest.RestSourceConnectorConfig; +import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; +import org.radarbase.connect.rest.fitbit.user.User; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The User repository that supports the covid-collab application (https://covid-collab.org/). The + * data is stored in Firebase Firestore. This user repository reads data from this source and + * creates user objects. To ease the creation of objects a new {@link User} {@link FirebaseUser} is + * created. + * + *

Structure in Firestore is :- 1) Fitbit Collection -> User Document(uuid) -> Fitbit Details. 2) + * Users Collection -> User Document(uuid) -> User Details. + * + *

See {@link FirebaseFitbitAuthDetails} for the keys present in Ftibit Details for each User. + * See {@link FirebaseUserDetails} for the keys present in User Details for each User. + */ +public class CovidCollabFirebaseUserRepository extends FirebaseUserRepository { + + protected static final String FITBIT_TOKEN_ENDPOINT = "https://api.fitbit.com/oauth2/token"; + private static final Logger logger = + LoggerFactory.getLogger(CovidCollabFirebaseUserRepository.class); + + private Map cachedUsers = new HashMap<>(); + private CollectionReference userCollection; + private CollectionReference fitbitCollection; + private FitbitTokenService fitbitTokenService; + private List allowedUsers; + private ListenerRegistration fitbitCollectionListenerRegistration; + private boolean hasPendingUpdates = true; + + @Override + public User get(String key) throws IOException { + return this.cachedUsers.getOrDefault(key, createUser(key)); + } + + @Override + public Stream stream() { + return cachedUsers.values().stream(); + } + + @Override + public String getAccessToken(User user) throws IOException, NotAuthorizedException { + FitbitOAuth2UserCredentials credentials = + cachedUsers.get(user.getId()).getFitbitAuthDetails().getOauth2Credentials(); + if (credentials == null || credentials.isAccessTokenExpired()) { + return refreshAccessToken(user); + } + return credentials.getAccessToken(); + } + + @Override + public String refreshAccessToken(User user) throws IOException, NotAuthorizedException { + FirebaseFitbitAuthDetails authDetails = cachedUsers.get(user.getId()).getFitbitAuthDetails(); + + logger.debug("Refreshing token for User: {}", cachedUsers.get(user.getId())); + if (!authDetails.getOauth2Credentials().hasRefreshToken()) { + logger.error("No refresh Token present"); + throw new NotAuthorizedException("The user does not contain a refresh token"); + } + + // Make call to fitbit to get new refresh and access token. + logger.info("Requesting to refreshToken."); + FitbitOAuth2UserCredentials userCredentials = + fitbitTokenService.refreshToken(authDetails.getOauth2Credentials().getRefreshToken()); + logger.debug("Token Refreshed."); + + if (userCredentials.hasRefreshToken() && userCredentials.getAccessToken() != null) { + authDetails.setOauth2Credentials(userCredentials); + updateDocument(fitbitCollection.document(user.getId()), authDetails); + this.cachedUsers.get(user.getId()).setFitbitAuthDetails(authDetails); + return userCredentials.getAccessToken(); + } else { + throw new IOException("There was a problem refreshing the token."); + } + } + + @Override + public boolean hasPendingUpdates() { + return this.hasPendingUpdates; + } + + @Override + public void applyPendingUpdates() throws IOException { + if (this.hasPendingUpdates()) { + this.hasPendingUpdates = false; + } else { + throw new IOException( + "No pending updates available. Try calling this method only when updates are available"); + } + } + + @Override + public void initialize(RestSourceConnectorConfig config) { + super.initialize(config); + + FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config; + this.fitbitCollection = + getFirestore().collection(fitbitConfig.getFitbitUserRepositoryFirestoreFitbitCollection()); + this.userCollection = + getFirestore().collection(fitbitConfig.getFitbitUserRepositoryFirestoreUserCollection()); + + this.fitbitTokenService = + new FitbitTokenService( + fitbitConfig.getFitbitClient(), + fitbitConfig.getFitbitClientSecret(), + FITBIT_TOKEN_ENDPOINT); + + /** + * Currently, we only listen for the fitbit collection, as it contains most information while + * the user collection only contains project Id which is not supposed to change. The user + * document is pulled every time the corresponding fitbit document is pulled, so it will be + * sufficiently upto date. Moreover, not every document in the user collection will have linked + * the fitbit. In the future, we might listen to user collection too if required. + */ + if (this.fitbitCollectionListenerRegistration == null) { + this.fitbitCollectionListenerRegistration = initListener(fitbitCollection, this::onEvent); + logger.info("Added listener to Fitbit collection for real-time updates."); + } + + this.allowedUsers = fitbitConfig.getFitbitUsers(); + } + + private ListenerRegistration initListener( + CollectionReference collectionReference, EventListener eventListener) { + return collectionReference.addSnapshotListener(eventListener); + } + + protected FirebaseUser createUser(String uuid) throws IOException { + DocumentSnapshot fitbitDocumentSnapshot = getDocument(uuid, fitbitCollection); + DocumentSnapshot userDocumentSnapshot = getDocument(uuid, userCollection); + + return createUser(userDocumentSnapshot, fitbitDocumentSnapshot); + } + + protected FirebaseUser createUser( + DocumentSnapshot userSnapshot, DocumentSnapshot fitbitSnapshot) { + // Get the fitbit document for the user which contains Auth Info + FirebaseFitbitAuthDetails authDetails = + fitbitSnapshot.toObject(FirebaseFitbitAuthDetails.class); + // Get the user document for the user which contains User Details + FirebaseUserDetails userDetails = userSnapshot.toObject(FirebaseUserDetails.class); + + logger.debug("Auth details: {}", authDetails); + logger.debug("User Details: {}", userDetails); + + // if auth details are not available, skip this user. + if (authDetails == null || authDetails.getOauth2Credentials() == null) { + logger.warn( + "The auth details for user {} in the database are not valid. Skipping...", + fitbitSnapshot.getId()); + return null; + } + + // If no user details found, create one with default project. + if (userDetails == null) { + userDetails = new FirebaseUserDetails(); + } + + FirebaseUser user = new FirebaseUser(); + user.setUuid(fitbitSnapshot.getId()); + user.setUserId(fitbitSnapshot.getId()); + user.setFitbitAuthDetails(authDetails); + user.setFirebaseUserDetails(userDetails); + return user; + } + + private void updateUser(DocumentSnapshot fitbitDocumentSnapshot) { + try { + FirebaseUser user = + createUser( + getDocument(fitbitDocumentSnapshot.getId(), userCollection), fitbitDocumentSnapshot); + logger.debug("User to be updated: {}", user); + if (user != null + && user.isComplete() + && (allowedUsers.isEmpty() || allowedUsers.contains(user.getId()))) { + FirebaseUser user1 = this.cachedUsers.put(fitbitDocumentSnapshot.getId(), user); + if (user1 == null) { + logger.info("Created new User: {}", fitbitDocumentSnapshot.getId()); + } else { + logger.info("Updated existing user: {}", user1); + logger.debug("Updated user is: {}", user); + } + this.hasPendingUpdates = true; + } else { + removeUser(fitbitDocumentSnapshot); + } + } catch (IOException e) { + logger.error( + "The update of the user {} was not possible.", fitbitDocumentSnapshot.getId(), e); + } + } + + private void removeUser(DocumentSnapshot documentSnapshot) { + FirebaseUser user = this.cachedUsers.remove(documentSnapshot.getId()); + if (user != null) { + logger.info("Removed User: {}:", user); + this.hasPendingUpdates = true; + } + } + + private void onEvent(QuerySnapshot snapshots, FirestoreException e) { + if (e != null) { + logger.warn("Listen for updates failed: " + e); + return; + } + + logger.debug( + "OnEvent Called: {}, {}", + snapshots.getDocumentChanges().size(), + snapshots.getDocuments().size()); + for (DocumentChange dc : snapshots.getDocumentChanges()) { + logger.debug("Type: {}", dc.getType()); + switch (dc.getType()) { + case ADDED: + case MODIFIED: + this.updateUser(dc.getDocument()); + break; + case REMOVED: + this.removeUser(dc.getDocument()); + default: + break; + } + } + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseFitbitAuthDetails.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseFitbitAuthDetails.java new file mode 100644 index 00000000..10ed0d9e --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseFitbitAuthDetails.java @@ -0,0 +1,94 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.google.cloud.firestore.annotation.Exclude; +import com.google.cloud.firestore.annotation.IgnoreExtraProperties; +import com.google.cloud.firestore.annotation.PropertyName; +import java.time.Instant; +import java.util.Date; +import java.util.UUID; + +/** + * POJO corresponding to the Fitbit Auth details document for a user in Firestore. Currently, + * consists of OAuth 2 details, sourceId and date range for data collection. + */ +@IgnoreExtraProperties +public class FirebaseFitbitAuthDetails { + + protected static final String DEFAULT_SOURCE_ID = "fitbit"; + private String sourceId = getDefaultSourceId(); + private Date startDate; + private Date endDate; + private String version; + + private FitbitOAuth2UserCredentials oauth2Credentials; + + public FirebaseFitbitAuthDetails() { + this.oauth2Credentials = new FitbitOAuth2UserCredentials(); + this.startDate = Date.from(Instant.parse("2017-01-01T00:00:00Z")); + this.endDate = Date.from(Instant.parse("9999-12-31T23:59:59.999Z")); + this.version = null; + } + + @Exclude + protected static String getDefaultSourceId() { + return DEFAULT_SOURCE_ID + "-" + UUID.randomUUID(); + } + + @PropertyName("version") + public String getVersion() { + return version; + } + + @PropertyName("version") + public void setVersion(String version) { + if (version != null && !version.trim().isEmpty()) { + this.version = version; + } + } + + @PropertyName("start_date") + public Date getStartDate() { + return startDate; + } + + @PropertyName("start_date") + public void setStartDate(Date startDate) { + if (startDate != null) { + this.startDate = startDate; + } + } + + @PropertyName("end_date") + public Date getEndDate() { + return endDate; + } + + @PropertyName("end_date") + public void setEndDate(Date endDate) { + if (endDate != null) { + this.endDate = endDate; + } + } + + @PropertyName("source_id") + public String getSourceId() { + return sourceId; + } + + @PropertyName("source_id") + public void setSourceId(String sourceId) { + if (sourceId != null && !sourceId.trim().isEmpty()) { + this.sourceId = sourceId; + } + } + + @PropertyName("oauth2") + public FitbitOAuth2UserCredentials getOauth2Credentials() { + return oauth2Credentials; + } + + @PropertyName("oauth2") + public void setOauth2Credentials(FitbitOAuth2UserCredentials oauth2Credentials) { + this.oauth2Credentials = oauth2Credentials; + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java new file mode 100644 index 00000000..bb61431d --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java @@ -0,0 +1,149 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import io.confluent.connect.avro.AvroData; +import java.time.Instant; +import java.util.Objects; +import java.util.regex.Pattern; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.radarbase.connect.rest.fitbit.user.User; + +/** + * {@link User} implementation corresponding with how users are stored in Firestore. The fitbit + * details are stored in one collection while user details in another collection. This combines the + * data from the two to form the User instance. + */ +public class FirebaseUser implements User { + + protected static final Pattern ILLEGAL_CHARACTERS_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]"); + private String uuid; + private String userId; + + private FirebaseUserDetails firebaseUserDetails; + private FirebaseFitbitAuthDetails fitbitAuthDetails; + + private SchemaAndValue observationKey; + + public FirebaseUser() { + firebaseUserDetails = new FirebaseUserDetails(); + fitbitAuthDetails = new FirebaseFitbitAuthDetails(); + } + + public String getId() { + return this.uuid; + } + + public String getVersion() { + return fitbitAuthDetails.getVersion(); + } + + public void setUuid(String uuid) { + this.uuid = ILLEGAL_CHARACTERS_PATTERN.matcher(uuid).replaceAll("-"); + } + + public String getExternalUserId() { + return fitbitAuthDetails.getOauth2Credentials() == null + ? null + : fitbitAuthDetails.getOauth2Credentials().getUserId(); + } + + public String getProjectId() { + return firebaseUserDetails.getProjectId(); + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public Instant getStartDate() { + return fitbitAuthDetails.getStartDate().toInstant(); + } + + public Instant getEndDate() { + return fitbitAuthDetails.getEndDate().toInstant(); + } + + public String getSourceId() { + return fitbitAuthDetails.getSourceId(); + } + + public FirebaseUserDetails getFirebaseUserDetails() { + return firebaseUserDetails; + } + + public void setFirebaseUserDetails(FirebaseUserDetails firebaseUserDetails) { + this.firebaseUserDetails = firebaseUserDetails; + } + + public FirebaseFitbitAuthDetails getFitbitAuthDetails() { + return fitbitAuthDetails; + } + + public void setFitbitAuthDetails(FirebaseFitbitAuthDetails fitbitAuthDetails) { + this.fitbitAuthDetails = fitbitAuthDetails; + } + + public synchronized SchemaAndValue getObservationKey(AvroData avroData) { + if (observationKey == null) { + observationKey = User.computeObservationKey(avroData, this); + } + return observationKey; + } + + @Override + public String toString() { + return "FirebaseUser{" + + "uuid='" + + uuid + + '\'' + + ", fitbitUserId='" + + getExternalUserId() + + '\'' + + ", projectId='" + + getProjectId() + + '\'' + + ", userId='" + + userId + + '\'' + + ", sourceId='" + + fitbitAuthDetails.getSourceId() + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FirebaseUser firebaseUser = (FirebaseUser) o; + return Objects.equals(uuid, firebaseUser.uuid) + && Objects.equals(getExternalUserId(), firebaseUser.getExternalUserId()) + && Objects.equals(getProjectId(), firebaseUser.getProjectId()) + && Objects.equals(userId, firebaseUser.userId) + && Objects.equals(getSourceId(), firebaseUser.getSourceId()) + && Objects.equals(getStartDate(), firebaseUser.getStartDate()) + && Objects.equals(getEndDate(), firebaseUser.getEndDate()) + && Objects.equals(observationKey, firebaseUser.observationKey); + } + + @Override + public int hashCode() { + + return Objects.hash( + uuid, + getExternalUserId(), + getProjectId(), + userId, + getSourceId(), + getStartDate(), + getEndDate(), + observationKey); + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserDetails.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserDetails.java new file mode 100644 index 00000000..1333345e --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserDetails.java @@ -0,0 +1,32 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.google.cloud.firestore.annotation.IgnoreExtraProperties; +import com.google.cloud.firestore.annotation.PropertyName; + +/** + * POJO corresponding to the User details document in Firestore. Currently, we only need projectId + * but can be expanded in the future. + */ +@IgnoreExtraProperties +public class FirebaseUserDetails { + + protected static final String DEFAULT_PROJECT_ID = "radar-firebase-default-project"; + + private String projectId; + + public FirebaseUserDetails() { + this.projectId = DEFAULT_PROJECT_ID; + } + + @PropertyName("project_id") + public String getProjectId() { + return projectId; + } + + @PropertyName("project_id") + public void setProjectId(String projectId) { + if (projectId != null && !projectId.trim().isEmpty()) { + this.projectId = projectId; + } + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java new file mode 100644 index 00000000..47551785 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java @@ -0,0 +1,92 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.firestore.CollectionReference; +import com.google.cloud.firestore.DocumentReference; +import com.google.cloud.firestore.DocumentSnapshot; +import com.google.cloud.firestore.Firestore; +import com.google.cloud.firestore.SetOptions; +import com.google.firebase.FirebaseApp; +import com.google.firebase.FirebaseOptions; +import com.google.firebase.cloud.FirestoreClient; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.ws.rs.NotAuthorizedException; +import org.radarbase.connect.rest.RestSourceConnectorConfig; +import org.radarbase.connect.rest.fitbit.user.UserRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for Firebase User repositories. Initialises the Firebase Admin SDK for transactions + * with Firebase. + */ +public abstract class FirebaseUserRepository implements UserRepository { + + private static final Logger logger = LoggerFactory.getLogger(FirebaseUserRepository.class); + + @Override + public void initialize(RestSourceConnectorConfig config) { + + // The path to the credentials file should be provided by + // the GOOGLE_APPLICATION_CREDENTIALS env var. + // See https://firebase.google.com/docs/admin/setup#initialize-sdk for more details. + FirebaseOptions options; + try { + options = + new FirebaseOptions.Builder() + .setCredentials(GoogleCredentials.getApplicationDefault()) + .build(); + } catch (IOException exc) { + logger.error("Failed to get credentials for Firebase app.", exc); + throw new IllegalStateException(exc); + } + + try { + FirebaseApp.initializeApp(options); + } catch (IllegalStateException exc) { + logger.warn("Firebase app was already initialised. {}", exc.getMessage()); + } + } + + protected Firestore getFirestore() { + return FirestoreClient.getFirestore(); + } + + /** + * Get a document from a Collection in Firestore. + * + * @param key The document ID to pull from the collection + * @param collection The collection reference to query + * @return the document + * @throws IOException If there was a problem getting the document + */ + protected DocumentSnapshot getDocument(String key, CollectionReference collection) + throws IOException { + DocumentSnapshot documentSnapshot; + try { + documentSnapshot = collection.document(key).get().get(20, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IOException(e); + } + return documentSnapshot; + } + + /** + * Writes the specified object to a Firestore document. + * + * @param documentReference document reference for the document to be updated + * @param object The POJO to write to the document + * @throws IOException If there was a problem updating the document + */ + protected synchronized void updateDocument(DocumentReference documentReference, Object object) + throws IOException { + try { + documentReference.set(object, SetOptions.merge()).get(20, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new IOException(e); + } + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitOAuth2UserCredentials.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitOAuth2UserCredentials.java new file mode 100644 index 00000000..80e491f5 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitOAuth2UserCredentials.java @@ -0,0 +1,131 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.cloud.firestore.annotation.Exclude; +import com.google.cloud.firestore.annotation.IgnoreExtraProperties; +import com.google.cloud.firestore.annotation.PropertyName; +import java.time.Duration; +import java.time.Instant; + +/** + * Fitbit Credentials POJO to help serialize and deserialize fitbit token data obtained using + * Firebase SDK. Mostly similar to {@link + * org.radarbase.connect.rest.fitbit.user.OAuth2UserCredentials} but adds all the properties + * returned by fitbit when refreshing tokens. Also, add public setter/getter for all properties so + * that can be used without jackson because Firebase SDK uses its own custom mapper based on {@link + * PropertyName} annotation. Note this is still compatible with Jackson library. + * + *

see {@link com.google.cloud.firestore.CustomClassMapper.BeanMapper} for more details. + */ +@IgnoreExtraProperties +@JsonIgnoreProperties(ignoreUnknown = true) +public class FitbitOAuth2UserCredentials { + protected static final Duration DEFAULT_EXPIRY = Duration.ofHours(1); + protected static final Duration EXPIRY_TIME_MARGIN = Duration.ofMinutes(5); + protected Instant expiresAt; + + @JsonProperty("access_token") + private String accessToken; + + @JsonProperty("refresh_token") + private String refreshToken; + + @JsonProperty("expires_in") + private Long expiresIn; + + @JsonProperty private String scope; + + @JsonProperty("token_type") + private String tokenType; + + @JsonProperty("user_id") + private String userId; + + public FitbitOAuth2UserCredentials() {} + + @Exclude + @JsonIgnore + public static Instant getExpiresAt(Duration expiresIn) { + return Instant.now().plus(expiresIn).minus(EXPIRY_TIME_MARGIN); + } + + @PropertyName("access_token") + public String getAccessToken() { + return accessToken; + } + + @PropertyName("access_token") + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + if (expiresAt == null) { + expiresAt = getExpiresAt(DEFAULT_EXPIRY); + } + } + + @PropertyName("refresh_token") + public String getRefreshToken() { + return refreshToken; + } + + @PropertyName("refresh_token") + public void setRefreshToken(String refreshToken) { + this.refreshToken = refreshToken; + } + + @PropertyName("expires_in") + public Long getExpiresIn() { + return expiresIn; + } + + @PropertyName("expires_in") + public void setExpiresIn(Long expiresIn) { + this.expiresIn = expiresIn; + this.expiresAt = + getExpiresAt( + expiresIn != null && expiresIn > 0L ? Duration.ofSeconds(expiresIn) : DEFAULT_EXPIRY); + } + + @PropertyName("scope") + public String getScope() { + return scope; + } + + @PropertyName("scope") + public void setScope(String scope) { + this.scope = scope; + } + + @PropertyName("token_type") + public String getTokenType() { + return tokenType; + } + + @PropertyName("token_type") + public void setTokenType(String tokenType) { + this.tokenType = tokenType; + } + + @PropertyName("user_id") + public String getUserId() { + return userId; + } + + @PropertyName("user_id") + public void setUserId(String userId) { + this.userId = userId; + } + + @Exclude + @JsonIgnore + public Boolean isAccessTokenExpired() { + return expiresAt == null || Instant.now().isAfter(expiresAt); + } + + @Exclude + @JsonIgnore + public boolean hasRefreshToken() { + return refreshToken != null && !refreshToken.isEmpty(); + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java new file mode 100644 index 00000000..10ecacd6 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java @@ -0,0 +1,87 @@ +package org.radarbase.connect.rest.fitbit.user.firebase; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.NotAuthorizedException; +import okhttp3.Credentials; +import okhttp3.FormBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FitbitTokenService { + + private static final Logger logger = LoggerFactory.getLogger(FitbitTokenService.class); + private final OkHttpClient client; + private final ObjectMapper mapper; + private String clientId; + private String clientSecret; + private String tokenEndpoint; + + public FitbitTokenService(String clientId, String clientSecret, String tokenEndpoint) { + this.clientId = clientId; + this.clientSecret = clientSecret; + this.tokenEndpoint = tokenEndpoint; + this.mapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + this.client = + new OkHttpClient.Builder() + .connectTimeout(20, TimeUnit.SECONDS) + .writeTimeout(20, TimeUnit.SECONDS) + .readTimeout(30, TimeUnit.SECONDS) + .build(); + } + + public FitbitOAuth2UserCredentials processTokenRequest(FormBody form) + throws NotAuthorizedException, IOException { + String credentials = Credentials.basic(clientId, clientSecret); + + Request request = + new Request.Builder() + .addHeader("Accept", "application/json") + .addHeader("Authorization", credentials) + .addHeader("Content-Type", "application/x-www-form-urlencoded") + .url(tokenEndpoint) + .post(form) + .build(); + + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful()) { + ResponseBody responseBody = response.body(); + if (responseBody == null) { + logger.error("Got empty response body"); + throw new IOException("No response from server"); + } + + try { + String responseBodyStr = responseBody.string(); + logger.debug("Response: {}", responseBodyStr); + return mapper.readValue(responseBodyStr, FitbitOAuth2UserCredentials.class); + } catch (IOException e) { + logger.error("Error when deserializing response.", e); + throw new NotAuthorizedException("Cannot read token response", e); + } + } else { + throw new NotAuthorizedException( + "Failed to execute the request : Response-code :" + + response.code() + + " received when requesting token from server with " + + "message " + + response.message()); + } + } + } + + public synchronized FitbitOAuth2UserCredentials refreshToken(String oldToken) throws IOException { + FormBody form = + new FormBody.Builder() + .add("grant_type", "refresh_token") + .add("refresh_token", oldToken) + .build(); + return this.processTokenRequest(form); + } +} From a26ca00ab9e6e10db3ee529b2861070cf338855d Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Fri, 1 May 2020 20:58:55 +0100 Subject: [PATCH 6/7] code cleanup --- .../rest/fitbit/user/firebase/FirebaseUserRepository.java | 3 +-- .../connect/rest/fitbit/user/firebase/FitbitTokenService.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java index 47551785..198dccce 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUserRepository.java @@ -13,7 +13,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.ws.rs.NotAuthorizedException; import org.radarbase.connect.rest.RestSourceConnectorConfig; import org.radarbase.connect.rest.fitbit.user.UserRepository; import org.slf4j.Logger; @@ -81,7 +80,7 @@ protected DocumentSnapshot getDocument(String key, CollectionReference collectio * @param object The POJO to write to the document * @throws IOException If there was a problem updating the document */ - protected synchronized void updateDocument(DocumentReference documentReference, Object object) + protected void updateDocument(DocumentReference documentReference, Object object) throws IOException { try { documentReference.set(object, SetOptions.merge()).get(20, TimeUnit.SECONDS); diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java index 10ecacd6..97f3311d 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FitbitTokenService.java @@ -76,7 +76,7 @@ public FitbitOAuth2UserCredentials processTokenRequest(FormBody form) } } - public synchronized FitbitOAuth2UserCredentials refreshToken(String oldToken) throws IOException { + public FitbitOAuth2UserCredentials refreshToken(String oldToken) throws IOException { FormBody form = new FormBody.Builder() .add("grant_type", "refresh_token") From 6bafb928a417dbf59b27f84a4722484a6b08e6c2 Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Mon, 4 May 2020 16:17:55 +0100 Subject: [PATCH 7/7] Bump minor version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 42d5d120..6dff616c 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ subprojects { apply plugin: 'java-library' group = 'org.radarbase' - version = '0.2.5-SNAPSHOT' + version = '0.3.0' sourceCompatibility = 1.8 targetCompatibility = 1.8